NAP::Messaging – glue and policies for interacting with ActiveMQ

As part of our service oriented architecture, our Perl applications send messages over ActiveMQ; a few of them also consume those messages and act on them.

All these consumer applications are based on Catalyst. Initially, we used Catalyst::Engine::Stomp and a rather complicated set of in-house libraries to wrap it.

Over time, those in-house libraries grew to take on more and more responsibilities: testing, logging, message serialisation, plugin loading and more. It was time to break them apart and write something cleaner.

Most of the results of this big rewrite have been published on the CPAN and GitHub, but we also needed a set of “officially blessed” modules, all in a single place, defining the policies we want to follow when dealing with messages, so that all our applications behave the same way.

This set of modules is called NAP::Messaging, and we have released it to the world. In this article I’m going to show the features it provides and some of the more interesting parts of the implementation. I’ll also explain how to build your own policies on top of the various CPAN distributions we’ve released.

The features

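NAP::Messaging provides:

  • an application base class, with logging set up through Log::Log4perl
  • a consumer base class that validates messages and routes failures to dead letter queues
  • a model adaptor for sending messages to ActiveMQ
  • a producer/transformer role that maps internal destination names to real ones
  • runner classes to start one or more consumer processes
  • helpers to test applications without a real broker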

Most of these features are specific to the way we do things at NET-A-PORTER, which is why they’re probably going to remain inside this distribution. On the other hand, if we get requests to push some of these pieces to the CPAN, we will do our best to do so.

The application base class

NAP::Messaging::Catalyst is a Catalyst subclass; you’re supposed to derive your consumer application from it:

package MyApp;
use NAP::policy 'class';
extends 'NAP::Messaging::Catalyst';
__PACKAGE__->setup();

(see the article on NAP::policy if you want to know what it does)

NAP::Messaging::Catalyst sets up a few default Catalyst plugins, including CatalystX::ComponentsFromConfig::ModelPlugin, configures a Log::Log4perl::Catalyst instance, and provides a convenience method to get the list of destinations the application is going to consume from:

sub jms_destinations {
    my ($class) = @_;

    my @namespaces = map { $class->controller($_)->action_namespace }
        $class->controllers;
    my @destinations = map { '/'.$_ }
        grep { m{^(queue|topic)}x } @namespaces;
    return @destinations;
}

Nothing particularly complicated, but it saves having to write the same lines in every script used to start the application, since Plack::Handler::Stomp requires the destinations to subscribe to before it will run the application.

Logging

Logging is hard. We have decided to use Log::Log4perl everywhere, and the application base class enforces this. The logging object is instantiated with information extracted from the application configuration file:

log4perl log4perl.conf
<log4perlopts>
 autoflush 1
</log4perlopts>

This will call something like:

Log::Log4perl::Catalyst->new(
  'log4perl.conf',
  autoflush => 1,
);

The consumer base class

CatalystX::ConsumesJMS provides a different way to write controllers deriving from Catalyst::Controller::JMS, but it does not provide a complete solution: it provides mechanisms, but not policies. NAP::Messaging::Base::Consumer defines our policies.

First of all, it validates every incoming message against a Data::Rx schema defined in the route’s spec slot (see CatalystX::ConsumesJMS’s docs for details on routes). It also extracts JMS-specific headers from the PSGI environment, putting them into the stash:

my $psgi_env = $ctx->req->env;
my %headers = map { s/^jms\.//r, $psgi_env->{$_} }
    grep { /^jms\./ } keys %$psgi_env;
$ctx->stash->{headers}=\%headers;

Finally, the route’s code sub is called inside a try block.

Messages that don’t validate, or messages that cause exceptions during processing, are sent to a “dead letter queue”, wrapped together with descriptions of the request and the exception. So, for example, a message of type foo that failed to validate when sent to /queue/bar would produce a message of type error-foo in /queue/DLQ.failed-validation.queue/bar with a body like:

{
    "original_message": { "the": "original", "message": "body" },
    "original_headers": { "type": "foo", "etc": "headers" },
    "consumer": "MyApp::Consumer::_queue_bar",
    "destination": "/queue/bar",
    "errors": "validation failure etc etc"
}

Similar messages would be sent to /queue/DLQ.queue/bar if processing threw exceptions.
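The error-handling policy above can be sketched in plain Perl. This is not NAP::Messaging’s code: the names `consume`, `$validate`, `$process` and `$send` are hypothetical, validation is a stand-in callback rather than a Data::Rx schema, and `eval` plays the role of the try block. The DLQ destination names, the `error-` type prefix and the body fields follow the example in the text.

```perl
use strict;
use warnings;

# Hypothetical sketch of the consumer policy: validate the body, run the
# handler inside an exception handler, and send anything that fails to a
# dead letter queue.
#   $msg      - hashref with 'destination', 'headers' and 'body'
#   $consumer - consumer class name, recorded in the error message
#   $validate - coderef returning a list of error strings (empty if valid)
#   $process  - coderef that handles the message and may die
#   $send     - coderef($destination, \%headers, \%body) that sends a message
# Returns true if the message was processed, false if it went to a DLQ.
sub consume {
    my ($msg, $consumer, $validate, $process, $send) = @_;

    my $dest = $msg->{destination};            # e.g. "/queue/bar"
    (my $bare = $dest) =~ s{^/}{};              # e.g. "queue/bar"

    my ($dlq, $errors);
    if (my @problems = $validate->($msg->{body})) {
        $dlq    = "/queue/DLQ.failed-validation.$bare";
        $errors = join '; ', @problems;
    }
    elsif (!eval { $process->($msg); 1 }) {
        $dlq    = "/queue/DLQ.$bare";
        $errors = $@ || 'unknown error';
    }
    return 1 unless defined $dlq;

    $send->(
        $dlq,
        { type => 'error-' . ($msg->{headers}{type} // 'unknown') },
        {
            original_message => $msg->{body},
            original_headers => $msg->{headers},
            consumer         => $consumer,
            destination      => $dest,
            errors           => $errors,
        },
    );
    return 0;
}

# Example: a message of type "foo" sent to /queue/bar that fails validation.
my @sent;
my $ok = consume(
    { destination => '/queue/bar', headers => { type => 'foo' }, body => { count => 'x' } },
    'MyApp::Consumer::_queue_bar',
    sub { $_[0]{count} =~ /^\d+$/ ? () : ('count is not an integer') },
    sub { },
    sub { push @sent, [@_] },
);
print "$sent[0][0] ($sent[0][1]{type})\n";
# prints "/queue/DLQ.failed-validation.queue/bar (error-foo)"
```

Keeping validation failures and processing exceptions in separate queues means a schema mismatch (usually a producer bug) is not mixed up with a transient failure that might succeed if replayed.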

The model adaptor

We use CatalystX::ComponentsFromConfig to generate some of our models, to avoid having to write empty subclasses. In the case of our message queue model, though, we want to provide some default configuration that can’t be specified in the config files, so we use a specific adaptor. The configuration includes:

  • some default headers for all messages sent (content-type:json and persistent:true)
  • a JSON serialiser that produces UTF-8-encoded byte strings (courtesy of JSON::XS)
  • a pointer to the global application configuration, used by our producer/transformer role to remap destinations
  • the global application logger, to log everything together

We instantiate the model with something like this:

<Model::MessageQueue>
  base_class NAP::Messaging::Catalyst::MessageQueueAdaptor
  <args>
    <servers>
      hostname localhost
      port     61613
    </servers>
  </args>
</Model::MessageQueue>
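To make the first two items of the adaptor’s default configuration concrete, here is a sketch of preparing an outgoing message. It uses JSON::PP (shipped with Perl) in place of JSON::XS, whose `utf8` option behaves the same way; the helper name `prepare_message` and the rule that per-message headers override the defaults are assumptions. The header values themselves come from the list above.

```perl
use strict;
use warnings;
use JSON::PP;

# Headers added to every outgoing message (values taken from the list above).
my %DEFAULT_HEADERS = (
    'content-type' => 'json',
    persistent     => 'true',
);

# ->utf8 makes encode() return UTF-8 encoded bytes rather than characters;
# ->canonical sorts object keys so the output is stable.
my $json = JSON::PP->new->utf8->canonical;

# Hypothetical helper: merge per-message headers over the defaults and
# serialise the payload to a byte string.
sub prepare_message {
    my ($headers, $payload) = @_;
    my %merged = (%DEFAULT_HEADERS, %{ $headers // {} });
    return (\%merged, $json->encode($payload));
}

my ($headers, $body) = prepare_message({ type => 'greeting' }, { text => "caf\x{e9}" });
print "$_: $headers->{$_}\n" for sort keys %$headers;
print length($body), " bytes\n";    # the "é" takes two bytes once encoded
```

Producing bytes rather than Perl character strings matters because a STOMP frame body goes on the wire as-is; handing the network layer a character string with non-ASCII characters would leave the encoding to chance.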

The producer/transformer role

Net::Stomp::Producer has a transform_and_send method that lets you factor the conversion from some internal data to the actual message into a separate package. To tie this transformation into our applications, we use NAP::Messaging::Role::Producer, like this:

package MyApp::Producer::SomeType;
use NAP::policy 'class';
with 'NAP::Messaging::Role::Producer';

sub message_spec { return $some_data_rx_spec }

has '+destination' => ( default => 'SomeDestination' );
has '+type' => ( default => 'SomeType' );

sub transform {
  my ($self,$header,@args) = @_;

  # do something to generate $payload

  return ($header,$payload);
}

which can then be configured like:

<Producer::SomeType>
 <routes_map>
  SomeDestination  /queue/an_actual_queue
 </routes_map>
</Producer::SomeType>

to map the internal destination name to the actual destination. In addition to providing this mapping, the role takes the default message type and (internal) destination name from the attributes, making it easier to find them and to change them (again, via configuration).
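The destination remapping can be pictured as a lookup in the `routes_map` hash from the configuration, which is roughly what access to the global configuration makes possible. This is a guess at the behaviour, not the role’s code: the function name `resolve_destination` and the fallback to the internal name when no mapping exists are both hypothetical.

```perl
use strict;
use warnings;

# The global configuration, as it might look after loading the
# <Producer::SomeType> block shown above.
my %global_config = (
    'Producer::SomeType' => {
        routes_map => { SomeDestination => '/queue/an_actual_queue' },
    },
);

# Hypothetical: look up an internal destination name in the component's
# routes_map, falling back to the name itself when there is no entry.
# The intermediate "// {}" steps avoid autovivifying missing sections.
sub resolve_destination {
    my ($config, $component, $internal) = @_;
    my $section = $config->{$component} // {};
    my $map     = $section->{routes_map} // {};
    return $map->{$internal} // $internal;
}

print resolve_destination(\%global_config, 'Producer::SomeType', 'SomeDestination'), "\n";
# prints "/queue/an_actual_queue"
```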

Producer/transformer classes are not instantiated by Catalyst, and Net::Stomp::Producer (correctly) knows nothing about the application, so those instances would not get access to the application’s configuration. To work around that, NAP::Messaging::Role::Producer depends on a _global_config attribute being set on instantiation, which is provided by the model adaptor described above.

Running the application

To start one of our consumer applications we need to extract some information from its configuration, call jms_destinations, instantiate Plack::Handler::Stomp, and start it. Rather than repeating that in every start-up script, we wrote a helper class:

#!perl
use NAP::policy;
use NAP::Messaging::Runner;
NAP::Messaging::Runner->new('MyApp')->run;

The Runner class is quite simple; its code is something like this:

# we will connect to the same servers as the application uses to
# send messages
my $servers = $appclass->model('MessageQueue')
    ->servers;

my @subscriptions = map {; {
    destination => $_,
} } $appclass->jms_destinations;

# now we can build the handler
my $handler = Plack::Handler::Stomp->new({
    %{ $appclass->config->{Stomp} // {} },
    servers => $servers,
    subscriptions => \@subscriptions,
    logger => $appclass->log,
});

$handler->run($appclass->psgi_app);

This allows us to specify the connection information once (in the MessageQueue model config), and also define some connection config for the “consuming” connection:

<Stomp>
 <subscribe_headers>
  activemq.exclusive false
  activemq.prefetchSize 1
 </subscribe_headers>
</Stomp>

exclusive is set to false to allow multiple concurrent consumers on the same destination (a feature we use to load-balance our consumer applications), and prefetchSize is set to 1 so that the broker hands each consumer only one unacknowledged message at a time, which keeps the load evenly spread (the default, 1000, can also cause problems with slow consumers).
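For reference, these subscribe headers end up as headers of the STOMP SUBSCRIBE frame sent for each destination. The sketch below serialises such a frame following the STOMP wire format (a command line, `name:value` header lines, a blank line, the body, and a terminating NUL byte). The `format_frame` helper is hypothetical, and a real frame sent by Plack::Handler::Stomp may carry further headers, such as an acknowledgement mode, that are not shown here.

```perl
use strict;
use warnings;

# Hypothetical helper: serialise a STOMP frame. Headers are sorted only to
# make the output deterministic; STOMP does not require any order.
sub format_frame {
    my ($command, $headers, $body) = @_;
    my $frame = "$command\n";
    $frame .= "$_:$headers->{$_}\n" for sort keys %$headers;
    return $frame . "\n" . ($body // '') . "\0";
}

# subscribe_headers from the <Stomp> config, merged with the destination.
my %subscribe_headers = (
    'activemq.exclusive'    => 'false',
    'activemq.prefetchSize' => 1,
);
my $frame = format_frame('SUBSCRIBE',
    { destination => '/queue/one', %subscribe_headers });

# Show the NUL terminator in a readable way.
(my $printable = $frame) =~ s/\0\z/<NUL>/;
print $printable, "\n";
```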

Running multiple consumers at once

A subclass of NAP::Messaging::Runner, called NAP::Messaging::MultiRunner, provides a more complex startup mechanism:

  • we can run multiple identical consumer processes
  • we can run multiple processes, each of which consumes from a different subset of destinations
  • a supervisor process restarts children that died for whatever reason
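The supervisor in the last point boils down to a fork/waitpid loop. This is a minimal sketch, not MultiRunner’s implementation: `supervise` and `spawn` are hypothetical names, each child just runs a code reference, and the `$max_restarts` limit exists only so that the example terminates (a real supervisor would keep going and would also handle signals).

```perl
use strict;
use warnings;

$| = 1;    # unbuffered STDOUT, so forked children don't repeat pending output

# Start a child process that runs $code and then exits.
sub spawn {
    my ($code) = @_;
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {    # in the child
        $code->();
        exit 0;
    }
    return $pid;        # in the parent
}

# Start one child per code reference; whenever a child exits, start a
# replacement running the same code, until $max_restarts replacements
# have been started. Returns the number of restarts performed.
sub supervise {
    my ($max_restarts, @workers) = @_;
    my %running;                        # pid => code reference
    $running{ spawn($_) } = $_ for @workers;

    my $restarts = 0;
    while (%running) {
        my $pid = waitpid(-1, 0);
        last if $pid == -1;             # no children left
        my $code = delete $running{$pid} or next;
        if ($restarts < $max_restarts) {
            $restarts++;
            $running{ spawn($code) } = $code;
        }
    }
    return $restarts;
}

my $restarts = supervise(3, sub { print "worker $$ finished\n" });
print "restarted $restarts times\n";
```

Remembering which code each pid was running is what lets the supervisor restart a dead child with the same job, for example the same subset of destinations.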

With a configuration like:

<runner>
 <instances>
  destination /queue/one
  destination /queue/two
  instances 5
 </instances>
 <instances>
  destination /queue/three
  instances 8
 </instances>
</runner>

we would start, all at once, 5 identical consumers that only receive messages from queues one and two, and 8 identical consumers that only receive messages from queue three.
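The `<runner>` configuration can be expanded into one entry per child process. A sketch, assuming the config loader returns repeated `<instances>` blocks as an array of hashes and a repeated `destination` key as an array (Config::General behaves like this with its default settings, but that is an assumption about how the file is loaded here); `expand_instances` is a hypothetical name.

```perl
use strict;
use warnings;

# The <runner> block as a config loader might return it (assumed shape).
my $runner_config = {
    instances => [
        { destination => [ '/queue/one', '/queue/two' ], instances => 5 },
        { destination => '/queue/three', instances => 8 },
    ],
};

# Hypothetical: turn each <instances> block into N child specs, each one
# listing the destinations that child subscribes to.
sub expand_instances {
    my ($config) = @_;
    my $blocks = $config->{instances} // [];
    $blocks = [$blocks] if ref $blocks eq 'HASH';    # a single block
    my @children;
    for my $block (@$blocks) {
        my $dest = $block->{destination};
        my @destinations = ref $dest ? @$dest : ($dest);
        # a fresh array reference per child, so specs don't share storage
        push @children, [@destinations] for 1 .. ($block->{instances} // 1);
    }
    return @children;
}

my @children = expand_instances($runner_config);
printf "%d children\n", scalar @children;
```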

Testing

To test our applications, we first of all want a slightly different configuration. Of course it will refer to test databases and use a different logging setup, but we also want the application to run without talking to a real broker. We do it like this:

<Model::MessageQueue>
  base_class NAP::Messaging::Catalyst::MessageQueueAdaptor
  <args>
    <servers>
      hostname localhost
      port     61613
    </servers>
    trace_basedir t/tmp/amq_dump_dir
  </args>
  traits [ +Net::Stomp::MooseHelpers::TraceOnly ]
</Model::MessageQueue>

By applying the TraceOnly role, we make sure that Net::Stomp::Producer will not connect to a broker, but will instead dump the frames that it would have sent into the trace_basedir directory.
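Conceptually, the trace-only mode replaces “send over the network” with “write to a file”. The sketch below shows that idea with core modules only; the directory layout (one subdirectory per destination, one numbered file per frame) and the `dump_frame` name are assumptions for illustration, not the file format Net::Stomp::MooseHelpers actually uses.

```perl
use strict;
use warnings;
use File::Path qw(make_path);
use File::Spec;
use File::Temp qw(tempdir);

my $sequence = 0;

# Hypothetical trace-only "send": write the frame that would have gone to
# the broker into $basedir/<destination, slashes turned into underscores>/.
sub dump_frame {
    my ($basedir, $destination, $headers, $body) = @_;
    (my $subdir = $destination) =~ s{^/}{};
    $subdir =~ tr{/}{_};
    my $dir = File::Spec->catdir($basedir, $subdir);
    make_path($dir);

    my %all_headers = (%$headers, destination => $destination);
    my $file = File::Spec->catfile($dir, sprintf('%06d', ++$sequence));
    open my $fh, '>:raw', $file or die "Cannot write $file: $!";
    print {$fh} "SEND\n";
    print {$fh} "$_:$all_headers{$_}\n" for sort keys %all_headers;
    print {$fh} "\n", $body, "\0";
    close $fh or die "Cannot close $file: $!";
    return $file;
}

my $basedir = tempdir(CLEANUP => 1);    # stands in for t/tmp/amq_dump_dir
my $file = dump_frame($basedir, '/queue/foo', { type => 'my_response' }, '{"value":14}');
print "frame written to $file\n";
```

Because the frames end up on disk, a test can inspect exactly what would have been sent without any broker running.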

Now, we could run this application via Plack::Handler::Stomp::NoNetwork or Test::Plack::Handler::Stomp, but that is rather complicated. Instead, we use Test::NAP::Messaging:

#!perl
use NAP::policy 'test';
use Test::NAP::Messaging;

my ($tester,$app_entry_point) = Test::NAP::Messaging->new_with_app({
    app_class => 'MyApp',
    config_file => 't/lib/myapp.conf',
});

$tester->clear_destination;

my $response = $tester->request(
    $app_entry_point,
    'queue/the_actual_queue_name',
    { count => 13 },
    { type => 'my_message_type' },
);
ok($response->is_success,'message was consumed');

$tester->assert_messages({
    destination => 'queue/the_actual_destination',
    filter_header => superhashof({type => 'my_response'}),
    assert_count => 1,
    assert_body => superhashof({ value => 14 }),
},'reply was sent');

done_testing();

new_with_app loads the application, makes sure that the configuration file sets up the trace_basedir and the TraceOnly role correctly, and builds a tester instance that will use that same directory.

request builds a request from the given message, and runs it through the application, returning an HTTP::Response with whatever the application returned (we are usually only interested in ->is_success).

assert_messages checks the messages dumped for the specified destination, making sure that all those that pass the filters also pass the assert_* checks (all comparisons are done via Test::Deep, which I find strikes a good balance between complexity and flexibility).
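The filter-then-assert logic can be approximated in a few lines. In this sketch `subset_ok` is a crude, hypothetical stand-in for Test::Deep’s `superhashof` (it only string-compares top-level values), `check_messages` is a hypothetical name, and the messages are already-parsed hashes rather than files read from the trace directory.

```perl
use strict;
use warnings;

# Crude stand-in for Test::Deep's superhashof: true if every key in $want
# is present in $got with an equal (string-compared) value.
sub subset_ok {
    my ($got, $want) = @_;
    for my $key (keys %$want) {
        return 0 unless defined $got->{$key} && $got->{$key} eq $want->{$key};
    }
    return 1;
}

# Hypothetical: select messages for a destination whose headers match the
# filter, then check their count and that each selected body matches.
sub check_messages {
    my ($messages, %args) = @_;
    my @selected = grep {
        $_->{destination} eq $args{destination}
            && subset_ok($_->{headers}, $args{filter_header} // {})
    } @$messages;
    return 0 if defined $args{assert_count} && @selected != $args{assert_count};
    return 0 if grep { !subset_ok($_->{body}, $args{assert_body} // {}) } @selected;
    return 1;
}

# Two dumped messages; only the first one matches the header filter.
my @dumped = (
    { destination => 'queue/the_actual_destination',
      headers => { type => 'my_response' }, body => { value => 14 } },
    { destination => 'queue/the_actual_destination',
      headers => { type => 'other' }, body => { value => 1 } },
);
print check_messages(\@dumped,
    destination   => 'queue/the_actual_destination',
    filter_header => { type => 'my_response' },
    assert_count  => 1,
    assert_body   => { value => 14 },
) ? "ok\n" : "not ok\n";
```

Filtering before asserting is what makes the test robust: unrelated messages sent to the same destination do not affect the count or the body checks.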

Of course, we could also test the ::Consumer:: components directly, by instantiating them on a mock Catalyst and calling their methods. Some of our applications will quite probably do that, but we didn’t add much support for it to NAP::Messaging because most of that logic is application-specific.
