HadoopによるAnalog2

まず最初に、下記のコードの様に、パッケージの宣言とインポートを行なう。Analogは、Sampleパッケージに所属する。(適切なパッケーイ名の付け方ってあるのかな)

package Sample;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

はじめのインポートブロック部分は、Javaの標準のクラスを読み込んでいて、次のインポートブロックでは、MapReduceに関するクラスを読み込んでいる。
Analogクラスは、org.apache.hadoop.conf.Configuredを継承し、org.apache.hadoop.util.Toolインタフェースを実装する事から始める。

public class Analog extends Configured implements Tool {
       ... code ...
}

Configuredクラスは、特定のXMLファイルから設定パラメータを読み込む役割を担っている。getconf()メソッドを呼び出すと、読み込みが完了する。このメソッドは、org.apache.hadoop.conf.Configurationのインスタンスを返す。このインスタンスは、XMLデータにあるname-valueのペアとして設定されたリソースを持っている。各リソースは、文字列またはorg.apache.hadoop.fs.Pathのインスタンスにより名付けられている。デフォルトでは、クラスパスから順に読み込まれるXMLファイルは下記の2つである。

  • hadoop-default.xml
    • このファイルは、Hadoopのデフォルト設定を含んでいる。例えば全体的な設定、ログ、I/O、ファイルシステムなどのプロパティなど。もしこれらの値を変えたいときは、Hadoop-site.xmlで設定を上書きする。
  • hadoop-site.xml
    • このファイルの設定により、値を設定できる。

リリースは、好きなだけ追加できる。追加していったものは、順に読み込まれる。Hadoop API documentationにaddResource()とaddFinalResourse()が紹介されている。addFinalResource()は、変更できないようなリソースを設定する関数である。
Analogは、toolインタフェースを実装している。このインタフェースは、コマンドラインからのオプション指定を扱う様々なメソッドをサポートしている。このインタフェースを使うと、run()メソッドを書く必要がある。このメソッドは、パラメータとして文字列の並びを引数として取り、Intを返す。Intは、実行が成功したかどうかを表わしている。
run()メソッドを書けば、mainメソッドでは、次のように書ける。

public static void main(String[] args) throws Exception {
       int res = ToolRunner.run(new Configuration(), new Analog(), args);
       System.exit(res);
}

org.apache.hadoop.util.ToolRunnerクラスは、Analog内で定義したrun()メソッドを起動する。ToolRunnerは、Toolインタフェースを実装しているクラスを走らすために使われる。このような仕組みによって、様々な入力オプションを取り扱うカスタムハンドラを書かなくて済む。
次にMapとreduceの記述部分。Analogでは内部で二つのクラスを書いた。下記の二つである。

  • Map
    • キーと値のペアを受け取り、キーと値のペアを1つ以上出力する機能を含む。
  • Reduce
    • 複数のMapから出力結果を集めて、集めたデータを出力する機能を含む。

AnalogのMapは下記のようになっている。

public static class MapClass extends MapReduceBase 
    implements Mapper<LongWritable, Text, Text, IntWritable> {
       public void map(LongWritable key, Text value,
       	              OutputCollector<Text, IntWritable> output,
		      Reporter reporter) throws IOException {
 	   String line = value.toString();
	   StringTokenizer itr = new StringTokenizer(line);
	   String tokens[] = new String[9];
	   for (int i = 0; i <= 8 && itr.hasMoreTokens(); i++) {
	       tokens[i] = itr.nextToken();
	   }

	   if (tokens[8] != null) {
	      word.set(tokens[8]);
	      output.collect(word, one);
	   }
   	}
}

mapはkey-valueペアを受け取る。keyはログファイル名、valueはログファイルの一行が入っている。文字列をスペース区切りでトークンに分け、9番目のトークン(ステータスコード)を取り出す。そして、keyをステータスコードvalueを1としたkey-valueのペアを作成する。OutputCollectorのインスタンスを用いて、作成したkey-valueペアをemitする。

次にReduce。Reduceは下記のようになっている。

public static class Reduce extends MapReduceBase
 implements Reducer<Text, IntWritable, Text, IntWritable> {
       public void reduce(Text key, Iterator<IntWritable> values,
       	      	   	  OutputCollector<Text, IntWritable> output,
			  Reporter reporter) throws IOException {
		int sum = 0;
		while (values.hasNext()) {
		      sum += values.next().get();
		}
		output.collect(key, new IntWritable(sum));
	}
}	      	       

最後に、設定したパラメータを読み込む。このパラメータは、MapReduceフレームワークに、key-valueがそれぞれどのような型になっているのか、MapとReduceすの名前は何か、をフレームワークに知らせる。これは下記のようにrun()メソッドで行なう。

public int run(String[] args) throws Exception {
   JobConf conf = new JobConf(getConf(), Analog.class);
   conf.setJobName("Analog");

まずorg.apache.hadoop.mapred.JobConfのインスタンスを作成する。このインスタンスは、設定を行なうものであり、Configureクラスを継承している。JobConfは、Hadoopフレームワークに実装したMapとReduceを転送する重要な役割を担っている。JobConfに適切な値を渡した後、runJob()メソッドを起動する。これは重要なメソッドでorg.apache.hadoop.mapred.JobTrackerクラスにある。JobClientは内部でorg.apache.hadoop.mapred.JobTrackerクラスとやり取りをしており、進行の追跡、ログのアクセス、クラスタのステータスの取得などの機能を使用できるようにしている。
 
このような形でAnalogをMapReduceアプリケーションとして作成した。

参考サイト

MapReduce programming with Apache Hadoop - このメモを書く際に「Writing a Hadoop MapReduce application」を参考にしました。