MapReduce::Lite Part.2

前回

うお、リンク貼ると自動的にトラックバックを送信するようになってたのか。恐れ多い事してもうた。

mapreduce

mapreduceメソッドの中では、do_map,do_reduceが順に呼び出されている。

do_map( $spec );
do_reduce( $spec );

これらのメソッドを順に見ていく。

do_map

MapReduce::Liteでは、分散処理を実際に行なうのではなく、スレッドでシミュレーションしている。do_mapでは、まずスレッド間で共用するキューを次のようにして生成する。

my $quere = Thread::Queue::Any->new;

このキューには、ファイルの一行一行が格納されていく。実際のコードは以下のようになっている。

for my $in (@{$spec->inputs}) {
   my $iter = $in->iterator;
   while ( $iter->has_next ) {
      $queue->enqueue([ $in->file => $iter->next ]);
   }

$in->iteratorは、MapReduce::Lite::FileIteratorを生成して、そのリファレンスを返す。MapReduce::Lite::FileIteratorでは、has_nextとnextを定義している。has_nextは、ハンドルがEOFを指しているかどうかで真偽を返している。

sub has_next {
   shift->handle->eof ? 0 : 1;
}

nextは、ファイルの一行を読み込んで、改行を取り除いたものを返す。

sub next {
   my $line = shift->handle->getline;
   chomp $line;
   $line;
}

キューには、<ファイル名,一行のデータ>の形式でエンキューされていく。

$queue->enqueue([ $in->file => $iter->next ]);

ここに入ったデータが複数のスレッドに取り出されていき、処理される。
mapperには、シャッフル時に使用するReducerの数がセットされる。

$in->mapper->num_reducers( $spec->out->num_tasks );

$spec->out->num_tasksには、mainパッケージで3にセットされている。次にスレッドが起動されていく。

for (my $i = 0; $i < $spec->num_threads; $i++) {
   threads->create(map_thread => $queue, $in->mapper);
}
$_->join for threads->list;

スレッドは、$spec->num_threads個だけ起動する。このサンプルコードでは、特に設定されていないので、デフォルトの5になっている。
スレッド起動時には、エントリポイントとしてmap_threadメソッドが指定される。引数は($queue, $in->mapper)となっている。
map_threadでは、$queueからデータを取り出し、mapper(Analog::Mapper)にデータを渡している。これは各スレッドで、キューが空になるまで繰り返される。

while(my $left = $queue->pending) {
   my ($task) = $queue->dequeue;
   $mapper->map(@$task);
}

キューが空になるとそれぞれのスレッドで、$mapper->doneが呼び出されてファイルがクローズされる。
呼び出し元(do_map)では

$_->join for threads->list;

で、スレッドがすべて終了するまで待機している。mapの出力結果は「./tmp」に「R(id).dat」の形で一時的に保存される。

do_reduce

do_reduceでは、まずmapの出力ファイルを読み込む。出力ファイルの読み込みは、MapReduce::Lite::Conduitを用いて行なわれる。

my $conduit = MapReduce::Lite::Conduit->new(
   intermidate_dir => $spec->intermidate_dir,
);
$conduit->consume($r);

consumeメソッドで実際にファイル読み込みを行なっている。以下は、渡されたグループidを元に、対応しているファイルをオープンしている部分になる。

sub consume {
   my ($self, $id) = validate_pos(@_, 1, { type => SCALAR });
  
   my $file = $self->intermidate_dir->file(sprintf "R%d.dat", $id);
   my $handle = $file->open('r') or return;

オープンされたファイルのハンドルは、MapReduce::Lite::FileIteratorに渡されて、このパッケージを介してファイルの中身を読み込まれる。

my $iter = MapReduce::Lite::FileIterator->new(handle => $handle);
while ($iter->has_next) {
   $self->csv->parse( $iter->next );
   $self->put( $self->csv->fields );
}

データは、%MapReduce::Lite::FileIterator::dataに読み込まれる。読み込みが終了すると、一時ファイルは削除される。

$file->remove or die "Can't remove %s: %s", $file, $!;

do_reduceでは、ここで読み込まれたデータが、スレッド間共用のキューに格納される。

my $queue = Thread::Queue::Any->new;
my $iter = $conduit->iterator;
while( $iter->has_next ) {
   $queue->enqueue([ $iter->next ]);
}

そして、mapのときと同じようにスレッドが起動されてReduceが行なわれる。

for(my $i = 0; $i < $spec->num_threads; $i++) {
   threads->create(reduce_thread => $queue, $spec->out->reducer);
}
sub reduce_thread {
   my ($queue, $reducer) = validate_pos(@_, 1, 1);

   while (my $left = $queue->pending) {
       my ($task) = $queue->dequeue;
       $reducer->reduce(@$task);
   }
}

以上がMapReduce::Liteの流れ。

参考文献