blog.Ring.idv.tw


Integrating Cassandra and Hadoop - WordCount

Cassandra has shipped with Hadoop integration since version 0.6, which lets data stored in Cassandra serve as the input source for a Hadoop MapReduce job. A few days ago, while experimenting with how to wire the two together, I ran the official WordCount example and the results came out completely wrong!! After tracing the problem for the better part of a day, it turned out to be a Cassandra bug, fortunately already fixed in the freshly released 0.6.4 (ColumnFamilyRecordReader returns duplicate rows). Note that Cassandra does not yet provide an implementation for writing Hadoop output back to Cassandra (although you can handle that yourself in the reducer); that is slated for the 0.7 release (A Hadoop Output Format That Targets Cassandra). The following is the Cassandra + Hadoop WordCount program:

Test data

Key     Value
-----------------------------------------
Doc1    new home sales top forecasts 
Doc2    home sales rise in july 
Doc3    increase in home sales in july 
Doc4    july new home sales rise 

IRWordCountSetup

package cassandra;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class IRWordCountSetup
{
	public static final String UTF8 = "UTF8";

	public static void main(String[] args) throws Exception
	{
		// open a Thrift connection to the local Cassandra node
		TTransport tr = new TSocket("localhost", 9160);
		TProtocol proto = new TBinaryProtocol(tr);
		Cassandra.Client client = new Cassandra.Client(proto);
		tr.open();

		String keyspace = "Keyspace1";
		String columnFamily = "Standard1";

		// every document is stored as a single column named "Doc"
		ColumnPath colPathName = new ColumnPath(columnFamily);
		colPathName.setColumn("Doc".getBytes(UTF8));
		long timestamp = System.currentTimeMillis();

		// insert the four test documents, keyed Doc1..Doc4
		client.insert(keyspace, "Doc1", colPathName, "new home sales top forecasts".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc2", colPathName, "home sales rise in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc3", colPathName, "increase in home sales in july".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);
		client.insert(keyspace, "Doc4", colPathName, "july new home sales rise".getBytes(UTF8), timestamp, ConsistencyLevel.ONE);

		tr.close();
	}
}
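
This read-back is not part of the original example, but it is a quick way to confirm that the four rows actually landed. The sketch below uses the 0.6 Thrift get() call; it assumes the client, keyspace, and colPathName variables from main above (paste it after the inserts, before tr.close()) plus an extra import of org.apache.cassandra.thrift.ColumnOrSuperColumn:

// fetch the "Doc" column of row "Doc1" and print its value
ColumnOrSuperColumn cosc = client.get(keyspace, "Doc1", colPathName, ConsistencyLevel.ONE);
System.out.println(new String(cosc.getColumn().getValue(), UTF8));
// should print: new home sales top forecasts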

IRWordCount

package cassandra;

import java.io.IOException;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IRWordCount
{
	static final String KEYSPACE = "Keyspace1";
	static final String COLUMN_FAMILY = "Standard1";
	private static final String CONF_COLUMN_NAME = "Doc";
	private static final String OUTPUT_PATH_PREFIX = "/tmp/doc_word_count";

	public static class TokenizerMapper extends Mapper<String, SortedMap<byte[], IColumn>, Text, IntWritable>
	{
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		private String columnName;
		
		protected void setup(Context context) throws IOException, InterruptedException
		{
			// the column to read is passed in through the job configuration
			this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
		}

		// each input record is one Cassandra row: the row key plus the columns selected by the SlicePredicate
		public void map(String key, SortedMap<byte[], IColumn> columns, Context context) throws IOException, InterruptedException
		{
			IColumn column = columns.get(columnName.getBytes());
			if(column == null)
				return;
			
			String value = new String(column.value());			
			System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

			StringTokenizer itr = new StringTokenizer(value);
			while (itr.hasMoreTokens())
			{
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
	{
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
		{
			int sum = 0;
			for (IntWritable val : values)
			{
				sum += val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}
	
	public static void main(String[] args) throws Exception
	{
		Path output = new Path(OUTPUT_PATH_PREFIX);
		Configuration conf = new Configuration();

		// remove the output directory left over from a previous run
		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(output))
			fs.delete(output, true);

		// hand the column name to the mapper through the configuration
		String columnName = "Doc";
		conf.set(CONF_COLUMN_NAME, columnName);
		Job job = new Job(conf, "wordcount");
		job.setJarByClass(IRWordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		// read input from Cassandra instead of HDFS
		job.setInputFormatClass(ColumnFamilyInputFormat.class);
		FileOutputFormat.setOutputPath(job, output);

		// scan Keyspace1/Standard1, fetching only the "Doc" column of each row
		ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
		SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
		ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);

		boolean status = job.waitForCompletion(true);
		System.exit(status ? 0 : 1);
	}
}

The output is:

forecasts	1
home	4
in	3
increase	1
july	3
new	2
rise	2
sales	4
top	1
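
For reference, FileOutputFormat writes these counts under /tmp/doc_word_count on the job's file system; with a single reducer under the new MapReduce API, the file is named part-r-00000. If you want to read the result programmatically rather than from the console, a sketch along these lines works (PrintWordCounts is a hypothetical helper, and the part-r-00000 name assumes the default single-reducer output):

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintWordCounts
{
	public static void main(String[] args) throws Exception
	{
		FileSystem fs = FileSystem.get(new Configuration());
		// a single reducer under the new API writes its output as part-r-00000
		Path result = new Path("/tmp/doc_word_count/part-r-00000");
		BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result)));
		String line;
		while ((line = reader.readLine()) != null)
			System.out.println(line);
		reader.close();
	}
}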

2010-08-09 15:27:32 | Add Comment

StarCraft II

The image above is the StarCraft II Flash ad that ran on Yahoo on July 27, 2010.

StarCraft is a real-time strategy game I played constantly ten years ago. Ten years later, its sequel, StarCraft II, finally launched on July 27 this year. It is still in closed beta, so it can be played for free, and naturally I downloaded it on day one (even though I had not played a computer game in roughly ten years). With the foundation from the first game, as soon as the install finished I picked the Zerg and played a match against the computer, mainly to get familiar with the race's tech tree and units. Right after that I switched to multiplayer mode and started playing against real opponents.

Unsurprisingly, my five placement matches did not go all that well - 3 wins and 2 losses - and I was initially placed in the Silver league. Over the next three days I climbed from Silver to Gold and then to the Platinum league, where I got stuck: Platinum players are genuinely decent, the next step up is the top-tier Diamond league, and for a day or two I could not crack the tactics for dealing with each race. Tonight I finally fought my way into Diamond... although it is now 4 a.m. It has been a long time since I played a game this obsessively, so I should rein it in from here on - there are still "proper" things to do - as long as I can stay in Diamond without being demoted ^^ For a description of the leagues, see the official League and Ladder FAQ (戰階與天梯的問與答).

The race distribution of the top twelve Diamond league players in the image above shows Protoss somehow taking up fully half the spots, while my beloved Zerg holds only a quarter.. oh well~

2010-08-07 05:41:45 | Add Comment

Tunghai Night Market - 毒家冰窖

Address: 台中縣龍井鄉新東村台中港路東園巷2弄23號

Price: NT$60

This is the "super mango milk shaved ice" sold at 毒家冰窖, a shop inside the Tunghai Night Market. If you are in the mood for mango ice, it is worth going there for a taste~

2010-07-25 18:31:05 | Comments (2)

A Scheming Scottish Fold

Yesterday I went to 毒家冰窖 at Tunghai for mango ice. As soon as you step into the shop you see the owner's Scottish Fold, which kept sneaking glances at me while I photographed it - quite an amusing scene!

The owner has also taken in two stray kittens. One of them happened to stick its head straight into a small beer glass, trying to drink from it. A pity I did not catch it on camera - that would have been a classic shot~

2010-07-25 17:28:09 | Add Comment

Accelerometer in iPhone

(Image source: UIAcceleration Class Reference)

The image above shows the iPhone's built-in three-axis accelerometer, which many small games on the App Store use for control. Using it is straightforward. Since I prefer not to learn iPhone development through Interface Builder, this example is written entirely by hand.

Main code

UIAccelerometer *acc = [UIAccelerometer sharedAccelerometer];	// the shared singleton instance
acc.updateInterval = 1.0f/20.0f;	// deliver a reading every 1/20 of a second (20 Hz)
acc.delegate = self;	// self must implement UIAccelerometerDelegate

Because the UIAccelerometer class is a singleton, you obtain it through [UIAccelerometer sharedAccelerometer]; in Java, this corresponds to the familiar getInstance() methods.
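
For comparison, the getInstance() pattern referred to above looks roughly like this in Java (a generic sketch of the singleton pattern, not an actual iPhone API):

public final class Accelerometer
{
	// the single shared instance, created once
	private static final Accelerometer INSTANCE = new Accelerometer();

	// a private constructor prevents outside code from creating more instances
	private Accelerometer() {}

	// the Java counterpart of [UIAccelerometer sharedAccelerometer]
	public static Accelerometer getInstance()
	{
		return INSTANCE;
	}
}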

Two properties need to be set here: updateInterval and delegate.

updateInterval: how often the accelerometer delivers updates, expressed as the interval in seconds between readings (1.0/20.0 here, i.e. 20 updates per second).

delegate: registers which class implements the UIAccelerometerDelegate protocol and should receive the update callbacks.

UIAccelerometerDelegate

// called every updateInterval seconds with the latest reading;
// xx, yy, and zz are UILabel outlets showing one axis each
- (void)accelerometer:(UIAccelerometer*)accelerometer didAccelerate:(UIAcceleration*)acceleration 
{
	xx.text = [NSString stringWithFormat:@"%f",acceleration.x];
	yy.text = [NSString stringWithFormat:@"%f",acceleration.y];
	zz.text = [NSString stringWithFormat:@"%f",acceleration.z];
}

This example simply displays the values of the X, Y, and Z axes.

Sample code download

2010-06-18 14:57:02 | Add Comment

Copyright (C) Ching-Shen Chen. All rights reserved.
