Asynchronous web services with PSGI and IO::Async

Some of our internal web applications are low-traffic enough that a
preforking server can handle them without difficulty. Some others, on
the other hand, can really benefit from a non-blocking implementation,
so that each server process can serve multiple requests “at the same
time”, by working on a request while waiting for other services to
return data for other requests.

Up to now, all our non-blocking web services were written for the JVM,
usually with the Akka framework, but I wanted to see how hard it would
be to build something similar in Perl.

Turns out, it’s not very hard at all, if you use the right libraries
and pay attention to a few tricks.

Async libraries in Perl

There are at least three well-known libraries for asynchronous /
non-blocking programming in Perl:

  • POE
  • AnyEvent
  • IO::Async

POE has been around essentially forever and is as stable as anything
can be, but its age means that it has a few idiosyncrasies that make
developing with it less pleasant than it could be.

AnyEvent aims to be “the DBI of event loop
programming”, providing a uniform interface on top of various other
event loops. It’s a good idea, but some of its aspects make me wary of
using it in production (no, I’m not going into further details, I
don’t want to start a flamewar).

Fortunately, I really like IO::Async: it fits with the
way I think, and its integration with the Future module
avoids the “callback hell” of other systems.
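
To give an idea of what that looks like, here is a minimal sketch that uses only the Future module. The fetch_user and fetch_orders subroutines are hypothetical stand-ins for non-blocking calls; they return futures that are already complete, so the example runs without an event loop.

```perl
use strict;
use warnings;
use Future;

# Hypothetical stand-ins for two non-blocking lookups; each returns a
# Future that is already complete, to keep the sketch self-contained.
sub fetch_user {
  my ($id) = @_;
  return Future->done({ id => $id, name => 'alice' });
}
sub fetch_orders {
  my ($user) = @_;
  return Future->done($user, [ 'order-1', 'order-2' ]);
}

# Each step returns a Future, so the steps chain instead of nesting.
my $summary_f = fetch_user(42)
  ->then(sub { my ($user) = @_; return fetch_orders($user) })
  ->then(sub {
    my ($user, $orders) = @_;
    return Future->done(
      sprintf '%s has %d orders', $user->{name}, scalar @$orders
    );
  });

# The futures above are already complete, so get() returns at once.
my $summary = $summary_f->get;
print "$summary\n";
```

With real asynchronous calls the chain would read the same; only the moment at which each callback runs would change.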

Build-your-own web framework, in three simple modules

For many web applications, we use Catalyst. It’s very
powerful, very flexible, and very well supported. But writing async
applications with it is not as straightforward as I’d like. So I
looked to see if I could find something simpler. The basic features of
Catalyst that we use are:

  • a way to load modules and combine them into an application
  • helpers to wrap model and view classes
  • a dispatcher to map requests to subroutines
  • a PSGI interface

The first two features can be replaced with Bread::Board, a
dependency injection (or inversion of control) container that makes it
easy to load classes and connect them together, almost always without
the need for special wrappers.
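
As a rough illustration of the idea (not our actual container, which adds a wrapper on top), a plain Bread::Board setup might look like the sketch below. The Example::Model::Client class, the service names and the URL are all made up for the example.

```perl
use strict;
use warnings;
use Bread::Board;

# Hypothetical model class, defined inline to keep the sketch in one file.
package Example::Model::Client {
  sub new      { my ($class, %args) = @_; return bless {%args}, $class }
  sub base_url { $_[0]{base_url} }
}

my $container = container 'Example' => as {
  container 'Model' => as {
    # A literal service: a plain configuration value.
    service 'base_url' => 'http://service1';

    # A block service: built on demand from its resolved dependencies.
    service 'Client' => (
      block => sub {
        my ($s) = @_;
        return Example::Model::Client->new(
          base_url => $s->param('base_url'),
        );
      },
      dependencies => ['base_url'],
    );
  };
};

# Ask the container for a fully wired object by its path.
my $client = $container->resolve(service => 'Model/Client');
print $client->base_url, "\n";
```

The model class knows nothing about the container: it just receives its constructor arguments, which is why special wrappers are rarely needed.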

For the dispatcher, I used Router::Simple because it is
as simple as the name implies.
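
A short sketch of how Router::Simple is typically used, with hypothetical routes and controller names (our real router wraps it and returns futures, which is not shown here):

```perl
use strict;
use warnings;
use Router::Simple;

my $router = Router::Simple->new;

# Each route maps a path pattern to a hash describing the target.
# The controller and action names here are hypothetical.
$router->connect('/foo',      { controller => 'Foo', action => 'foo' });
$router->connect('/bar/{id}', { controller => 'Bar', action => 'show' });

# match() accepts a PSGI environment and returns the route's hash,
# with captured placeholders merged in, or undef when nothing matches.
my $match = $router->match({
  PATH_INFO      => '/bar/42',
  REQUEST_METHOD => 'GET',
});
printf "%s->%s(id=%s)\n", @{$match}{qw(controller action id)};

my $miss = $router->match({
  PATH_INFO      => '/nope',
  REQUEST_METHOD => 'GET',
});
print "no route\n" unless $miss;
```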

The PSGI interface is something I had to write myself, because I
wanted something non-obvious: the application should hook into the
server’s event loop, if it’s using IO::Async, but it
should also run as a normal synchronous application if the server does
not use that library. All without requiring any change to the actual
application code.

What do my controllers look like?

Let’s see an example:

package Example::Controller::Foo;
use NAP::policy 'class';
use NAP::IoC::Component;
use NAP::AsyncAPI::ActionHelpers;
use Future;

has client => (
  is => 'ro',
  required => 1,
  traits => [Wiring],
  wire_to => '/Model/Client',
);

sub foo {
  return Future->$Raw_Request(@_)->then(sub {
    my ($self,$req,$args) = @_;

    return Future->needs_all(
      $self->client->json_request("http://service1/get_something"),
      $self->client->json_request("http://service2/get_other"),
    )->then(sub {
      my ($s1_status,$s1_data,
          $s2_status,$s2_data) = @_;

      return Future->wrap({
        status  => 'ok',
        from_s1 => $s1_data,
        from_s2 => $s2_data,
      });
    })->else(sub {
      return Future->wrap({ status => 'error' });
    })
  })->$JSON_Response;
}

NAP::IoC::Component provides the Wiring
attribute trait, which tells our container (a simple wrapper around
Bread::Board) that some attributes should be
automatically resolved as dependencies. In this case, the
client attribute will be wired to a model that provides
asynchronous HTTP requests and parses the response as JSON.

NAP::AsyncAPI::ActionHelpers provides some coderefs to be
used as methods on Future objects (there may be a better
way to lexically inject methods into a class, but I haven’t found
it). We see two helpers here. $Raw_Request normalises
the parameters passed to the controller action subroutine and wraps
them in a Future. $JSON_Response builds a
PSGI response from a data structure, serialising it as JSON.

Essentially, that foo does the following:

  • starts two asynchronous HTTP requests
  • “waits” for both of them to complete successfully
  • builds a data structure from the responses
  • returns it as a JSON-encoded response

The ->else call shows some rudimentary error handling: a more
complete version is not much more complicated.
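
As a sketch of what a slightly more complete version could look like, the example below uses the fact that a failed Future carries a message, optionally followed by a category and further details. The failing_request subroutine, the 'http' category and the 504 code are assumptions made up for the illustration; they are not what our client model actually returns.

```perl
use strict;
use warnings;
use Future;

# Hypothetical stand-in for a failing service call. Future failures carry
# a message, optionally followed by a category and further details.
sub failing_request {
  return Future->fail('service1 timed out', http => 504);
}

my $f = Future->needs_all(
  Future->done(200, { answer => 42 }),
  failing_request(),
)->then(sub {
  my ($s1_status, $s1_data, $s2_status, $s2_data) = @_;
  return Future->done({
    status  => 'ok',
    from_s1 => $s1_data,
    from_s2 => $s2_data,
  });
})->else(sub {
  # needs_all fails as soon as one component fails; the failure
  # arguments of that component are passed to this callback.
  my ($message, $category, @details) = @_;
  return Future->done({
    status => 'error',
    reason => $message,
    ($category && $category eq 'http'
      ? (upstream_code => $details[0])
      : ()),
  });
});

my $result = $f->get;
print "$result->{status}: $result->{reason}\n";
```

Because the ->else callback itself returns a successful Future, whatever comes next in the chain still receives a data structure to serialise.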

Making it work

We can load the classes that make up our application, we can call an
action, and we get back a Future that will yield a PSGI
response. How do we actually plug this into a PSGI server? The simple
version of the PSGI entry point looks like this:

my $container = NAP::AsyncAPI::Container->new({
  config_file => $the_config_file,
  loop => IO::Async::Loop->new,
});
my $router = $container->fetch('Router')->get;

sub { # this is the PSGI application coderef
  my ($env) = @_;

  my $given_loop = $env->{'io.async.loop'};
  die "I only run under IO::Async" unless $given_loop;

  my $f = $router->handle_request($env);

  return sub {
    my ($responder) = @_;
    $f->on_done(sub{
      $responder->(@_);
    });
  };
}

In the PSGI coderef, we call our router (which wraps
Router::Simple) to get the Future that will
yield the response, then return a PSGI response in the form of a
coderef: it will be called (by the PSGI server) with a callback to
which we should pass the actual response, and that’s exactly what we
do: when the future is done, we pass its value to the callback.
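
The coderef form of a PSGI response can be tried out without any server or event loop. In this sketch the application calls the responder at once, and a few lines of code play the part of the server; both are simplified stand-ins for illustration only.

```perl
use strict;
use warnings;

# A PSGI application using the delayed-response form: instead of the
# usual [status, headers, body] triple, it returns a coderef.
my $app = sub {
  my ($env) = @_;
  return sub {
    my ($responder) = @_;
    # A real application would call $responder later, from an event
    # loop callback; here it is called at once to keep the sketch short.
    $responder->([
      200,
      [ 'Content-Type' => 'text/plain' ],
      [ "hello $env->{PATH_INFO}\n" ],
    ]);
  };
};

# Toy stand-in for the server side of the protocol (not a real server).
my $captured;
my $result = $app->({ PATH_INFO => '/foo', REQUEST_METHOD => 'GET' });
if (ref $result eq 'CODE') {
  $result->(sub { $captured = shift });  # the server supplies the responder
}
else {
  $captured = $result;                   # plain synchronous response
}
print "status $captured->[0], body $captured->[2][0]";
```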

And that’s enough to make the whole thing work! Note that, since
IO::Async::Loop->new returns a singleton, our container
will run in the same event loop as the server.

If we want to run under servers that don’t use IO::Async,
we need a small modification:

my $loop = IO::Async::Loop->new;
my $container = NAP::AsyncAPI::Container->new({
  config_file => $the_config_file,
  loop => $loop,
});
my $router = $container->fetch('Router')->get;

sub { # this is the PSGI application coderef
  my ($env) = @_;

  my $given_loop = $env->{'io.async.loop'};

  my $f = $router->handle_request($env);

  return sub {
    my ($responder) = @_;
    $f->on_done(sub{
      $responder->(@_);
      if (not $given_loop) {
        $loop->stop;
      }
    });
    if (not $given_loop and not $f->is_done) {
      $loop->run;
    }
  };
}

If we do not receive a loop in the PSGI environment, we start our own
loop when our response coderef is invoked (so that our processing
actually starts), and stop it when the future is done (so that we
don’t just wait forever inside the loop). The additional test
and not $f->is_done makes sure we don’t start the loop if
the future is already done, because in that case the
->on_done coderef has already been invoked, and there’s
nothing else to do.
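
One thing to keep in mind is that ->on_done only fires on success. The following standalone sketch shows a possible variation that uses ->on_ready instead, so that the loop is stopped even when the future fails. The delayed future stands in for $router->handle_request($env), and the fallback 500 response is an assumption of mine, not part of the code above.

```perl
use strict;
use warnings;
use Future;
use IO::Async::Loop;

my $loop = IO::Async::Loop->new;

# Stand-in for $router->handle_request($env): a Future that completes
# a little later, driven by the loop.
my $f = $loop->delay_future(after => 0.05)
  ->then(sub { Future->done([ 200, [], ['ok'] ]) });

my $response;
# on_ready fires on success and on failure, so the loop is always stopped.
$f->on_ready(sub {
  my ($ready_f) = @_;
  $response = $ready_f->is_done
    ? $ready_f->get
    : [ 500, [ 'Content-Type' => 'text/plain' ], ['internal error'] ];
  $loop->stop;
});

# Only enter the loop if there is still something to wait for.
$loop->run unless $f->is_ready;

print "status $response->[0]\n";
```

If every action already ends with an ->else that turns failures into a normal response, as in the controller shown earlier, the simpler ->on_done version behaves the same way.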
