Re-Architecting the Document Integrator

In a perfect world, all the data pertaining to a certain domain would
be authoritatively held and edited in a single place, and everything
else would refer to that place as the source of truth.

Of course, we don’t live in a perfect world, and so we need to live
with compromises. For example, all the editorial content relating to
our products is written in our main offices, using one application,
but product measurements and stock levels are handled in our
distribution centres, using a different application. Indeed, each
distribution centre runs its own local instance of that application:
being unable to ship orders because of lost connectivity to the main
office would not please our customers! Yet many clients (not least the
customer-facing websites) still need to access all the information
about a product, and need to do it fast.

Having each client talk to the various authoritative applications is
infeasible for many reasons: it requires uninterrupted connectivity
across the globe, it would place unreasonable load on our internal
servers, and it would require each client to integrate the various
pieces of information on its own.

Instead, we created what is essentially a fast cache of integrated information:
it receives notifications of each change that happens in the authoritative applications, updates
its internal state, and publishes documents describing each
product in formats that are convenient for clients to search and
use. We have a few of these systems, and their common structure is
implemented by a library we call the Document Integrator.

The Document Integrator structures an application as a series of
components:

  • the Consume components handle incoming messages, and create or
    update documents based on them
  • the Munge components notice when pieces of information are missing
    from a document, and either retrieve them from the appropriate
    authoritative application, or derive them from information already
    present in the document
  • the Publish components take a document and publish it to some
    other service: for example, Solr for searching, or ActiveMQ to update
    other systems
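To make the three roles concrete, here is a minimal Python sketch. It is not the Document Integrator's real API: the function names, the plain-dict document, the `display_name` field and the list standing in for Solr or ActiveMQ are all hypothetical.

```python
from typing import Dict, List

# Hypothetical document shape: a plain dict of field name -> value.
Document = Dict[str, object]


def consume(message: dict, store: Dict[str, Document]) -> List[str]:
    """Consume: create or update the document named in the message,
    returning the IDs of the documents it touched."""
    doc = store.setdefault(message["id"], {"id": message["id"]})
    doc.update(message["fields"])
    return [message["id"]]


def munge(doc: Document) -> None:
    """Munge: fill in a missing field. This one derives it from data
    already in the document; another might fetch it from an
    authoritative application instead."""
    if "display_name" not in doc and "name" in doc:
        doc["display_name"] = str(doc["name"]).title()


def publish(doc: Document, outbox: List[Document]) -> None:
    """Publish: hand a copy of the document to another service
    (here just a list standing in for Solr or ActiveMQ)."""
    outbox.append(dict(doc))
```

For example, consuming `{"id": "p1", "fields": {"name": "blue shirt"}}` and then munging and publishing `p1` leaves one document in the outbox, with `display_name` set to `"Blue Shirt"`.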

The way we initially wrote the Integrator looked like this:

[Diagram: the original, “streaming” Integrator]

All components were run inside the same process, and each message
passed each affected document through each component. We could,
of course, run many such processes in parallel, but there were a few
issues:

  • a series of messages affecting the same document, arriving in quick
    succession, generated unnecessary work: we could have just waited
    for the last message to arrive and published the final version,
    instead of publishing all the intermediate ones, each of which was
    visible for maybe a second
  • if one publishing destination had scalability problems, we were
    forced to reduce the number of parallel processes, which reduced our
    ability to consume messages
  • if a munger was particularly slow, we could not run more instances
    of it without choking the un-scalable publishing destination
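The first issue above shows up even in a minimal Python sketch of the streaming loop. The names are hypothetical, consuming is inlined, and a dict stands in for the document store: a train of three stock updates to one product produces three publishes, even though only the last version matters.

```python
from typing import Callable, Dict, List

Document = Dict[str, object]


def run_streaming(messages: List[dict],
                  mungers: List[Callable[[Document], None]],
                  publishers: List[Callable[[Document], None]]) -> None:
    """Streaming design: every message drives every affected document
    through every munger and publisher, in the same process."""
    store: Dict[str, Document] = {}
    for message in messages:
        # Consume: create or update the document named in the message.
        doc = store.setdefault(message["id"], {"id": message["id"]})
        doc.update(message["fields"])
        # Munge and publish immediately, once per message.
        for munger in mungers:
            munger(doc)
        for publisher in publishers:
            publisher(doc)


published: List[Document] = []
train = [{"id": "p1", "fields": {"stock": n}} for n in (5, 4, 3)]
run_streaming(train, mungers=[], publishers=[lambda d: published.append(dict(d))])
print(len(published))  # 3 publishes, though only the last version matters
```

Because munging and publishing run inside the consuming loop, the only way to scale any one of them is to run more copies of the whole loop, which is exactly the coupling described in the other two issues.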

Essentially, we had tied together pieces that should have been scaled
independently. So, we rewrote it to work like this:

[Diagram: the re-architected, state-machine Integrator]

Now, each kind of component is executed in a separate set of
processes: we can have 50 consumers, 100 mungers, and 20
publishers. Moreover, a train of messages touching the same document
over and over again usually results in very few runs of that document
through the mungers and publishers. How did we do it?

In the “streaming” version, each consumer component returned the IDs
of the documents it had created or updated, and we fed each of those
documents through the other components. In the new, separate-processes
version we can’t do that, not so much because inter-process
communication is hard as because, if we sent each ID to the mungers
and publishers, we’d still have the same problem of needlessly
processing the same document multiple times.

Instead, each document has a state — when a document is created or
modified by a consumer, it’s put in the state “to be munged”. Each munger
process asks the document store for a random document in that state,
grabs it, runs it through each Munger component, then releases it
and puts it in the state “to be published”. Each publisher process does
the equivalent thing, ending with the document in the state
“published”.
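The states and transitions described above can be sketched in Python as follows, assuming a toy in-memory store. The `DocumentStore` class and the step functions are hypothetical, and grabbing and releasing are left out for brevity. Because a consumer write only sets the state, several updates to the same document collapse into a single pending munge.

```python
import random
from enum import Enum
from typing import Callable, Dict, List, Optional


class State(Enum):
    TO_BE_MUNGED = "to be munged"
    TO_BE_PUBLISHED = "to be published"
    PUBLISHED = "published"


class DocumentStore:
    """Toy in-memory stand-in for the shared document store."""

    def __init__(self) -> None:
        self.docs: Dict[str, dict] = {}
        self.states: Dict[str, State] = {}

    def consumer_update(self, doc_id: str, fields: dict) -> None:
        # Every consumer write (re)marks the document "to be munged".
        self.docs.setdefault(doc_id, {"id": doc_id}).update(fields)
        self.states[doc_id] = State.TO_BE_MUNGED

    def random_in_state(self, state: State) -> Optional[str]:
        # Pick a random document in the given state, or None if there is none.
        candidates = [d for d, s in self.states.items() if s is state]
        return random.choice(candidates) if candidates else None


Component = Callable[[dict], None]


def munger_step(store: DocumentStore, mungers: List[Component]) -> bool:
    """One iteration of a munger process; returns False when idle."""
    doc_id = store.random_in_state(State.TO_BE_MUNGED)
    if doc_id is None:
        return False
    for munger in mungers:
        munger(store.docs[doc_id])
    store.states[doc_id] = State.TO_BE_PUBLISHED
    return True


def publisher_step(store: DocumentStore, publishers: List[Component]) -> bool:
    """One iteration of a publisher process; returns False when idle."""
    doc_id = store.random_in_state(State.TO_BE_PUBLISHED)
    if doc_id is None:
        return False
    for publisher in publishers:
        publisher(store.docs[doc_id])
    store.states[doc_id] = State.PUBLISHED
    return True


# A train of three updates to the same product...
store = DocumentStore()
for stock in (5, 4, 3):
    store.consumer_update("p1", {"stock": stock})

# ...is munged and published only once, in its final state.
munged: List[int] = []
published: List[dict] = []
while munger_step(store, [lambda d: munged.append(d["stock"])]):
    pass
while publisher_step(store, [lambda d: published.append(dict(d))]):
    pass
print(munged, published)  # [3] [{'id': 'p1', 'stock': 3}]
```

Picking a random document, rather than the oldest, is what the text describes; in this sketch it simply spreads the work across many munger or publisher processes without any coordination between them.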

“Grabbing” a document consists of updating it with the
identity of the process that’s working on it (a UUID, so processes can
be easily distributed across different machines, as long as they can
all talk to the same document store) and a timeout (so that a crash
won’t leave a document grabbed forever). “Releasing” it just resets
those two values. In addition, each munger and publisher process
checks that its “grab” is still valid every time it tries to update
the document, because consumer processes can “steal” a grab when they
update a document. In this way, a document updated by a train of messages will
get to the publishers only at the end of the train, saving lots of
processing.
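Grabbing, releasing and stealing can be sketched in Python as below. The field names, the explicit `now` argument (a real process would pass the current time), and the choice to model a steal as the consumer clearing the grab are assumptions. In a shared store each check-and-write would also have to be a single atomic operation, which this single-process toy ignores.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Doc:
    data: dict
    grabbed_by: Optional[str] = None  # UUID of the process working on it
    grab_expires: float = 0.0         # when the grab times out (seconds)


class Worker:
    """A munger or publisher process, identified by a UUID so that
    workers on different machines never collide."""

    def __init__(self, grab_timeout: float = 60.0) -> None:
        self.id = str(uuid.uuid4())
        self.grab_timeout = grab_timeout

    def grab(self, doc: Doc, now: float) -> bool:
        # Free, or the previous holder's grab has expired (e.g. it crashed).
        if doc.grabbed_by is not None and doc.grab_expires > now:
            return False
        doc.grabbed_by = self.id
        doc.grab_expires = now + self.grab_timeout
        return True

    def still_holds(self, doc: Doc, now: float) -> bool:
        return doc.grabbed_by == self.id and doc.grab_expires > now

    def update(self, doc: Doc, fields: dict, now: float) -> bool:
        # Re-check the grab before every write; refuse if stolen or expired.
        if not self.still_holds(doc, now):
            return False
        doc.data.update(fields)
        return True

    def release(self, doc: Doc) -> None:
        # Releasing just resets the two grab fields.
        if doc.grabbed_by == self.id:
            doc.grabbed_by = None
            doc.grab_expires = 0.0


def consumer_update(doc: Doc, fields: dict) -> None:
    # Consumers always win: writing a document steals any existing grab.
    doc.data.update(fields)
    doc.grabbed_by = None
    doc.grab_expires = 0.0


doc = Doc({"stock": 5})
munger = Worker(grab_timeout=30.0)
assert munger.grab(doc, now=100.0)
consumer_update(doc, {"stock": 4})                      # a new message arrives
print(munger.update(doc, {"munged": True}, now=101.0))  # False: grab was stolen
```

The munger whose grab was stolen simply stops; since the consumer's write leaves the document waiting to be munged again, only the version produced at the end of the train goes on to the publishers.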

Finally, we also changed our concurrency control. In the initial
version, every document to be updated was locked, and it was released
after all the consuming and munging (publishing is read-only as far as
the document store is concerned, so no locking was needed there). This
caused complicated contention scenarios that meant that most of the
time, our processes were waiting for each other and not actually doing
much.

Now, instead, we employ optimistic concurrency control: each
process tries to update the document it’s working on, but that update
may fail if the document has been modified since that process read
it. If the update fails, the process re-reads the document and tries
again. Although this causes some processing to happen more than once,
in practice the overall throughput of the system is much better, and
under load all our processes are always busy.
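Optimistic concurrency control is commonly implemented with a version number per document; the Python sketch below assumes that scheme, since the post does not say how conflicts are detected. `VersionedStore` and `update_with_retry` are hypothetical names, and the lock only makes the toy store's compare-and-set atomic: it is never held while a process does its work.

```python
import threading
from typing import Callable, Dict, Tuple


class VersionConflict(Exception):
    """The document changed between our read and our write."""


class VersionedStore:
    """Toy store where every successful write bumps a per-document version."""

    def __init__(self) -> None:
        self._docs: Dict[str, Tuple[int, dict]] = {}
        self._lock = threading.Lock()  # makes the toy compare-and-set atomic

    def read(self, doc_id: str) -> Tuple[int, dict]:
        with self._lock:
            version, data = self._docs.get(doc_id, (0, {}))
            return version, dict(data)

    def write(self, doc_id: str, expected_version: int, data: dict) -> None:
        # Compare-and-set: only write if nobody else has written since our read.
        with self._lock:
            current, _ = self._docs.get(doc_id, (0, {}))
            if current != expected_version:
                raise VersionConflict(doc_id)
            self._docs[doc_id] = (current + 1, dict(data))


def update_with_retry(store: VersionedStore, doc_id: str,
                      change: Callable[[dict], dict],
                      max_attempts: int = 10) -> int:
    """Read, compute, try to write; on conflict re-read and redo the work.
    Returns the number of attempts it took."""
    for attempt in range(1, max_attempts + 1):
        version, data = store.read(doc_id)
        new_data = change(data)  # the processing that may be repeated
        try:
            store.write(doc_id, version, new_data)
            return attempt
        except VersionConflict:
            continue  # someone else wrote first: start again from a fresh read
    raise VersionConflict(doc_id)
```

No process ever waits for another to finish its work; the price is that `change` may run more than once for the same document, which is the trade-off described above.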

This entry was posted in Architecture, Performance by dakkar.

About dakkar

Gianni is a Perl Architect at NAP. His code from previous lives runs in university administration software, inside the ask.com news system, and even in Antarctica. He's currently busy writing libraries to make "the right thing" be "the easy thing".
