MapReduce::Lite

id:naoyaさんが書いたMapReduce::Liteのコードを読んでみた。
http://d.hatena.ne.jp/naoya/20080511/1210506301
このエントリーにあるapacheアクセスログを解析するサンプルコードを辿っていく。

MapReduce::Lite

mainパッケージから辿っていく。まずはシミュレーションの設定をしていく記述がある。

my $spec = MapReduce::Lite::Spec->new(intermidate_dir => "./tmp");

ここで、オブジェクト生成時に値を設定されているintermidate_dirは、以下のように定義されている。

has intermidate_dir => (
   is => 'rw',
   does => 'Directory',
   coerce => 1,
   required => 1,
);

この値に付いて、強制方変換の設定が「MapReduce::Lite::Types」の中に書かれている。

subtype 'Directory'
   => as 'Object'
   => where { $_->isa('Path::Class::Dir') };

これによってObject型の下位の型として新しくFileという型を作った。File型かどうかのチェック条件は「where { $_->isa('Path::Class::Dir') }」で定義される。次に

coerce 'File'
   => from 'Str'
   => via { Path::Class::File->new($_);

これにより文字列が渡されたときにPath::Class::Fileに変換するという設定をしている。
つまりintermidate_dirに文字列を渡すと、内部で文字列からPath::Class::File型に変換されてintermidate_dirに格納されるということになる。ここでintermidate_dirに設定されるパスは、Mapの結果である中間ファイルを置くディレクトリをどこにするかを指しているものになる。
次にcreate_inputメソッドで、一つの解析対象ファイルに対して「MapReduce::Lite::Spec::Input」を生成する。create_inputメソッドのコードは以下のようになっている。

sub create_input  {
   my $self = shift;
   my $in = MapReduce::Lite::Spec::Input->new(
      intermidate_dir => $self->intermidate_dir
   );
   $self->inputs->push($in);
   return $in;
}

一つの解析対象ファイルに対して「MapReduce::Lite::Spec::Input」を生成して、そのリファレンスをinputsに格納している。生成したオブジェクトのリファレンスを返して、呼び出し元(つまりmainパッケージ内)で、解析対象ファイルのパスとMapを設定している。

for (@ARGV) {
   my $in = $spec->create_input;
   $in->file($_);
   $in->mapper('Analog::Mapper');
}

fileは、intermidate_dirと同じように強制型変換が行なわれている。mapperではmapとしてAnalog::Mapperパッケージが指定されている。mapperにどのパッケージを渡すかによって、どういったmapを行なうかを指定できるようになっている。サンプルコードで指定されているAnalog::Mapperパッケージは、以下のようになっている。

package Analog::Mapper;
use Moose;
with 'MapReduce::Lite::Mapper';

sub map {
   my ($self, $key, $value) = @_;
   my @elements = split /\s+/, $value;
   if ($elements[8]) {
       $self->emit($elements[8], 1);
   }
}

Analog::Mapperは、MapReduce::Lite::Mapperを継承して、mapメソッドを定義している。Analog::Mapperのmapメソッドは、スペース区切りでくるデータを分割して、Apacheステータスコードを取り出す処理を行なっている。取り出されたステータスコードは、

<ステータスコード,1>

という形でemitされる。emitメソッドは、次のようになっている。

sub emit {
   my ($self, $key, $value) = validate_pos(@_,1,1,1);
   my  $id = $self->partitioning_function->($key, $self->num_reducers);
   $self->intermidate_buffers($id)->put($key, $value);
}

3行目で用いられているpartioning_functionには、無名関数のコードリファレンスが格納されている。

has partitioning_function => (
   is => 'rw',
   isa => 'CodeRef',
   lazy => 1,
   default => sub {
      return sub {
         my ($key, $R) = @_;
         length($key) % $R;
      }
   }
}

この関数は「シャッフル」にあたる部分で、$keyの長さをnum_reducersで割った時の余りによってグルーピングしている。partioning_functionからは、グループのidが返され、idに対応するグループに<キー,値>が格納される。格納される際にintermidate_buffersメソッドが呼び出されるが、メソッドフックがかけられている。
intermidate_buffersが呼び出される前に以下のコードが呼び出される。

before 'intermidate_buffers' => sub {
   my ($self, $R) = validate_pos(@_, 1, { type => SCALAR } );
   unless ( $self->_files->[$R]) {
      my $file = $self->intermidate_dir->file( sprintf "$%d.dat", $R );
      my $handle = $file->open('>>');
         or confess sprintf "Can't create an intermidate file: %s", $!;
      $self->files->[$R] = MapReduce::Lite::Mapper::Out->new(
         handle => $handle,
         flash_size => 1024, # FIXME
      );
   }
};

グループ$Rに対応する一時ファイルがないとき、新しく作成するように処理している。intermidate_buffersは、Pass::Class::Fileの参照を返しているので、Rubyライクに連続でputメソッドを呼び出して、<キー,値>で記録している。
 
mainパッケージに戻る。
 
Mapの設定が終わったら今度はReduceの設定が行なわれる。

$spec->out->reducer('Analog::Reducer');

reducerにどのようなパッケージを指定するかによって、どういったReduceを行なうかを設定する事ができるようになっている。サンプルコードでは、Analog::Reducerパッケージが指定されている。Analog::Reducerパッケージは以下のようになっている。

package Analog::Reducer;
use Moose;
with 'MapReduce::Lite::Reducer';

sub reduce {
   my ($self, $key, $value) = @_;
   $self->emit($key, $value->size);
}

Analog::Reducerは「MapReduce::Lite::Reducer」を継承して、reduceメソッドを定義している。Analog::Reduceのreduceは、<キー,値の長さ(1の数)>をemitしている。「MapReduce::Lite::Reducer」のemitは、<キー,値>の形で渡ってきたものを

キー => 値

の形で出力するようになっている。

sub emit {
   my ($self, $key, $value) = validate_pos(@_, 1, 1, 1);
   printf "%s => %s\n", $key, $value;
}

MapReduce::Lite::ReducerはさらにMapReduce::Lite::Role::Emitterを継承している。このパッケージは、emitをインタフェースとしている抽象クラスになっている。
mainパッケージでは、この後

$spec->out->num_tasks(3);

で、何個のReduceを並列で実行するかを指定している。
以上でMapReduceのシミュレーション設定が完了した。$specを渡して次のようにMapReduceを実行する。

MapReduce($spec);

次回、MapReduceの中身について見ていく。