Posts Tagged ‘Tatsumaki’

presque, a Redis / Tatsumaki based message queue

Wednesday, April 14th, 2010

presque is a small message queue service built on top of Redis and Tatsumaki. Its features are heavily inspired by RestMQ, and its name by resque.

  • Communications are done in JSON over HTTP
  • Queues and messages are organized as REST resources
  • A worker can be written in any language that can make an HTTP request and read JSON
  • Thanks to redis, the queues are persistent

Overview

presque needs a configuration file, written in YAML, that contains the host and port of the Redis server:

redis:
  host: 127.0.0.1
  port: 6379

Let’s start the server:

$ plackup app.psgi --port 5000

The application provides several HTTP routes:

  • /: a basic HTML page with some information about the queues
  • /q/: REST API to get and post jobs to a queue
  • /j/: REST API to get some information about a queue
  • /control/: REST API to control a queue (start or stop consumers)
  • /stats/: REST API to fetch some stats (displayed on the index page)

Queues are created on the fly, when a job for an unknown queue is inserted. When a new job is created, the JSON sent in the POST is stored “as is”. There is no restriction on the schema or the content of the JSON.

Creating a new job is as simple as:

curl -X POST "http://localhost:5000/q/foo" -d '{"foo":"bar", "foo2":"bar" }'

and fetching the job:

curl "http://localhost:5000/q/foo"

When a job is fetched, it’s removed from the queue.
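To make these semantics concrete, here is a small in-memory model of them. It is only a sketch: presque stores its queues in Redis, whereas here a Perl hash of arrays stands in for Redis so the code runs without a server. The first-in, first-out order and the rejection of invalid JSON are assumptions, and post_job and fetch_job are hypothetical names, not part of presque.

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical in-memory model of the queue behaviour described above.
# A hash of arrays stands in for Redis: one array per queue name.
my %queues;

# POST /q/<name>: the queue is created on the fly and the body is kept as is.
sub post_job {
    my ($queue, $json_body) = @_;
    decode_json($json_body);    # assumption: dies on invalid JSON; the original text is what gets stored
    push @{ $queues{$queue} }, $json_body;
    return;
}

# GET /q/<name>: return the oldest job (assumed FIFO) and remove it from the queue.
sub fetch_job {
    my ($queue) = @_;
    return undef unless $queues{$queue} && @{ $queues{$queue} };
    return shift @{ $queues{$queue} };
}

post_job('foo', '{"foo":"bar", "foo2":"bar" }');
my $job = fetch_job('foo');
print "$job\n";    # the body exactly as it was posted
print defined(fetch_job('foo')) ? "still there\n" : "queue is empty\n";
```

Auto-creating the queue on the first push mirrors the "created on the fly" behaviour: nothing has to be declared before a job is posted.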

A basic worker

I’ve also uploaded presque::worker to github. It’s based on AnyEvent::HTTP and Moose. Let’s write a basic worker using this class:

use strict;
use warnings;
use 5.012; # w00t
 
package simple::worker;
use Moose;
extends 'presque::worker';
 
sub work {
    my ($self, $job) = @_;
    say "job's done";
    # process $job here
    return;
}
 
package main;
use AnyEvent;
 
my $worker = simple::worker->new(base_uri => 'http://localhost:5000', queue => 'foo');
 
AnyEvent->condvar->recv;

A worker has to extend the presque::worker class and implement the work method. When the object is created, the class checks that this method is available. You can also provide a fail method, which will be called when an error occurs.
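The contract described above can be illustrated in plain Perl. The following sketch is not presque::worker's implementation: Sketch::Worker, Sketch::Echo, Sketch::Broken and process are hypothetical names, and passing the job and the error to fail is an assumption about its arguments. It only shows the idea of refusing to build a worker that has no work method, and of routing errors to fail.

```perl
use strict;
use warnings;

# Hypothetical base class (not presque::worker): checks for work() at
# construction time and hands errors to fail() when the subclass has one.
package Sketch::Worker {
    use Carp qw(croak);

    sub new {
        my ($class, %args) = @_;
        croak "$class must implement a 'work' method" unless $class->can('work');
        return bless {%args}, $class;
    }

    # Run one job; if work() dies, pass the job and the error to fail().
    sub process {
        my ($self, $job) = @_;
        my $ok = eval { $self->work($job); 1 };
        if (!$ok) {
            my $err = $@ || 'unknown error';
            die $err unless $self->can('fail');
            $self->fail($job, $err);
        }
        return $ok ? 1 : 0;
    }
}

package Sketch::Echo {
    use parent -norequire, 'Sketch::Worker';
    sub work { my ($self, $job) = @_; die "empty job\n" unless %$job; return }
    sub fail { my ($self, $job, $err) = @_; push @{ $self->{errors} }, $err }
}

package Sketch::Broken {
    use parent -norequire, 'Sketch::Worker';    # no work method
}

my $worker = Sketch::Echo->new(queue => 'foo');
$worker->process({ foo => 'bar' });    # succeeds
$worker->process({});                  # work dies, fail() records the error
print scalar @{ $worker->{errors} }, " error(s)\n";

my $refusal = eval { Sketch::Broken->new(queue => 'foo'); 1 } ? '' : $@;
print $refusal ? "refused: $refusal" : "accepted\n";
```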

The future

I plan to add support for WebSocket, and probably XMPP. I also want to add more features to the worker: logging, forking, handling several queues, … I would also like to add priorities to queues, and maybe scheduling jobs for a given date (not sure if it’s feasible with Redis).

More fun with Tatsumaki and Plack

Saturday, April 3rd, 2010

Lately I’ve been toying a lot with Plack and two Perl web frameworks: Tatsumaki and Dancer. I use them for different purposes, as their features complement each other.

Plack

If you don’t already know what Plack is, you may want to take a look at the Plack documentation and resources first.

As sukria is planning to talk about Dancer at FPW 2010, I will probably give a talk about Plack.

After reading some code, I started writing two middlewares: the first one adds an ETag header to the HTTP response, and the second one provides a way to limit access to your application.

Plack::Middleware::ETag

This middleware is really simple: for each request, an ETag header is added to the response. The ETag value is a SHA-1 of the response’s content. If the content is a file, it works like Apache, using information about the file: inode, modification time and size. This middleware can be combined with Plack::Middleware::ConditionalGET: the client receives the ETag for the page, and on its next request it sends it back in an If-None-Match header. If the ETag is unchanged, a 304 response is sent, meaning the content has not been modified. This module is available on CPAN.

Let’s see how it works. First, we create a really simple application (we call it app.psgi):

#!/usr/bin/env perl
use strict;
use warnings;
use Plack::Builder;
 
builder {
    enable "Plack::Middleware::ConditionalGET";
    enable "Plack::Middleware::ETag";
    sub {
        [ '200', [ 'Content-Type' => 'text/html' ], ['Hello world'] ];
    };
};

Now we can test it:

> plackup app.psgi&
 
> curl -D - http://localhost:5000
HTTP/1.0 200 OK
Date: Sat, 03 Apr 2010 09:31:43 GMT
Server: HTTP::Server::PSGI
Content-Type: text/html
ETag: 7b502c3a1f48c8609ae212cdfb639dee39673f5e
Content-Length: 11
 
> curl -H "If-None-Match: 7b502c3a1f48c8609ae212cdfb639dee39673f5e" -D - http://localhost:5000
HTTP/1.0 304 Not Modified
Date: Sat, 03 Apr 2010 09:31:45 GMT
Server: HTTP::Server::PSGI
ETag: 7b502c3a1f48c8609ae212cdfb639dee39673f5e
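To see what the two middlewares do with these requests, here is a stripped-down model of them in plain Perl, with no Plack dependency: a PSGI application is just a code reference that takes an environment hash and returns [status, headers, body], so a middleware can be a function that wraps one. This is a sketch, not the code of Plack::Middleware::ETag or Plack::Middleware::ConditionalGET: wrap_etag and wrap_conditional_get are hypothetical names, only array bodies are hashed (the real ETag middleware handles files with inode, modification time and size), and the real ConditionalGET also looks at the request method and If-Modified-Since.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1_hex);

# Hypothetical ETag wrapper: adds a SHA-1 of the body to 200 responses
# whose body is an array ref (streaming and file bodies are ignored here).
sub wrap_etag {
    my ($app) = @_;
    return sub {
        my $res = $app->(shift);
        my ($status, $headers, $body) = @$res;
        if ($status == 200 && ref $body eq 'ARRAY') {
            push @$headers, ETag => sha1_hex(join '', @$body);
        }
        return $res;
    };
}

# Hypothetical conditional GET wrapper: answers 304 when the client's
# If-None-Match matches the ETag computed by the inner layer.
sub wrap_conditional_get {
    my ($app) = @_;
    return sub {
        my $env = shift;
        my $res = $app->($env);
        my %h = @{ $res->[1] };    # sketch: assumes no repeated header names
        my $client_tag = $env->{HTTP_IF_NONE_MATCH};
        if (defined $client_tag && defined $h{ETag} && $client_tag eq $h{ETag}) {
            return [ 304, [ ETag => $h{ETag} ], [] ];    # drop the body, keep the ETag
        }
        return $res;
    };
}

my $hello = sub { [ 200, [ 'Content-Type' => 'text/html' ], ['Hello world'] ] };

# Same order as the builder block: the first enabled middleware is the outer layer.
my $app = wrap_conditional_get(wrap_etag($hello));

my $first = $app->({ REQUEST_METHOD => 'GET' });
my %first_headers = @{ $first->[1] };
my $second = $app->({ REQUEST_METHOD => 'GET', HTTP_IF_NONE_MATCH => $first_headers{ETag} });
print "$first->[0] $first_headers{ETag}\n";
print "$second->[0]\n";
```

The order matters: ConditionalGET can only compare tags if the ETag header has already been added by the layer it wraps.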

Plack::Middleware::Throttle

With this middleware, you can control how often a client may access your application. This module is not yet on CPAN, as I want to add some features first, but you can get the code on github. There are four ways to control access:

  • Plack::Middleware::Throttle::Hourly: how many requests a client can make in one hour
  • P::M::T::Daily: the same, but for a day
  • P::M::T::Interval: how long the client must wait between two requests
  • a combination of the three previous methods

To store session information, you can use any cache backend that provides get, set and incr methods. By default, if no backend is provided, the information is stored in a hash. You can easily modify the default throttling strategies by subclassing any of these classes.
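The counting part of an hourly limit can be sketched with any object that has get, set and incr methods. Below, Sketch::HashBackend plays the role of the default hash store; the per-client, per-hour key format and the hit helper are assumptions made for illustration, not the middleware's actual code.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical hash-backed cache with the get/set/incr interface.
# No expiry: in this sketch old keys simply accumulate.
package Sketch::HashBackend {
    sub new  { my ($class) = @_; return bless { data => {} }, $class }
    sub get  { my ($self, $key) = @_; return $self->{data}{$key} }
    sub set  { my ($self, $key, $value) = @_; $self->{data}{$key} = $value; return $value }
    sub incr { my ($self, $key) = @_; return ++$self->{data}{$key} }
}

# One counter per client and per hour (assumed key format): a new hour
# means a new key, which is what resets the count.
sub hourly_key {
    my ($client, $epoch) = @_;
    return join '_', $client, strftime('%Y%m%d%H', gmtime $epoch);
}

# Count this request; return the count and whether it is within the limit.
sub hit {
    my ($backend, $client, $epoch, $max) = @_;
    my $count = $backend->incr(hourly_key($client, $epoch));
    return ($count, $count <= $max);
}

my $backend = Sketch::HashBackend->new;
my $now = 1270288660;    # Sat, 03 Apr 2010 09:57:40 GMT
my @results = map { [ hit($backend, '127.0.0.1', $now, 2) ] } 1 .. 3;
printf "request %d: count=%d allowed=%s\n", $_ + 1, $results[$_][0], $results[$_][1] ? 'yes' : 'no'
    for 0 .. $#results;
```

With max set to 2, the third request in the same hour is refused, which matches the transcript that follows.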

Let’s write another application to test it:

#!/usr/bin/env perl
use strict;
use warnings;
use Plack::Builder;
 
builder {
    enable "Plack::Middleware::Throttle::Hourly", max => 2;
    sub {
        [ '200', [ 'Content-Type' => 'text/html' ], ['Hello world'] ];
    };
};

Let’s run it with plackup and query it three times:

$ curl -D - http://localhost:5000/
HTTP/1.0 200 OK
Date: Sat, 03 Apr 2010 09:57:40 GMT
Server: HTTP::Server::PSGI
Content-Type: text/html
X-RateLimit-Limit: 2
X-RateLimit-Remaining: 1
X-RateLimit-Reset: 140
Content-Length: 11
 
Hello world
 
$ curl -D - http://localhost:5000/
HTTP/1.0 200 OK
Date: Sat, 03 Apr 2010 09:57:40 GMT
Server: HTTP::Server::PSGI
Content-Type: text/html
X-RateLimit-Limit: 2
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 140
Content-Length: 11
 
Hello world
 
$ curl -D - http://localhost:5000/
HTTP/1.0 503 Service Unavailable
Date: Sat, 03 Apr 2010 09:57:41 GMT
Server: HTTP::Server::PSGI
Content-Type: text/plain
X-RateLimit-Reset: 139
Content-Length: 15
 
Over rate limit

The middleware adds some HTTP headers to the response:

  • X-RateLimit-Limit: how many requests are allowed in the current period
  • X-RateLimit-Remaining: how many requests remain in the current period
  • X-RateLimit-Reset: the number of seconds until the counter is reset
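The header values in the transcript can be reproduced with a little arithmetic. For an hourly limit, X-RateLimit-Reset looks like the number of seconds left until the next full hour: 09:57:40 is 3460 seconds into the hour, and 3600 - 3460 = 140. The sketch below assumes that definition, takes the request count as an input, and uses a hypothetical rate_limit_response helper; it is not the middleware's source.

```perl
use strict;
use warnings;

# Hypothetical helper: decorate a PSGI response with X-RateLimit-* headers,
# or replace it with a 503 once $count exceeds $max.
sub rate_limit_response {
    my ($app_response, $count, $max, $epoch) = @_;
    my $reset = 3600 - $epoch % 3600;    # seconds until the next full hour
    if ($count > $max) {
        return [ 503, [ 'Content-Type' => 'text/plain', 'X-RateLimit-Reset' => $reset ],
                 ['Over rate limit'] ];
    }
    my ($status, $headers, $body) = @$app_response;
    return [ $status,
             [ @$headers,
               'X-RateLimit-Limit'     => $max,
               'X-RateLimit-Remaining' => $max - $count,
               'X-RateLimit-Reset'     => $reset ],
             $body ];
}

my $ok = [ 200, [ 'Content-Type' => 'text/html' ], ['Hello world'] ];
my $t  = 1270288660;    # Sat, 03 Apr 2010 09:57:40 GMT

my @responses = (
    rate_limit_response($ok, 1, 2, $t),
    rate_limit_response($ok, 2, 2, $t),
    rate_limit_response($ok, 3, 2, $t + 1),
);
for my $res (@responses) {
    my %h = @{ $res->[1] };
    printf "%d remaining=%s reset=%d\n",
        $res->[0], $h{'X-RateLimit-Remaining'} // '-', $h{'X-RateLimit-Reset'};
}
```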

This middleware could be a very good companion to the REST support recently added to Dancer.

Another Tatsumaki application with Plack middlewares

To demonstrate these two middlewares, I wrote a small application with Tatsumaki. This application fetches a page, parses it to find all the feeds it declares, and returns the result as JSON.

GET http://feeddiscover.tirnan0g.org/?url=http://lumberjaph.net/blog/

will return

[{"href":"http://lumberjaph.net/blog/index.php/feed/","type":"application/rss+xml","title":"i’m a lumberjaph RSS Feed"}]

This application is composed of a single handler, which only handles GET requests. It fetches the URL given in the url parameter, scrapes the content to find the links to feeds, and caches the result with Redis. The response is a JSON string with the information.

The interesting part is the app.psgi file:

 
use strict;
use warnings;
use Plack::Builder;
use Redis;
use Tatsumaki::Application;
use FeedDiscovery::Handler;

my $app = Tatsumaki::Application->new( [ '/' => 'FeedDiscovery::Handler' ], );

builder {
    enable "Plack::Middleware::ConditionalGET";
    enable "Plack::Middleware::ETag";
    enable "Plack::Middleware::Throttle::Hourly",
        backend => Redis->new( server => '127.0.0.1:6379' ),
        max     => 100;
    $app;
};

The application itself is really simple: for a given URL, Tatsumaki::HTTPClient fetches the page and Web::Scraper extracts the link rel="alternate" elements. If something is found, it is stored in Redis, and a JSON string is returned to the client.
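The discovery step can be approximated without any CPAN module. The sketch below swaps Web::Scraper for plain regular expressions, which is fragile on real-world HTML (it only understands double-quoted attributes), and it assumes that only the RSS and Atom MIME types count as feeds. discover_feeds is a hypothetical name, and neither the HTTP fetch nor the Redis cache is shown.

```perl
use strict;
use warnings;
use JSON::PP;

# Assumption: only these MIME types are treated as feeds.
my %FEED_TYPES = map { $_ => 1 } qw(application/rss+xml application/atom+xml);

# Regex-based stand-in for the Web::Scraper step: collect href/type/title
# of every <link rel="alternate"> whose type is a feed type.
sub discover_feeds {
    my ($html) = @_;
    my @feeds;
    while ($html =~ /<link\b([^>]*)>/gi) {
        my $tag = $1;
        my %attr;
        while ($tag =~ /(\w+)\s*=\s*"([^"]*)"/g) { $attr{ lc $1 } = $2 }
        next unless ($attr{rel} // '') =~ /\balternate\b/i;
        next unless $FEED_TYPES{ lc($attr{type} // '') };
        push @feeds, +{ map { ($_ => $attr{$_}) } grep { defined $attr{$_} } qw(href type title) };
    }
    return \@feeds;
}

my $html = <<'HTML';
<head>
  <link rel="stylesheet" type="text/css" href="/style.css">
  <link rel="alternate" type="application/rss+xml" title="RSS Feed"
        href="http://lumberjaph.net/blog/index.php/feed/">
</head>
HTML

print JSON::PP->new->canonical->encode(discover_feeds($html)), "\n";
```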

Tatsumaki, or how to write a nice webapp in less than two hours

Monday, December 21st, 2009

Until today, I had a script named “lifestream.pl”. This script was triggered by cron once every hour to fetch various feeds from services I use (like github, identi.ca, …), process the result through a template and dump it into an HTML file.

Today I was reading Tatsumaki’s code and some examples (Social and Subfeedr). Tatsumaki is a “port” of Tornado (a non-blocking web server written in Python), based on Plack and AnyEvent. I thought that using it to replace my old lifestream script would be a good way to test it. Two hours later I had a complete, working webapp (and the code is available here).

The code is straightforward: first, I define a handler for my HTTP requests. As I have only one thing to do (display entries), the handler is really simple:

package Lifestream::Handler;   
use Moose;                     
extends 'Tatsumaki::Handler';  
 
sub get {                      
    my $self = shift;          
    my %params = %{$self->request->params};
    $self->render( 'lifestream.html', {
        memes    => $self->application->memes($params{page}),
        services => $self->application->services
    });
}
1;

For every GET request, two methods are called: memes and services. memes returns the list of memes to display on the page (for the requested page parameter), and services returns the list of the services I use, displayed in a sidebar.

Now, as I no longer want my lifestream.pl script in cron, I let Tatsumaki do the polling. For this, I add a service to my app, which is just a worker.

package Lifestream::Worker;    
use Moose;                     
extends 'Tatsumaki::Service';  
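use AnyEvent;
use Tatsumaki::HTTPClient;
...
sub start {
    my $self = shift;
    # fire right away (after 0 seconds), then every 1800 seconds
    my $t; $t = AE::timer 0, 1800, sub {
        scalar $t;    # reference the timer inside its own callback so it stays alive
        $self->fetch_feeds;
    };
}
....
sub fetch_feeds {
    my ($self, $url) = @_;
    Tatsumaki::HTTPClient->new->get( $url, sub {
        # do the fetch and parsing stuff
    });
}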

From now on, the feeds are checked every 30 minutes (the timer fires every 1800 seconds). Tatsumaki::HTTPClient is an HTTP client based on AnyEvent::HTTP.

Let’s write the app now:

package Lifestream;            
 
use Moose;
extends "Tatsumaki::Application";
 
use Lifestream::Handler;       
use Lifestream::Worker;        
...
sub app {
    my ( $class, %args ) = @_;
    my $self = $class->new( [ '/' => 'Lifestream::Handler', ] );
    $self->config( $args{config} ); 
    $self->add_service( Lifestream::Worker->new( config => $self->config ) );
    $self;
}
...
sub memes {
...
}
 
sub services {
....
}

The memes and services methods called from the handler are defined here. In the app method, I “attach” the “/” path to the handler, and I add the service.

And to launch the app:

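use YAML qw(LoadFile);
use Lifestream;

my $config = shift @ARGV;    # path to the YAML configuration file
my $app = Lifestream->app( config => LoadFile($config) );
require Tatsumaki::Server;
Tatsumaki::Server->new(
    port => 9999,
    host => 0,
)->run($app);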

And that’s it: I now have a nice webapp in only about 200 lines of code. I will keep playing with Tatsumaki, as I have more ideas (and probably with Subfeedr too). Thanks to miyagawa for all this code.