Franck Cuny @franckcuny

Software and Operation engineer.

A simple feed aggregator with modern Perl - part 2

I've chosen to write about a feed aggregator because it's one of the things I'm working on at RTGI (along with web crawlers, gluing data to search engines, etc.).

For the feed aggregator, I will use Moose, KiokuDB and our DBIx::Class schema. Before we get started, I'd like to give a short introduction to Moose and KiokuDB.

Moose is "a postmodern object system for Perl 5". Moose brings some really nice concepts to OO Perl, such as roles, a better syntax, and a "free" constructor and destructor. If you don't already know Moose, see the Moose::Manual documentation for more information.

KiokuDB is a Moose based frontend to various data stores [...] Its purpose is to provide persistence for "regular" objects with as little effort as possible, without sacrificing control over how persistence is actually done, especially for harder to serialize objects. [...] KiokuDB is meant to solve two related persistence problems:

  • Store arbitrary objects without changing their class definitions or worrying about schema details, and without needing to conform to the limitations of a relational model.
  • Persisting arbitrary objects in a way that is compatible with existing data/code (for example interoperating with another app using CouchDB with JSPON semantics).

I will store each feed entry in KiokuDB. I could have chosen to store them as plain text in JSON files, in my DBIx::Class model, etc. But since I want to show you new and modern stuff, I will store them in KiokuDB using the DBD's backend.

And now for something completely different, code!

First, we will create a base module named MyAggregator.

% module-setup MyAggregator

We will now edit lib/MyAggregator.pm and write the following code:

package MyAggregator;
use Moose;
1;

As you can see, there is no use strict; use warnings here: Moose automatically turns on these pragmas. We don't have to write the new method either, as it's provided by Moose.

For parsing feeds, we will use XML::Feed, and we will use it in a Role. If you don't know what roles are:

Roles have two primary purposes: as interfaces, and as a means of code reuse. Usually, a role encapsulates some piece of behavior or state that can be shared between classes. It is important to understand that roles are not classes. You cannot inherit from a role, and a role cannot be instantiated.

So, we will write our first role, lib/MyAggregator/Roles/Feed.pm:

package MyAggregator::Roles::Feed;
use Moose::Role;
use XML::Feed;
use feature 'say';

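sub feed_parser {
    my ($self, $content) = @_;
    my $feed = eval { XML::Feed->parse($content) };
    # parse() can die, or return undef and set errstr; handle both.
    if ($@ || !$feed) {
        my $error = $@ || XML::Feed->errstr || 'unknown error';
        say "error while parsing feed : $error";
        return;
    }
    return $feed;
}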
1;

This one is pretty simple. It takes some content, tries to parse it, and returns an XML::Feed object. If it can't parse the feed, the error is printed and the method returns undef.

Now, a second role will be used to fetch the feed, and do basic caching, lib/MyAggregator/Roles/UserAgent.pm:

package MyAggregator::Roles::UserAgent;
use Moose::Role;
use LWP::UserAgent;
use HTTP::Request;
use Cache::FileCache;
use URI;

has 'ua' => (
    is      => 'ro',
    isa     => 'Object',
    lazy    => 1,
    default => sub { LWP::UserAgent->new(agent => 'MyUberAgent'); }
);
has 'cache' => (
    is   => 'rw',
    isa  => 'Cache::FileCache',
    lazy => 1,
    default =>
        sub { Cache::FileCache->new({namespace => 'myaggregator',}); }
);

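sub fetch_feed {
    my ($self, $url) = @_;

    my $req = HTTP::Request->new(GET => URI->new($url));

    # Send the validators kept from a previous fetch, if any, so the
    # server can answer "304 Not Modified".
    my $ref = $self->cache->get($url);
    if (defined $ref) {
        $req->header('If-Modified-Since' => $ref->{LastModified})
            if $ref->{LastModified} ne '';
        $req->header('If-None-Match' => $ref->{ETag})
            if $ref->{ETag} ne '';
    }

    my $res = $self->ua->request($req);

    # Only remember the validators of a successful response, so that
    # a 304 or an error does not overwrite them with empty values.
    if ($res->is_success) {
        $self->cache->set(
            $url,
            {   ETag         => $res->header('ETag')          || '',
                LastModified => $res->header('Last-Modified') || ''
            },
            '5 days',
        );
    }
    return $res;
}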
1;

This role has two attributes: ua and cache. The ua attribute is our user agent. 'lazy' means that it will not be constructed until it is first needed, that is, when I call $self->ua->request.
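
To make the lazy behaviour concrete, here is a minimal sketch. The Demo class and its $built counter are made up for this illustration and are not part of the aggregator; the attribute holds a plain string instead of a real LWP::UserAgent so the example has no network dependency.

```perl
use strict;
use warnings;

{
    package Demo;    # hypothetical class, for illustration only
    use Moose;

    our $built = 0;  # counts how many times the default sub ran

    has 'ua' => (
        is      => 'ro',
        isa     => 'Str',
        lazy    => 1,
        default => sub { $built++; 'MyUberAgent' },
    );
}

my $demo = Demo->new;
print "after new:     default ran $Demo::built time(s)\n";

$demo->ua;
print "first access:  default ran $Demo::built time(s)\n";

$demo->ua;
print "second access: default ran $Demo::built time(s)\n";
```

The default sub does not run in new; it runs once on the first call to the accessor, and the value is then kept for later calls.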

I use Cache::FileCache for basic caching, so I don't fetch or parse the feed when it's unnecessary, and I use the ETag and Last-Modified headers to check the validity of my cache.

The only method of this role is fetch_feed. It requests the URL, adding an If-Modified-Since header when the cache holds a Last-Modified value for it, and returns an HTTP::Response object.
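
The idea of a conditional request can be pictured with a small pure function. This is only a sketch: conditional_headers is a hypothetical helper, not part of the role, and the cache values are made-up sample data. It maps the two validators kept in the cache onto the standard request headers If-Modified-Since and If-None-Match.

```perl
use strict;
use warnings;

# Hypothetical helper: turn a cache entry (a hashref with the ETag and
# LastModified keys, as stored by fetch_feed) into request headers.
sub conditional_headers {
    my ($cached) = @_;
    my %headers;
    return \%headers unless defined $cached;

    $headers{'If-Modified-Since'} = $cached->{LastModified}
        if defined $cached->{LastModified} && $cached->{LastModified} ne '';
    $headers{'If-None-Match'} = $cached->{ETag}
        if defined $cached->{ETag} && $cached->{ETag} ne '';
    return \%headers;
}

# First fetch: nothing is cached, so the request is unconditional.
my $first = conditional_headers(undef);
print scalar(keys %$first), " conditional header(s)\n";

# Later fetch: both validators are known (sample values).
my $later = conditional_headers(
    {   ETag         => '"abc123"',
        LastModified => 'Tue, 07 Jul 2009 10:00:00 GMT',
    }
);
print "$_: $later->{$_}\n" for sort keys %$later;
```

A server that honours these headers answers 304 Not Modified, with no body, when the feed has not changed since the previous fetch.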

Now, I create an Entry class in lib/MyAggregator/Entry.pm:

package MyAggregator::Entry;
use Moose;
use Digest::SHA qw(sha256_hex);
# A feed may omit any of these fields, so undef is accepted too.
has 'author'  => (is => 'rw', isa => 'Maybe[Str]');
has 'content' => (is => 'rw', isa => 'Maybe[Str]');
has 'title'   => (is => 'rw', isa => 'Maybe[Str]');
has 'id'      => (is => 'rw', isa => 'Str');
has 'date'    => (is => 'rw', isa => 'Maybe[Object]');
has 'permalink' => (
    is       => 'rw',
    isa      => 'Str',
    required => 1,
    trigger  => sub {
        my $self = shift;
        $self->id(sha256_hex $self->permalink);
    }
);
1;

Here the permalink attribute has a trigger: each entry has a unique ID, built from the SHA-256 digest of the permalink. So, whenever we set the permalink, the ID is set automatically.
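
The ID derivation can be tried in isolation, without Moose. In this small sketch the permalinks are made-up example URLs; the point is only that the same permalink always gives the same ID, which is what makes it usable as a key.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha256_hex);

# Made-up permalinks, for illustration only.
my $first  = sha256_hex('http://example.com/posts/1');
my $again  = sha256_hex('http://example.com/posts/1');
my $second = sha256_hex('http://example.com/posts/2');

# A SHA-256 digest in hexadecimal is always 64 characters long.
print "id length: ", length($first), "\n";
print "same permalink, same id\n"           if $first eq $again;
print "different permalink, different id\n" if $first ne $second;
```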

We can now change our MyAggregator module like this:

package MyAggregator;
use feature ':5.10';
use MyModel;
use Moose;
use MyAggregator::Entry;
use KiokuDB;
use Digest::SHA qw(sha256_hex);
with 'MyAggregator::Roles::UserAgent', 'MyAggregator::Roles::Feed';

has 'context' => (is => 'ro', isa => 'HashRef');
has 'schema' => (
    is      => 'ro',
    isa     => 'Object',
    lazy    => 1,
    default => sub { MyModel->connect($_[0]->context->{dsn}) },
);
has 'kioku' => (
    is      => 'rw',
    isa     => 'Object',
    lazy    => 1,
    default => sub {
        my $self = shift;
        KiokuDB->connect($self->context->{kioku_dir}, create => 1);
    }
);

sub run {
    my $self = shift;

    my $feeds = $self->schema->resultset('Feed')->search();
    while (my $feed = $feeds->next) {
        my $res = $self->fetch_feed($feed->url);
        if (!$res || !$res->is_success) {
            say "can't fetch " . $feed->url;
        }
        else {
            $self->dedupe_feed($res, $feed->id);
        }
    }
}

sub dedupe_feed {
    my ($self, $res, $feed_id) = @_;

    my $feed = $self->feed_parser(\$res->content);
    return if (!$feed);
    foreach my $entry ($feed->entries) {
        next
            if $self->schema->resultset('Entry')
                ->find(sha256_hex $entry->link);
        my $meme = MyAggregator::Entry->new(
            permalink => $entry->link,
            title     => $entry->title,
            author    => $entry->author,
            date      => $entry->issued,
            content   => $entry->content->body,
        );


        $self->kioku->txn_do(
            scope => 1,
            body  => sub {
                $self->kioku->insert($meme->id => $meme);
            }
        );
        $self->schema->txn_do(
            sub {
                $self->schema->resultset('Entry')->create(
                    {   entryid   => $meme->id,
                        permalink => $meme->permalink,
                        feedid    => $feed_id,
                    }
                );
            }
        );
    }
}
1;

  • the with function composes roles into a class. So my MyAggregator class has the fetch_feed and feed_parser methods, and all the attributes of our roles
  • context is a HashRef that contains the configuration
  • schema is our MyModel schema
  • kioku is a connection to our KiokuDB backend

This class has two methods: run and dedupe_feed.

The run method gets the list of feeds from the schema (via the search call). For each feed returned by the search, we try to fetch it and, if the fetch succeeds, we dedupe its entries.

To dedupe the entries, we check whether the entry's ID, the SHA-256 digest of its permalink, is already in the database (via the find call). If we already have the entry, we skip it and move on to the next one. If it's a new entry, we create a MyAggregator::Entry object with the content, date, title, and so on. We store this object in KiokuDB (the insertion happens inside a transaction) and create a new row in the MyModel database (again inside a transaction).

And to run this, a little script:

use strict;
use MyAggregator;
use YAML::Syck;
my $agg = MyAggregator->new(context => LoadFile shift);
$agg->run;

We can now run our aggregator like this:

% perl bin/aggregator.pl conf.yaml

And it's done :) We now have a really basic aggregator. If you want to improve it, a good place to start is the dedupe process, which could use the permalink, the date and/or the title, as the current one is far too basic. In the next article we will write some tests for this aggregator using Test::Class.
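
As a starting point for that improvement, here is one possible sketch of a key built from several fields. The dedupe_key function, the choice of fields and the normalization rules are all assumptions made for this example, not what the aggregator above does; the sample entry is made up.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha256_hex);
use Encode qw(encode_utf8);

# Hypothetical helper: build a dedupe key from the permalink, the title
# and the date instead of the permalink alone.
sub dedupe_key {
    my (%entry) = @_;

    my @parts;
    for my $field (qw(permalink title date)) {
        my $value = defined $entry{$field} ? $entry{$field} : '';
        $value =~ s/^\s+|\s+$//g;                  # ignore surrounding whitespace
        $value = lc $value if $field eq 'title';   # ignore case in titles only
        push @parts, $value;
    }

    # sha256_hex works on bytes, so encode the joined string first.
    my $joined = join "\n", @parts;
    return sha256_hex(encode_utf8($joined));
}

# Made-up sample entry.
my %entry = (
    permalink => 'http://example.com/posts/1',
    title     => 'Hello World',
    date      => '2009-07-07',
);

my $key  = dedupe_key(%entry);
my $same = dedupe_key(%entry, title => '  hello world ');
my $diff = dedupe_key(%entry, date  => '2009-07-08');

print "cosmetic title change: ", ($key eq $same ? "same key" : "new key"), "\n";
print "different date:        ", ($key eq $diff ? "same key" : "new key"), "\n";
```

Whether a changed date should count as a new entry or as an update of an existing one is a design decision; this sketch treats it as a new entry.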

Big thanks to tea and blob for reviewing and fixing my broken English in the first two parts.

The code is available on git server.

Parts 3 and 4 next week.