HadoopによるAnalog

http://d.hatena.ne.jp/naoya/20080511/1210506301
こちらのMapReduce::LiteのサンプルプログラムであるAnalogを、
javaで書いてHadoopで実行させてみた。
下記がjavaのソース。力業感たっぷり。

package Sample;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Analog extends Configured implements Tool {
    public static class MapClass extends MapReduceBase
	implements Mapper<LongWritable, Text, Text, IntWritable> {
	
	private final static IntWritable one = new IntWritable(1);
	private Text word = new Text();

	public void map(LongWritable key, Text value,
			OutputCollector<Text, IntWritable> output,
			Reporter reporter) throws IOException {
	    String line = value.toString();
	    StringTokenizer itr = new StringTokenizer(line);
	    String tokens[] = new String[9];
	    for (int i = 0; i <= 8 && itr.hasMoreTokens(); i++) {
		tokens[i] = itr.nextToken();
	    }

	    if (tokens[8] != null) {
		word.set(tokens[8]);
		output.collect(word, one);
	    }
	}
    }

    public static class Reduce extends MapReduceBase 
	implements Reducer<Text, IntWritable, Text, IntWritable> {
	
	public void reduce(Text key, Iterator<IntWritable> values,
			   OutputCollector<Text,IntWritable> output,
			   Reporter reporter) throws IOException {
	    int sum = 0;
	    while (values.hasNext()) {
		sum += values.next().get();
	    }
	    output.collect(key, new IntWritable(sum));
	}
    }

    static int printUsage() {
	System.out.println("Analog [-m <maps>] [-r <reduces>] <input> <output>");
	ToolRunner.printGenericCommandUsage(System.out);
	return -1;
    }

    public int run (String[] args) throws Exception {
	JobConf conf = new JobConf(getConf(), Analog.class);
	conf.setJobName("Analog");

	conf.setOutputKeyClass(Text.class);
	conf.setOutputValueClass(IntWritable.class);

	conf.setMapperClass(MapClass.class);
	conf.setCombinerClass(Reduce.class);
	conf.setReducerClass(Reduce.class);

	List<String> other_args = new ArrayList<String>();
	for (int i = 0; i < args.length; ++i) {
	    try {
		if ("-m".equals(args[i])) {
		    conf.setNumMapTasks(Integer.parseInt(args[++i]));
		} else if ("-r".equals(args[i])) {
		    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
		} else {
		    other_args.add(args[i]);
		}
	    } catch (NumberFormatException except) {
		System.out.println("ERROR: Integer expected instead of " + args[i]);
		return printUsage();
	    } catch (ArrayIndexOutOfBoundsException except) {
		System.out.println("ERROR: Required parameter missing from " + args[i-1]);
		return printUsage();
	    }
	}

	if (other_args.size() != 2) {
	    System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2.");
	    return printUsage();
	}
	FileInputFormat.setInputPaths(conf, other_args.get(0));
	FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
	JobClient.runJob(conf);
	return 0;
    }

    public static void main (String[] args) throws Exception {
	int res = ToolRunner.run(new Configuration(), new Analog(), args);
	System.exit(res);
    }
}

「Sample」ディレクトリを作って、上記のソースを「Analog.java」でそこに保存。
下記の要領で、コンパイルし、解析対象ファイルである「access_log」をHDFSに上げ、
プログラム実行して、結果確認を行なった。

$ javac -cp hadoop-0.17.2.1-core.jar Sample/*.java # コンパイル
$ jar cvf Analog.jar Sample/*.class                # jarファイル作成
$ bin/hadoop dfs -copyFromLocal access_log access_log # HDFSに転送
$ ./bin/hadoop jar Analog.jar Sample.Analog access_log result # Analogを実行
$ ./bin/hadoop dfs -cat result/part-00000                     # 処理結果を確認
200	163
301	2
302	1
404	28
500	2

プログラムの詳細メモはまた明日。

参考サイト

MapReduceのJava実装 Apache Hadoopを使ってみた - コンパイル手順を参考にさせていただきました。

MapReduce::LiteによるWordCount

http://codezine.jp/article/detail/2485
こちらのサイトで紹介されているHadoopのサンプルプログラム
WordCount(WordCount.java)と同じ事を行なうものを、
MapReduce::Liteで書いてみた。
WordCountはスペース区切りで並べられた単語を数えるプログラムである。

入力データ:
hoge hoge hoge fuga fuga hoge uho
 

出力データ:
uho => 1
fuga => 2
hoge => 4

まずマップを記述する。

package WordCount::Mapper;
use Moose;
with 'MapReduce::Lite::Mapper';

sub map {
    my ($self, $key, $value) = @_;
    my @elements = split /\s+/, $value;
    for my $element (@elements) {
         $self->emit($element, 1);
    }
}

MapReduce::Lite::Mapperを継承し、新たにWordCount::Mapperを作成する。
map関数をオーバライドして、スペース区切りのデータを切り分け、
それぞれの単語を「<単語, 1>」の形でemitするようにする。
この後、reducerに渡される前にシャッフルという段階があり、処理される。
この処理が行なわれた後、リストは各単語毎に「<単語, (1, 1, ...)>」という形になる。
次にこのリストを受け取るReducerを作成する。

package WordCount::Reducer;
use Moose;
with 'MapReduce::Lite::Reducer';

sub reduce { 
    my ($self, $key, $value) = @_;
    $self->emit($key, $value->size);
}

MapReduce::Lite::Reducerを継承し、新たにWordCount::Reducerを作成する。
reduce関数をオーバライドして、「<単語、(1, 1, ....)>」の形で渡ってくるリストを受け取り、
(1, 1, ...)の部分の長さを求めるようにする。
実質、(1, 1, ...)の長さは、単語の出現回数となる。

これらのMapReduceを実行するプログラムは下記の通り。
 

package main;
use FindBin::lib;
use MapReduce::Lite;

my $spec = MapReduce::Lite::Spec->new(intermidate_dir => "./tmp");

for (@ARGV) {
    my $in = $spec->create_input;
    $in->file($_);
    $in->mapper('WordCount::Mapper');
}

$spec->out->reducer('WordCount::Reducer');
$spec->out->num_tasks(1);

mapreduce($spec);

 
中間ファイルを置くディレクトリ「tmp」を作成し、単語リストが書かれたファイル「./inputs/file1」を読み込ませて、プログラムを実行。

$ mkdir tmp
$ ./WordCount.pl ./inputs/file1
uho => 1
fuga => 2
hoge => 4

GFSについての箇条書きメモ

  • Google File System
  • Google独自の分散ファイルシステム
  • 多数のマシンを組み合わせて巨大なストレージを作る
  • 複数のマシンが動作する事で効率的なデータ転送
  • 膨大なデータの通り道
  • 扱われるファイルが巨大
  • 新しいデータをどんどん書き加えるか、読み出し続けるか
  • GFS(データの読み出し)→アプリケーション(データ加工)→GFS(データ書き込み)
  • データ転送に特化される設計
  • GFS上では、ファイルは常にバックアップされた状態
  • 大きなファイル(数百MB)を一気に流し込む
  • データを書き換えしない
  • ファイルロック(排他制御)を行なわない
    • ファイルをキューとして用いる
  • ファイル操作のためのインタフェース
    • Create (作成)
    • Delete (削除)
    • Open (オープン)
    • Close (クローズ)
    • Read (読み込み)
    • Write (書き込み)
    • Snapshot (スナップショット)
      • ファイルの複製
    • Record Append (レコード追加)
      • ひとまとまりのデータを最後に追加
  • ファイルは自動的に複製される
  • マスタ(Master)、チャンクサーバ(Chunk Server)、クライアント(Client)の3つの要素で構成される
    • Master・・・GFS全体の状態をコントロールする中央サーバ
    • Chunk Server・・・ハードディスクへの入出力を担当
    • Client・・・ファイルを読み書きするアプリケーション
  • ファイルはチャンク(64MB)の単位に分割される
  • チャンクは通常3つのチャンクサーバにコピーされる
  • マスタは、どのチャンクサーバにどのチャンクがあるのかを把握している
  • クライアントはまずマスタに問い合わせる
  • マスタは、チャンクサーバの情報をクライアントに送る
  • 以後、クライアントとチャンクサーバ間で読み書きが行なわれる
  • 読み込みは最寄りのサーバから
    • IPアドレスが距離に応じて割り当てられている
    • 負荷によっては、マスタはチャンクのコピーを増やす
  • 書き込みは複数サーバへ
    • マスタは、まとめ役のチャンクサーバを決定する(プライマリの決定)
    • クライアントには、プライマリなチャンクサーバを伝えられる
    • クライアントは、要求するチャンクをチャンクサーバに送る(プライマリに限らない)
    • バケツリレーの様にチャンクサーバ間でチャンクをコピー
    • データ送信終了後、クライアントはプライマリにデータ書き込みを要求
    • プライマリは、手元のチャンクを書き込み、セカンダリに書き込み要求
    • 書き込み終了後、クライアントに通知
  • 様々なエラーへの対応
    • チャンクサーバの故障
    • ハードディスクの問題
  • プライマリが失敗したとき
    • クライアントがはじめからやり直す
    • マスタが新しいプライマリを決定する
  • セカンダリが失敗のとき
    • プライマリとセカンダリでチャンク内容が異なる状況
    • チャンクにつけたシリアルナンバーをマスタに通知
    • 古いデータを読み込んでしまう様な、読み書きを同時に行なうアプリケーションは、「レコード追加」で対処
  • 同時書き込みで不整合が起こる
  • レコード追加によるアトミックな書き込み
  • レコード追加は、中断されることなく一度に行なわれる(アトミック)
  • レコードは、ファイルの末尾に追加されていく
  • レコードは、一回以上書き込まれる事を保証している
  • 書き込みに失敗した場合、もう一度ファイルの末尾に新しい領域を確保するところからやり直し
  • 読み出し側で失敗した箇所を読み飛ばす
  • スナップショットはコピーオンライトで高速化
    • ファイルの情報(チャンクの場所など)を持っているマスタの中で、ファイル情報をコピー
    • チャンクの内容を書き換えるタイミングでチャンクがコピーされる(コピーライトオン)
  • マスタの持つ情報
    • ファイル名とそれを構成するチャンクリスト
    • チャンクサーバがどこにあり、今どのような状態か
    • どのチャンクサーバがどのチャンクを持っているか
  • マスタはGFS全体を最適化する
  • チャンクの障害対策
  • マスタの障害対策として、オペレーションログを記録している
  • データ管理の基盤として動く

Hadoopインストールメモ

Javaのインストール

http://java.sun.com/javase/downloads/index.jspで、
Java SE Development Kit (JDK) 6 Update 10」をダウンロードリンクを辿る。
「j2sdk-1_4_2_18-linux-i586-rpm.bin」をダウンロード。
以下のようにコマンドを打ってインストール。

$ chmod +x j2sdk-1_4_2_18-linux-i586-rpm.bin
$ su
# rpm -ivh j2sdk-1_4_2_18-linux-i586.rpm

Hadoopのインストール

http://hadoop.apache.org/core/releases.html#Download
上記のURLからHadoopをダウンロード。
ファイル名は「hadoop-[VERSION].tar.gz」。
今回、ダウンロードしたのは「hadoop-0.17.2.1.tar.gz」。
その後、解凍して、ディレクトリ名を「Hadoop」変更。

$ tar -xvzf hadoop-0.17.2.1.tar.gz
$ mv hadoop-0.17.2.1 hadoop

 

Hadoopの設定

次に「[HADOOP INSTALL]/conf/hadoop-env.sh」を編集する。
JAVA_HOME環境変数設定する。上記の方法でインストールした場合、
「/usr/java/jdk1.6.0_10」
と設定。
 
 
次に「[HADOOP INSTALL]/conf/hadoop-site.xml」を編集する。(ファイルがなければ、hadoop-default.xmlをコピーしてくる)。
下記のように変更した。


hadoop.tmp.dir
/tmp/hadoop-${user.name}


fs.default.name
hdfs://localhost:54310


mapred.job.tracker
localhost:54311


dfs.replication
1

Hadoopのフォーマット、起動、停止

以下のコマンドでフォーマットと起動と停止を行なえる。

$ cd hadoop
$ ./bin/hadoop namenode -format # フォーマット
$ ./bin/start-all.sh # 起動
$ ./bin/stop-all.sh # 停止

参考サイト

gitの使い方メモ2

プログラム群をgitの管理下に置く

Railsでプログラムを作成。

$ rails myproject
      create  
      create  app/controllers
      create  app/helpers
      create  app/models
      create  app/views/layouts
      create  config/environments
      create  config/initializers
      create  db
      create  doc
      create  lib
      create  lib/tasks
      create  log
      create  public/images
      create  public/javascripts
      create  public/stylesheets
      create  script/performance
      create  script/process
      create  test/fixtures
      create  test/functional
      create  test/integration
      create  test/unit
      create  vendor
      create  vendor/plugins
      create  tmp/sessions
      create  tmp/sockets
      create  tmp/cache
      create  tmp/pids
      create  Rakefile
      create  README
      create  app/controllers/application.rb
      create  app/helpers/application_helper.rb
      create  test/test_helper.rb
      create  config/database.yml
      create  config/routes.rb
      create  config/initializers/inflections.rb
      create  config/initializers/mime_types.rb
      create  config/initializers/new_rails_defaults.rb
      create  config/boot.rb
      create  config/environment.rb
      create  config/environments/production.rb
      create  config/environments/development.rb
      create  config/environments/test.rb
      create  script/about
      create  script/console
      create  script/dbconsole
      create  script/destroy
      create  script/generate
      create  script/performance/benchmarker
      create  script/performance/profiler
      create  script/performance/request
      create  script/process/reaper
      create  script/process/spawner
      create  script/process/inspector
      create  script/runner
      create  script/server
      create  script/plugin
      create  public/dispatch.rb
      create  public/dispatch.cgi
      create  public/dispatch.fcgi
      create  public/404.html
      create  public/422.html
      create  public/500.html
      create  public/index.html
      create  public/favicon.ico
      create  public/robots.txt
      create  public/images/rails.png
      create  public/javascripts/prototype.js
      create  public/javascripts/effects.js
      create  public/javascripts/dragdrop.js
      create  public/javascripts/controls.js
      create  public/javascripts/application.js
      create  doc/README_FOR_APP
      create  log/server.log
      create  log/production.log
      create  log/development.log
      create  log/test.log
$ cd myproject/
README		config		lib		script		vendor
Rakefile	db		log		test
app		doc		public		tmp

作成したファイルをgitで管理するように設定。
myprojectディレクト上で次のコマンドを打ち込む。

$ git init
$ git add .
$ git commit

次のコマンドでコミットの履歴が見られる。

$ git log
commit 1e2581835d4a1e09f51fece88de4d14f42c6a73e
Author: hato_mune <hatomune@gmail.com>
Date:   Tue Oct 7 23:21:46 2008 +0900

    Initial commit

gitの使い方メモ

gitの管理対象のファイルを作る。

myprojectディレクトリを作る。

$ mkdir myproject
$ cd myproject

「uhouho」という文字列が記述されたファイル「test.txt」を作る。

$ echo uhouho > test.txt

gitの初期設定

氏名とメールアドレスを設定する。

$ git config --global user.name "hato_mune"
$ git config --global user.email "hato_mune@gmail.com

リポジトリを作成する。

$ cd myproject
$ git init

このコマンドを実行すると可憐とディレクトリに「.git」というディレクトリが作成される。

gitの管理対象ファイル

gitの管理対象ファイルを"索引(index)"に追加

$ git add test.txt

コミットする。

$ git commit

上記のコマンドを打つと、viが開いてコメント入力が求められるので、
「Initial commit」などと入力して保存して終了する。
ここで最初のバージョンがコミットされる。
test.txtを変更して、コミット。

$ echo hogehoge >> test.txt

差分を確認する。

$ git diff
diff --git a/test.txt b/test.txt
index 99ebcc0..62a24b7 100644
--- a/test.txt
+++ b/test.txt
@@ -1 +1,2 @@
 uhouho
+hogehoge

変更したファイルを索引に追加。

$ git add test.txt

索引に登録されたけど、コミットされていないファイルを確認。

$ git diff --cached
diff --git a/test.txt b/test.txt
index 99ebcc0..62a24b7 100644
--- a/test.txt
+++ b/test.txt
@@ -1 +1,2 @@
 uhouho
+hogehoge

コミットする。

$ git commit

test2.txtを新規作成、コミット

test2.txtを作成

$ echo ohyoohyo > test2.txt

test2.txtを索引に追加し、コミット

$ git add test2.txt
$ git commit

test2.txtを変更し、コミット

$ echo ahyaahay >> test2.txt

索引して、コミットするのは次の一行で書ける。

$ git commit -a

ブランチ管理

"experimental"というブランチを作成する。

$ git branch experimental

ブランチ一覧を表示

$ git branch
  experimental
* master

上記は、"experimental"と"master"というブランチがあることを示している。
「*」は、現在のブランチを指し示している。
作業ブランチを"experimental"に切り替える。

$ git checkout experimental
Switched to branch "experimental"
$ git branch
* experimental
  master

"experimental"上で、test.txtの内容を変更して、コミットする。

$ echo experimental > test.txt
$ git commit -a

作業ブランチを"master"に切り替えて、test2.txtの内容を変更する。

$ git checkout master
Switched to branch "master"
$ echo master > test2.txt
$ git commit -a

この時点で、下記のような状況となっている。

  • "experimental"上で、test.txtの内容が「experimental」となっている。
  • "master"上で、test2.txtの内容が「master」となっている。

これらの変更を"master"上でマージする。

$ git merge experimental
$ cat test.txt
experimental
$ cat test2.txt
master

マージした結果は、GUIツールで確認できる。

gitk

RewriteRuleについて

MOBA_HOME/conf/rewrite.conf

MobaSiFでは、以下のようなURLでアクセスすることをサポートしている。

http://www.uhouho.net/_bbs

上記のURLでアクセスした場合、MobaSiFの設定により下記のURLに書き換わる。

http://www.uhouho.net/?f=bbs

このURLの書き換えを実際に行なっているのが、apacheRewrite機能である。
Rewrite機能は、正規表現による柔軟なURLの書き換え機能を有している。
MobaSiFでは、下記のような設定がされている。

RewriteEngine On

RewriteCond %{REQUEST_METHOD} ^(TRACE|TRACK|OPTIONS)
RewriteRule .* - [F]

RewriteRule ^/\.([^/]+)/(.*)$ /$2?_u=$1 [QSA]
RewriteRule ^/_([^/\.]+)(\.html?)?$ /?f=$1 [QSA]

RewriteRule ^/static/(.*)$ /static/$1 [QSA,PT,L]
RewriteRule ^/$ /fcgi/index.fcgi [QSA,PT,L]
RewriteRule ^/(.*\.html)$ /fcgi/index.fcgi?f=page&page=$1 [QSA,PT,L]
RewriteRule ^/(.*\/)$ /fcgi/index.fcgi?f=page&page=$1 [QSA,PT,L]

これについてどういう動作をしているのかを調べてみた。

RewriteEngine On

この記述はRewrite機能をオンにするものである。

RewirteEngine Off

上記の記述にするとオフになる。

RewriteCond %{REQUEST_METHOD} ^(TRACE|TRACK|OPTIONS)

RewriteCondでは、「%{REQUEST_METHOD}」と正規表現「^(TRACE|TRACK|OPTIONS)」が一致するかどうかを見ている。
一致すれば次の行に書いてある「RewriteRule .* - [F]」に移る。

RewriteRule .* - [F]

RewriteRuleの記法は,

RewriteRule 一致パターン 置換パターン フラグ

となっている。この行では特に一致させて置換するようなことはしていない。
フラグで指定されている[F]はFORBIDDENを返すことを表している。
すなわち、前の行の記述と合わせると「TRACE、TRACK、OPTIONSのいずれかのメソッドでアクセスしてきたものに対しては、FORBIDDENを返す」という意味になる。
これらをperlで書くと以下のような感じになる。

if ($ENV{'REQUEST_METHOD'} =~ /^(TRACE|TRACK|OPTIONS)/) {
   $ENV{'PATH_INFO'} =~ s/.*/-/; # 意味ないぽ
   print "Status: 403 Forbidden\n\n";
   exit;
}
RewriteRule ^/\.([^/]+)/(.*)$ /$2?_u=$1 [QSA]

[QSA]は、「Query String Append」の略で、%{QUERY_STRING}を引数としてつけたいときに使うフラグである。
このフラグが付くと、置換結果は下記のものと等価となる。

/$2?_u=$1&%{QUERY_STRING}

RewriteRule ^/_([^/\.]+)(\.html?)?$ /?f=$1 [QSA]

この行は、MobaSiF

http://www.uhouho.net/_bbs

というURLの記述に対応している。アンダースコア以降のものを「f=bbs」に整形している。

RewriteRule ^/static/(.*)$ /static/$1 [QSA,PT,L]

[PT]フラグは、「Path Through」の略で、Rewriteを打ち切って、他の変換(Script Aliasなど)に渡すものである。
[L]フラグは、Rewrite機能の変換を打ち切ること指示するものである。
この記述では「http://www.uhouho.net/static/hato.html」というアクセスがあったとき、
「$MOBA_HOME/static/」にある静的ページ「hato.html」を直接読み込むことを表している。
結果的に変換前と変換後でURLに変化はないが、rewrite機能を打ちきるために記述されている。

RewriteRule ^/$ /fcgi/index.fcgi [QSA,PT,L]

なにも引数を連れていないときの変換である。
基本的に動的なページが呼び出されるときは、
「$MOBA_HOME/fcgi/index.fcgi」に飛ばされるようになっている。

RewriteRule ^/(.*\.html)$ /fcgi/index.fcgi?f=page&page=$1 [QSA,PT,L]

http://www.uhouho.net/hato.html

のような形式でアクセスがあったときの変換である。

RewriteRule ^/(.*\/)$ /fcgi/index.fcgi?f=page&page=$1 [QSA,PT,L]

http://www.uhouho.net/hato/

のような形式でアクセスがあったときの変換である。

まとめ

Apacheの機能であるrewrite機能を調べた。rewrite機能は、アクセスしてきたURLを正規表現で柔軟に変換する機能を有している。MobaSiFでは、「MOBA_HOME/conf/rewrite.conf」でその設定が行なわれている。この設定を有効にするには、httpd.confに「Include rewrite.conf」と記述する必要がある(実際にはMOBA_HOME/conf/httpd.confをapachehttpd.confでインクルードするように設定する)。これにより、cgiプログラムに届く前にURLが整形されるようになる。
今回、rewrite.confを読んだことにより、MobaSiFがどのようなアクセスに対応しているのかを把握することができた。今後は、rewrite.confを書き換えることにより、新しいアクセス方式を追加したり、アクセス制限をかけたり、といったことを行なっていく予定である。