HadoopによるAnalog
http://d.hatena.ne.jp/naoya/20080511/1210506301
こちらのMapReduce::LiteのサンプルプログラムであるAnalogを、
javaで書いてHadoopで実行させてみた。
下記がjavaのソース。力業感たっぷり。
package Sample; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Analog extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); String tokens[] = new String[9]; for (int i = 0; i <= 8 && itr.hasMoreTokens(); i++) { tokens[i] = itr.nextToken(); } if (tokens[8] != null) { word.set(tokens[8]); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } static int printUsage() { System.out.println("Analog [-m <maps>] [-r <reduces>] <input> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } public int run (String[] args) throws Exception { JobConf conf = new JobConf(getConf(), Analog.class); conf.setJobName("Analog"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); List<String> other_args = new ArrayList<String>(); for (int i = 0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); } } if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(conf, other_args.get(0)); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } public static void main (String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Analog(), args); System.exit(res); } }
「Sample」ディレクトリを作って、上記のソースを「Analog.java」でそこに保存。
下記の要領で、コンパイルし、解析対象ファイルである「access_log」をHDFSに上げ、
プログラム実行して、結果確認を行なった。
$ javac -cp hadoop-0.17.2.1-core.jar Sample/*.java # コンパイル $ jar cvf Analog.jar Sample/*.class # jarファイル作成 $ bin/hadoop dfs -copyFromLocal access_log access_log # HDFSに転送 $ ./bin/hadoop jar Analog.jar Sample.Analog access_log result # Analogを実行 $ ./bin/hadoop dfs -cat result/part-00000 # 処理結果を確認 200 163 301 2 302 1 404 28 500 2
プログラムの詳細メモはまた明日。
参考サイト
MapReduceのJava実装 Apache Hadoopを使ってみた - コンパイル手順を参考にさせていただきました。