HadoopによるAnalog
http://d.hatena.ne.jp/naoya/20080511/1210506301
こちらのMapReduce::LiteのサンプルプログラムであるAnalogを、
javaで書いてHadoopで実行させてみた。
下記がjavaのソース。力業感たっぷり。
package Sample; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Analog extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); String tokens[] = new String[9]; for (int i = 0; i <= 8 && itr.hasMoreTokens(); i++) { tokens[i] = itr.nextToken(); } if (tokens[8] != null) { word.set(tokens[8]); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } static int printUsage() { System.out.println("Analog [-m <maps>] [-r <reduces>] <input> <output>"); ToolRunner.printGenericCommandUsage(System.out); return -1; } public int run (String[] args) throws Exception { JobConf conf = new JobConf(getConf(), Analog.class); conf.setJobName("Analog"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); List<String> other_args = new ArrayList<String>(); for (int i = 0; i < args.length; ++i) { try { if ("-m".equals(args[i])) { conf.setNumMapTasks(Integer.parseInt(args[++i])); } else if ("-r".equals(args[i])) { conf.setNumReduceTasks(Integer.parseInt(args[++i])); } else { other_args.add(args[i]); } } catch (NumberFormatException except) { System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); } catch (ArrayIndexOutOfBoundsException except) { System.out.println("ERROR: Required parameter missing from " + args[i-1]); return printUsage(); } } if (other_args.size() != 2) { System.out.println("ERROR: Wrong number of parameters: " + other_args.size() + " instead of 2."); return printUsage(); } FileInputFormat.setInputPaths(conf, other_args.get(0)); FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); JobClient.runJob(conf); return 0; } public static void main (String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Analog(), args); System.exit(res); } }
「Sample」ディレクトリを作って、上記のソースを「Analog.java」でそこに保存。
下記の要領で、コンパイルし、解析対象ファイルである「access_log」をHDFSに上げ、
プログラム実行して、結果確認を行なった。
$ javac -cp hadoop-0.17.2.1-core.jar Sample/*.java # コンパイル $ jar cvf Analog.jar Sample/*.class # jarファイル作成 $ bin/hadoop dfs -copyFromLocal access_log access_log # HDFSに転送 $ ./bin/hadoop jar Analog.jar Sample.Analog access_log result # Analogを実行 $ ./bin/hadoop dfs -cat result/part-00000 # 処理結果を確認 200 163 301 2 302 1 404 28 500 2
プログラムの詳細メモはまた明日。
参考サイト
MapReduceのJava実装 Apache Hadoopを使ってみた - コンパイル手順を参考にさせていただきました。
MapReduce::LiteによるWordCount
http://codezine.jp/article/detail/2485
こちらのサイトで紹介されているHadoopのサンプルプログラム
WordCount(WordCount.java)と同じ事を行なうものを、
MapReduce::Liteで書いてみた。
WordCountはスペース区切りで並べられた単語を数えるプログラムである。
入力データ:
hoge hoge hoge fuga fuga hoge uho
出力データ:
uho => 1
fuga => 2
hoge => 4
まずマップを記述する。
package WordCount::Mapper; use Moose; with 'MapReduce::Lite::Mapper'; sub map { my ($self, $key, $value) = @_; my @elements = split /\s+/, $value; for my $element (@elements) { $self->emit($element, 1); } }
MapReduce::Lite::Mapperを継承し、新たにWordCount::Mapperを作成する。
map関数をオーバライドして、スペース区切りのデータを切り分け、
それぞれの単語を「<単語, 1>」の形でemitするようにする。
この後、reducerに渡される前にシャッフルという段階があり、処理される。
この処理が行なわれた後、リストは各単語毎に「<単語, (1, 1, ...)>」という形になる。
次にこのリストを受け取るReducerを作成する。
package WordCount::Reducer; use Moose; with 'MapReduce::Lite::Reducer'; sub reduce { my ($self, $key, $value) = @_; $self->emit($key, $value->size); }
MapReduce::Lite::Reducerを継承し、新たにWordCount::Reducerを作成する。
reduce関数をオーバライドして、「<単語、(1, 1, ....)>」の形で渡ってくるリストを受け取り、
(1, 1, ...)の部分の長さを求めるようにする。
実質、(1, 1, ...)の長さは、単語の出現回数となる。
これらのMapReduceを実行するプログラムは下記の通り。
package main; use FindBin::lib; use MapReduce::Lite; my $spec = MapReduce::Lite::Spec->new(intermidate_dir => "./tmp"); for (@ARGV) { my $in = $spec->create_input; $in->file($_); $in->mapper('WordCount::Mapper'); } $spec->out->reducer('WordCount::Reducer'); $spec->out->num_tasks(1); mapreduce($spec);
中間ファイルを置くディレクトリ「tmp」を作成し、単語リストが書かれたファイル「./inputs/file1」を読み込ませて、プログラムを実行。
$ mkdir tmp
$ ./WordCount.pl ./inputs/file1
uho => 1
fuga => 2
hoge => 4
GFSについての箇条書きメモ
- Google File System
- Google独自の分散ファイルシステム
- 多数のマシンを組み合わせて巨大なストレージを作る
- 複数のマシンが動作する事で効率的なデータ転送
- 膨大なデータの通り道
- 扱われるファイルが巨大
- 新しいデータをどんどん書き加えるか、読み出し続けるか
- GFS(データの読み出し)→アプリケーション(データ加工)→GFS(データ書き込み)
- データ転送に特化される設計
- GFS上では、ファイルは常にバックアップされた状態
- 大きなファイル(数百MB)を一気に流し込む
- データを書き換えしない
- ファイルロック(排他制御)を行なわない
- ファイルをキューとして用いる
- ファイル操作のためのインタフェース
- Create (作成)
- Delete (削除)
- Open (オープン)
- Close (クローズ)
- Read (読み込み)
- Write (書き込み)
- Snapshot (スナップショット)
- ファイルの複製
- Record Append (レコード追加)
- ひとまとまりのデータを最後に追加
- ファイルは自動的に複製される
- マスタ(Master)、チャンクサーバ(Chunk Server)、クライアント(Client)の3つの要素で構成される
- Master・・・GFS全体の状態をコントロールする中央サーバ
- Chunk Server・・・ハードディスクへの入出力を担当
- Client・・・ファイルを読み書きするアプリケーション
- ファイルはチャンク(64MB)の単位に分割される
- チャンクは通常3つのチャンクサーバにコピーされる
- マスタは、どのチャンクサーバにどのチャンクがあるのかを把握している
- クライアントはまずマスタに問い合わせる
- マスタは、チャンクサーバの情報をクライアントに送る
- 以後、クライアントとチャンクサーバ間で読み書きが行なわれる
- 読み込みは最寄りのサーバから
- IPアドレスが距離に応じて割り当てられている
- 負荷によっては、マスタはチャンクのコピーを増やす
- 書き込みは複数サーバへ
- マスタは、まとめ役のチャンクサーバを決定する(プライマリの決定)
- クライアントには、プライマリなチャンクサーバを伝えられる
- クライアントは、要求するチャンクをチャンクサーバに送る(プライマリに限らない)
- バケツリレーの様にチャンクサーバ間でチャンクをコピー
- データ送信終了後、クライアントはプライマリにデータ書き込みを要求
- プライマリは、手元のチャンクを書き込み、セカンダリに書き込み要求
- 書き込み終了後、クライアントに通知
- 様々なエラーへの対応
- チャンクサーバの故障
- ハードディスクの問題
- プライマリが失敗したとき
- クライアントがはじめからやり直す
- マスタが新しいプライマリを決定する
- セカンダリが失敗のとき
- プライマリとセカンダリでチャンク内容が異なる状況
- チャンクにつけたシリアルナンバーをマスタに通知
- 古いデータを読み込んでしまう様な、読み書きを同時に行なうアプリケーションは、「レコード追加」で対処
- 同時書き込みで不整合が起こる
- レコード追加によるアトミックな書き込み
- レコード追加は、中断されることなく一度に行なわれる(アトミック)
- レコードは、ファイルの末尾に追加されていく
- レコードは、一回以上書き込まれる事を保証している
- 書き込みに失敗した場合、もう一度ファイルの末尾に新しい領域を確保するところからやり直し
- 読み出し側で失敗した箇所を読み飛ばす
- スナップショットはコピーオンライトで高速化
- ファイルの情報(チャンクの場所など)を持っているマスタの中で、ファイル情報をコピー
- チャンクの内容を書き換えるタイミングでチャンクがコピーされる(コピーライトオン)
- マスタの持つ情報
- ファイル名とそれを構成するチャンクリスト
- チャンクサーバがどこにあり、今どのような状態か
- どのチャンクサーバがどのチャンクを持っているか
- マスタはGFS全体を最適化する
- チャンクの障害対策
- チャンクを保存する時、チェックサムを取る
- マスタの障害対策として、オペレーションログを記録している
- データ管理の基盤として動く
Hadoopインストールメモ
Javaのインストール
http://java.sun.com/javase/downloads/index.jspで、
「Java SE Development Kit (JDK) 6 Update 10」をダウンロードリンクを辿る。
「j2sdk-1_4_2_18-linux-i586-rpm.bin」をダウンロード。
以下のようにコマンドを打ってインストール。
$ chmod +x j2sdk-1_4_2_18-linux-i586-rpm.bin
$ su
# rpm -ivh j2sdk-1_4_2_18-linux-i586.rpm
Hadoopのインストール
http://hadoop.apache.org/core/releases.html#Download
上記のURLからHadoopをダウンロード。
ファイル名は「hadoop-[VERSION].tar.gz」。
今回、ダウンロードしたのは「hadoop-0.17.2.1.tar.gz」。
その後、解凍して、ディレクトリ名を「Hadoop」変更。
$ tar -xvzf hadoop-0.17.2.1.tar.gz
$ mv hadoop-0.17.2.1 hadoop
Hadoopの設定
次に「[HADOOP INSTALL]/conf/hadoop-env.sh」を編集する。
JAVA_HOME環境変数設定する。上記の方法でインストールした場合、
「/usr/java/jdk1.6.0_10」
と設定。
次に「[HADOOP INSTALL]/conf/hadoop-site.xml」を編集する。(ファイルがなければ、hadoop-default.xmlをコピーしてくる)。
下記のように変更した。
hadoop.tmp.dir
/tmp/hadoop-${user.name}
fs.default.name
hdfs://localhost:54310
mapred.job.tracker
localhost:54311
dfs.replication
1
Hadoopのフォーマット、起動、停止
以下のコマンドでフォーマットと起動と停止を行なえる。
$ cd hadoop
$ ./bin/hadoop namenode -format # フォーマット
$ ./bin/start-all.sh # 起動
$ ./bin/stop-all.sh # 停止
参考サイト
- Hadoopのインストールとサンプルプログラムの実行:CodeZine(http://codezine.jp/article/detail/2485)
gitの使い方メモ2
プログラム群をgitの管理下に置く
Railsでプログラムを作成。
$ rails myproject create create app/controllers create app/helpers create app/models create app/views/layouts create config/environments create config/initializers create db create doc create lib create lib/tasks create log create public/images create public/javascripts create public/stylesheets create script/performance create script/process create test/fixtures create test/functional create test/integration create test/unit create vendor create vendor/plugins create tmp/sessions create tmp/sockets create tmp/cache create tmp/pids create Rakefile create README create app/controllers/application.rb create app/helpers/application_helper.rb create test/test_helper.rb create config/database.yml create config/routes.rb create config/initializers/inflections.rb create config/initializers/mime_types.rb create config/initializers/new_rails_defaults.rb create config/boot.rb create config/environment.rb create config/environments/production.rb create config/environments/development.rb create config/environments/test.rb create script/about create script/console create script/dbconsole create script/destroy create script/generate create script/performance/benchmarker create script/performance/profiler create script/performance/request create script/process/reaper create script/process/spawner create script/process/inspector create script/runner create script/server create script/plugin create public/dispatch.rb create public/dispatch.cgi create public/dispatch.fcgi create public/404.html create public/422.html create public/500.html create public/index.html create public/favicon.ico create public/robots.txt create public/images/rails.png create public/javascripts/prototype.js create public/javascripts/effects.js create public/javascripts/dragdrop.js create public/javascripts/controls.js create public/javascripts/application.js create doc/README_FOR_APP create log/server.log create log/production.log create log/development.log create log/test.log $ cd myproject/ README config lib script vendor Rakefile db log test app doc public tmp
作成したファイルをgitで管理するように設定。
myprojectディレクト上で次のコマンドを打ち込む。
$ git init $ git add . $ git commit
次のコマンドでコミットの履歴が見られる。
$ git log commit 1e2581835d4a1e09f51fece88de4d14f42c6a73e Author: hato_mune <hatomune@gmail.com> Date: Tue Oct 7 23:21:46 2008 +0900 Initial commit
gitの使い方メモ
gitの管理対象のファイルを作る。
myprojectディレクトリを作る。
$ mkdir myproject $ cd myproject
「uhouho」という文字列が記述されたファイル「test.txt」を作る。
$ echo uhouho > test.txt
gitの初期設定
氏名とメールアドレスを設定する。
$ git config --global user.name "hato_mune" $ git config --global user.email "hato_mune@gmail.com
リポジトリを作成する。
$ cd myproject $ git init
gitの管理対象ファイル
gitの管理対象ファイルを"索引(index)"に追加
$ git add test.txt
コミットする。
$ git commit
上記のコマンドを打つと、viが開いてコメント入力が求められるので、
「Initial commit」などと入力して保存して終了する。
ここで最初のバージョンがコミットされる。
test.txtを変更して、コミット。
$ echo hogehoge >> test.txt
差分を確認する。
$ git diff diff --git a/test.txt b/test.txt index 99ebcc0..62a24b7 100644 --- a/test.txt +++ b/test.txt @@ -1 +1,2 @@ uhouho +hogehoge
変更したファイルを索引に追加。
$ git add test.txt
索引に登録されたけど、コミットされていないファイルを確認。
$ git diff --cached diff --git a/test.txt b/test.txt index 99ebcc0..62a24b7 100644 --- a/test.txt +++ b/test.txt @@ -1 +1,2 @@ uhouho +hogehoge
コミットする。
$ git commit
test2.txtを新規作成、コミット
test2.txtを作成
$ echo ohyoohyo > test2.txt
test2.txtを索引に追加し、コミット
$ git add test2.txt $ git commit
test2.txtを変更し、コミット
$ echo ahyaahay >> test2.txt
索引して、コミットするのは次の一行で書ける。
$ git commit -a
ブランチ管理
"experimental"というブランチを作成する。
$ git branch experimental
ブランチ一覧を表示
$ git branch experimental * master
上記は、"experimental"と"master"というブランチがあることを示している。
「*」は、現在のブランチを指し示している。
作業ブランチを"experimental"に切り替える。
$ git checkout experimental Switched to branch "experimental" $ git branch * experimental master
"experimental"上で、test.txtの内容を変更して、コミットする。
$ echo experimental > test.txt $ git commit -a
作業ブランチを"master"に切り替えて、test2.txtの内容を変更する。
$ git checkout master Switched to branch "master" $ echo master > test2.txt $ git commit -a
この時点で、下記のような状況となっている。
- "experimental"上で、test.txtの内容が「experimental」となっている。
- "master"上で、test2.txtの内容が「master」となっている。
これらの変更を"master"上でマージする。
$ git merge experimental $ cat test.txt experimental $ cat test2.txt master
マージした結果は、GUIツールで確認できる。
gitk
RewriteRuleについて
MOBA_HOME/conf/rewrite.conf
MobaSiFでは、以下のようなURLでアクセスすることをサポートしている。
上記のURLでアクセスした場合、MobaSiFの設定により下記のURLに書き換わる。
このURLの書き換えを実際に行なっているのが、apacheのRewrite機能である。
Rewrite機能は、正規表現による柔軟なURLの書き換え機能を有している。
MobaSiFでは、下記のような設定がされている。
RewriteEngine On
RewriteCond %{REQUEST_METHOD} ^(TRACE|TRACK|OPTIONS)
RewriteRule .* - [F]RewriteRule ^/\.([^/]+)/(.*)$ /$2?_u=$1 [QSA]
RewriteRule ^/_([^/\.]+)(\.html?)?$ /?f=$1 [QSA]RewriteRule ^/static/(.*)$ /static/$1 [QSA,PT,L]
RewriteRule ^/$ /fcgi/index.fcgi [QSA,PT,L]
RewriteRule ^/(.*\.html)$ /fcgi/index.fcgi?f=page&page=$1 [QSA,PT,L]
RewriteRule ^/(.*\/)$ /fcgi/index.fcgi?f=page&page=$1 [QSA,PT,L]
これについてどういう動作をしているのかを調べてみた。
RewriteCond %{REQUEST_METHOD} ^(TRACE|TRACK|OPTIONS)
RewriteCondでは、「%{REQUEST_METHOD}」と正規表現「^(TRACE|TRACK|OPTIONS)」が一致するかどうかを見ている。
一致すれば次の行に書いてある「RewriteRule .* - [F]」に移る。
RewriteRule .* - [F]
RewriteRuleの記法は,
RewriteRule 一致パターン 置換パターン フラグ
となっている。この行では特に一致させて置換するようなことはしていない。
フラグで指定されている[F]はFORBIDDENを返すことを表している。
すなわち、前の行の記述と合わせると「TRACE、TRACK、OPTIONSのいずれかのメソッドでアクセスしてきたものに対しては、FORBIDDENを返す」という意味になる。
これらをperlで書くと以下のような感じになる。
if ($ENV{'REQUEST_METHOD'} =~ /^(TRACE|TRACK|OPTIONS)/) { $ENV{'PATH_INFO'} =~ s/.*/-/; # 意味ないぽ print "Status: 403 Forbidden\n\n"; exit; }
RewriteRule ^/\.([^/]+)/(.*)$ /$2?_u=$1 [QSA]
[QSA]は、「Query String Append」の略で、%{QUERY_STRING}を引数としてつけたいときに使うフラグである。
このフラグが付くと、置換結果は下記のものと等価となる。
/$2?_u=$1&%{QUERY_STRING}
RewriteRule ^/_([^/\.]+)(\.html?)?$ /?f=$1 [QSA]
この行は、MobaSiFの
というURLの記述に対応している。アンダースコア以降のものを「f=bbs」に整形している。
RewriteRule ^/static/(.*)$ /static/$1 [QSA,PT,L]
[PT]フラグは、「Path Through」の略で、Rewriteを打ち切って、他の変換(Script Aliasなど)に渡すものである。
[L]フラグは、Rewrite機能の変換を打ち切ること指示するものである。
この記述では「http://www.uhouho.net/static/hato.html」というアクセスがあったとき、
「$MOBA_HOME/static/」にある静的ページ「hato.html」を直接読み込むことを表している。
結果的に変換前と変換後でURLに変化はないが、rewrite機能を打ちきるために記述されている。
RewriteRule ^/$ /fcgi/index.fcgi [QSA,PT,L]
なにも引数を連れていないときの変換である。
基本的に動的なページが呼び出されるときは、
「$MOBA_HOME/fcgi/index.fcgi」に飛ばされるようになっている。
まとめ
Apacheの機能であるrewrite機能を調べた。rewrite機能は、アクセスしてきたURLを正規表現で柔軟に変換する機能を有している。MobaSiFでは、「MOBA_HOME/conf/rewrite.conf」でその設定が行なわれている。この設定を有効にするには、httpd.confに「Include rewrite.conf」と記述する必要がある(実際にはMOBA_HOME/conf/httpd.confをapacheのhttpd.confでインクルードするように設定する)。これにより、cgiプログラムに届く前にURLが整形されるようになる。
今回、rewrite.confを読んだことにより、MobaSiFがどのようなアクセスに対応しているのかを把握することができた。今後は、rewrite.confを書き換えることにより、新しいアクセス方式を追加したり、アクセス制限をかけたり、といったことを行なっていく予定である。