blog.Ring.idv.tw

Articles

Hadoop - Chaining Maps

本文主要介紹在Hadoop 0.19.0就開始提供的Chaining Maps(Hadoop-3702)功能,在開始介紹之前先假想下述情況~ 是否曾在一個Mapper中處理許多步驟的事項?或者是否曾在Reducer結束之後還要針對每筆資料個別處理的情況?簡單來說~ 如果在你的MapReduce程式中有需要preprocessing或postprocessing的情況,那就適合用Chaining Maps來完成。

這裡我們假設將MapReduce的執行順序用「[MR]+」符號來表示,那麼透過Chaining Maps所執行的工作就可以用「M+RM*」來表示,代表可以執行一個以上的Mapper(preprocessing),接著在Reducer之後再由零或多個Mapper做後續的處理(postprocessing),這樣的作法可以帶來一些額外的好處,譬如:方便除錯、測試、易維護以及MapReduce程式的可重用性,下述是一個簡單的展示範例,主要藉由「ChainMapper」和「ChainReducer」兩個類別來完成,此範例執行的順序為:「WordSegmentationMap | StopWordMap | Reduce | FilterMap」。

Chaining Maps for Hadoop 0.21.x

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class ChainExample
{
	public static class WordSegmentationMap extends Mapper<LongWritable, Text, Text, IntWritable>
	{
		@Override
		public void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException
		{
			String s[] = value.toString().split(" ");
			for(String w : s)
				context.write(new Text(w), new IntWritable(1));

		}
	}
	public static class StopWordMap extends Mapper<Text, IntWritable, Text, IntWritable>
	{
		private Set<String> stopwords = new HashSet<String>();
		public static final String[] ENGLISH_STOP_WORDS = {
		    "a", "and", "are", "as", "at", "be", "but", "by",
		    "for", "if", "in", "into", "is", "it",
		    "no", "not", "of", "on", "or", "s", "such",
		    "t", "that", "the", "their", "then", "there", "these",
		    "they", "this", "to", "was", "will", "with"
		  };
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			for(int i = 0 ; i < ENGLISH_STOP_WORDS.length ; i++)
				stopwords.add(ENGLISH_STOP_WORDS[i]);
		}
		@Override
		public void map(Text key, IntWritable value , Context context) throws IOException, InterruptedException
		{
			if(!stopwords.contains(key.toString()))
				context.write(key, value);
		}
	}
	
	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		@Override
		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for(IntWritable i : values)
				sum++;	

			context.write(key, new IntWritable(sum));
		}
	}
	public static class FilterMap extends Mapper<Text, IntWritable, IntWritable,Text>
	{
		@Override
		public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException
		{
			if(value.get()>100)
				context.write(value, key);
		}
	}

	public static void main(String[] args)throws Exception
	{
		String output = "/chainExample_out/";
		Configuration conf = new Configuration();
		Cluster cluster = new Cluster(conf);
		
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(output), true);
		
		Job job = Job.getInstance(cluster);
		job.setJarByClass(ChainExample.class);
		job.setJobName("ChainExample");
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		TextInputFormat.setInputPaths(job, new Path("/chainExample/*"));
		TextOutputFormat.setOutputPath(job, new Path(output));
		
		ChainMapper.addMapper(job, WordSegmentationMap.class, LongWritable.class, Text.class, Text.class, IntWritable.class,  new Configuration(false));
		ChainMapper.addMapper(job, StopWordMap.class, Text.class, IntWritable.class, Text.class, IntWritable.class,  new Configuration(false));
		ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, new Configuration(false));
		ChainReducer.addMapper(job, FilterMap.class, Text.class, IntWritable.class, IntWritable.class, Text.class, new Configuration(false));
		job.waitForCompletion(true);
	}
}

2010-12-01 10:17:15 | Comments (13)

PageRank in MapReduce

圖片來源:www.jauhari.net

前言

有一陣子沒有寫MapReduce程式了,所以找個代表性的實例來練習一下...

PageRank in MapReduce

PageRank演算法最早是由Google兩位創辦人Sergey Brin & Larry Page在1998年的時候發表在World-Wide Web Conference的一篇論文:The Anatomy of a Large-Scale Hypertextual Web Search Engine所提出來的,該演算法主要用來計算網頁的重要性,以決定搜尋引擎該呈現搜尋結果時的一個排名依據,然而根據Google在1998年當時所索引的網頁數量來看,他們共索引了26 million pages(We knew the web was big...),所以可能三、四台機器就足以運算完成,但是到2000年時Google就索引了超過one billion pages,而這樣的規模就適合用MapReduce來分散式處理了,而本文主要介紹該如何用MapReduce的方式來完成這樣的演算法,然而重點在於PageRank是一種反覆式演算法(Iterative Algorithm),所以該如何應用在MapReduce並決定何時該跳離這個反覆式迴圈以結束運算就需要一些方式來處理。

P.S. 本範例純粹使用「純文字型態」來處理,如果你有效率的考量請試著改寫特定的OutputFormat和Writable實作。

Google PageRank 範例

這裡的範例假設全世界只有四個網頁,它們分別為:Adobe, Google, MSN and Yahoo,每個網頁的PageRank值(簡稱PR值)預設為10。

1. Adobe有三個對外連結,分別連到Google, MSN and Yahoo。

2. Google只有一個對外連結為Adobe。

3. MSN有一個對外連結為Google。

4. Yahoo則有兩個對外連結為MSN and Google。

Adobe	10.00 Google,MSN,Yahoo
Google	10.00 Adobe
MSN	10.00 Google
Yahoo	10.00 MSN,Google

所以從這個範例來看,由於有三個網頁都連結到Google,所以相對來說它的PR值應該是最高的,其次應則為Adobe,因為Google的分數最高且又只連結到Adobe,所以Adobe的PR值也會比較高。

PageRank - MapReduce for Hadoop 0.21.x

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank
{
	static enum PageCount{
		Count,TotalPR
	}
	public static class PageRankMapper extends Mapper<Object, Text, Text, Text>
	{
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			context.getCounter(PageCount.Count).increment(1);
			String[] kv = value.toString().split("\t");
			String _key = kv[0];
			String _value = kv[1];
			String _PRnLink[] = _value.split(" ");
			String pr = _PRnLink[0];
			String link = _PRnLink[1];
			context.write(new Text(_key), new Text(link));

			String site[] = link.split(",");
			float score = Float.valueOf(pr)/(site.length)*1.0f;
			for(int i = 0 ; i < site.length ; i++)
			{
				context.write(new Text(site[i]), new Text(String.valueOf(score)));
			}	

		}
	}
 
	public static class PageRankReducer extends Reducer<Text, Text, Text, Text>
	{
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
		{
			StringBuilder sb = new StringBuilder();
			float factor = 0.85f;
			float pr = 0f;
			for(Text f : values)
			{				
				String value = f.toString();
				int s = value.indexOf(".");
				if(s != -1)
				{
					pr += Float.valueOf(value);
				}else{
					String site[] = value.split(",");
					int _len = site.length;
					for(int k = 0 ; k < _len ;k++)
					{
						sb.append(site[k]);
						sb.append(",");
					}
				}
			}

			pr = ((1-factor)+(factor*(pr)));
			context.getCounter(PageCount.TotalPR).increment((int)(pr*1000));
			String output = pr+" "+sb.toString();
			context.write(key, new Text(output));
		}
	}

	public static void main(String[] args) throws Exception
	{
		String input;
		String output;
		int threshold = 1000;
		int iteration = 0;
		int iterationLimit = 100;
		boolean status = false;
		
		while(iteration < iterationLimit)
		{
			if((iteration % 2) == 0)
			{
				input = "/pagerank_output/p*";
				output = "/pagerank_output2/";
			}else{
				input = "/pagerank_output2/p*";
				output = "/pagerank_output/";
			}

			Configuration conf = new Configuration();	
			
			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(output), true);
			
			Job job = Job.getInstance(new Cluster(conf));
			job.setJobName("PageRank");
			job.setJarByClass(PageRank.class);
			job.setMapperClass(PageRankMapper.class);
			job.setReducerClass(PageRankReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			TextInputFormat.addInputPath(job, new Path(input));
			TextOutputFormat.setOutputPath(job, new Path(output));
			status = job.waitForCompletion(true);
			iteration++;
			
			long count = job.getCounters().findCounter(PageCount.Count).getValue();
			long total_pr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
			System.out.println("PageCount:"+count);
			System.out.println("TotalPR:"+total_pr);
			double per_pr = total_pr/(count*1.0d);
			System.out.println("Per PR:"+per_pr);
			if((int)per_pr == threshold)
			{
				System.out.println("Iteration:"+iteration);
				break;
			}
		}
		System.exit(status?0:1);
	}
}

關於上述程式所執行Map和Reduce所處理的過程及輸出結果就不詳加敘述了,留待有興趣的朋友們自行研究~

而關於如何決定跳離反覆式迴圈以結束運算的處理方式,筆者採用下述兩種方式:

1. 最多執行100次的反覆式運算,讓程式有一定的執行次數限制。

2. 分別累加頁面數量和每個網頁的PR值,並觀察其變化量呈現穩定狀態時就離開迴圈,上述範例求到小數第三位。

透過上述的處理方式,可以觀察到在執行第54次MapReduce運算時所呈現出來的結果:

Adobe	1.3334262 Google,MSN,Yahoo,
Google	1.39192 Adobe,
MSN	0.7523096 Google,
Yahoo	0.5279022 MSN,Google,

結果如預期的,Google的PR值最高,其次為Adobe,最後才是MSN和Yahoo。

P.S. 筆者沒有討厭Yahoo也沒有特別喜歡Google,純粹實驗性質... Orz (我比較愛Adobe)

相關資源

.Jimmy Lin and Michael Schatz. Design Patterns for Efficient Graph Algorithms in MapReduce. Proceedings of the 2010 Workshop on Mining and Learning with Graphs Workshop (MLG-2010), July 2010, Washington, D.C.

一式解讀 PageRank

2010-11-29 16:32:57 | Comments (9)

星海爭霸2 - 達成四顆鑽石等級 ^o^

其實本來沒有要買下去的,主要是有一天下班後去買便當時,看到電視的報導說一位女歌手「Linda」獲得星海爭霸2 - ESL 女子盃冠軍(新聞報導)... Orz 就因為這樣我就買下去了~~ 還記得她嗎?她就是之前「衣櫥演唱會」這首歌的女歌手~ 實在是太厲害了... Orz 看了一下他的戰績!!!! 四顆鑽石... = =" 她是不用工作嗎?女生可以玩到這樣實在是佩服呀~ 可以一邊工作又可以把遊戲玩的好(心想:我也來試試看XD)~ 話說Linda昨天在背電影劇本之前還玩了一下SC2... Orz 所以我也就開始沉溺下去了~ 而今天終於完成目標,達成SC2各對戰組合都鑽石等級了~ 雖然已經有很多高手達到這個層級,不過業餘的我還是要放一下煙火^O^ 接下來應該就不會玩的那麼勤奮了~ 該適時的調配時間花在其它地方了~

最近看到Microsoft Kinect發售了,感覺還蠻有趣的~ 不過自從國中之後就很少玩遊戲機了,所以如果我有這個感測裝置的話應該會想將它和電腦連在一塊,然後玩一些互動的東西吧~ 當然是用Flash,因為國外已經有人這麼玩了.. (Kinect now accessible from Flash)

2010-11-24 00:56:34 | Add Comment

快門的瞬間 - 展翅的蝴蝶

地點:知本國家森林遊樂區

機身:Sony A550

鏡頭:卡爾蔡斯16-80mm T* F3.5-4.5

快門:1/320s

焦距:80mm

ISO:200

這隻一直動來動去實在有夠難拍... Orz

2010-11-15 22:59:30 | Comments (3)

台中新光三越-馬蓋典

前天陪朋友去台中新光三越看顆鏡頭... 在閒晃的途中看到自稱是「馬蓋典」的陳漢典~ XD

手中剛好帶著相機~ 就拿起來拍個幾張~ ^^a

2010-10-11 22:54:22 | Comments (3)

Next Posts~:::~Previous Posts
Copyright (C) Ching-Shen Chen. All rights reserved.

::: 搜尋 :::

::: 分類 :::

::: 最新文章 :::

::: 最新回應 :::

::: 訂閱 :::

Atom feed
Atom Comment