HadoopによるAnalog

http://d.hatena.ne.jp/naoya/20080511/1210506301
こちらのMapReduce::LiteのサンプルプログラムであるAnalogを、
javaで書いてHadoopで実行させてみた。
下記がjavaのソース。力業感たっぷり。

package Sample;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Analog extends Configured implements Tool {
    public static class MapClass extends MapReduceBase
	implements Mapper<LongWritable, Text, Text, IntWritable> {
	
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output,
			Reporter reporter) throws IOException {
	    String line = value.toString();
	    StringTokenizer itr = new StringTokenizer(line);
	    String tokens[] = new String[9];
	    for (int i = 0; i <= 8 && itr.hasMoreTokens(); i++) {
		tokens[i] = itr.nextToken();
	    }

	    if (tokens[8] != null) {
		word.set(tokens[8]);
		output.collect(word, one);
	    }
	}
    }

    public static class Reduce extends MapReduceBase 
	implements Reducer<Text, IntWritable, Text, IntWritable> {
	
	public void reduce(Text key, Iterator<IntWritable> values,
			   OutputCollector<Text,IntWritable> output,
			   Reporter reporter) throws IOException {
	    int sum = 0;
	    while (values.hasNext()) {
		sum += values.next().get();
	    }
	    output.collect(key, new IntWritable(sum));
	}
    }

    static int printUsage() {
	System.out.println("Analog [-m <maps>] [-r <reduces>] <input> <output>");
	ToolRunner.printGenericCommandUsage(System.out);
	return -1;
    }

    public int run (String[] args) throws Exception {
	JobConf conf = new JobConf(getConf(), Analog.class);
	conf.setJobName("Analog");

	conf.setOutputKeyClass(Text.class);
	conf.setOutputValueClass(IntWritable.class);

	conf.setMapperClass(MapClass.class);
	conf.setCombinerClass(Reduce.class);
	conf.setReducerClass(Reduce.class);

	List<String> other_args = new ArrayList<String>();
	for (int i = 0; i < args.length; ++i) {
	    try {
		if ("-m".equals(args[i])) {
		    conf.setNumMapTasks(Integer.parseInt(args[++i]));
		} else if ("-r".equals(args[i])) {
		    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
		} else {
		    other_args.add(args[i]);
		}
	    } catch (NumberFormatException except) {
		System.out.println("ERROR: Integer expected instead of " + args[i]);
		return printUsage();
	    } catch (ArrayIndexOutOfBoundsException except) {
		System.out.println("ERROR: Required parameter missing from " + args[i-1]);
		return printUsage();
	    }
	}

	if (other_args.size() != 2) {
	    System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2.");
	    return printUsage();
	}
	FileInputFormat.setInputPaths(conf, other_args.get(0));
	FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
	JobClient.runJob(conf);
	return 0;
    }

    public static void main (String[] args) throws Exception {
	int res = ToolRunner.run(new Configuration(), new Analog(), args);
	System.exit(res);
    }
}

「Sample」ディレクトリを作って、上記のソースを「Analog.java」でそこに保存。
下記の要領で、コンパイルし、解析対象ファイルである「access_log」をHDFSに上げ、
プログラム実行して、結果確認を行なった。

$ javac -cp hadoop-0.17.2.1-core.jar Sample/*.java # コンパイル
$ jar cvf Analog.jar Sample/*.class                # jarファイル作成
$ bin/hadoop dfs -copyFromLocal access_log access_log # HDFSに転送
$ ./bin/hadoop jar Analog.jar Sample.Analog access_log result # Analogを実行
$ ./bin/hadoop dfs -cat result/part-00000                     # 処理結果を確認
200	163
301	2
302	1
404	28
500	2

プログラムの詳細メモはまた明日。

参考サイト

MapReduceのJava実装 Apache Hadoopを使ってみた - コンパイル手順を参考にさせていただきました。