blog.Ring.idv.tw


Distributed Processing of the Sobel Edge Detector

2010/05/24: Added a MapReduce New API version.

About two years ago I wrote "Sobel - Edge Detection for AS2" in ActionScript, purely for fun. Now I have reworked the same example into a Hadoop version, mainly to take advantage of its distributed processing power. An application like this only needs the "Map" phase, which writes the processed image straight to HDFS; there is no need for the shuffle and reduce phases, which would only waste bandwidth and other resources. One more point worth mentioning: this example processes an entire image file at once, so the file must not be split up before the "Map" phase. The approach taken here is to override the isSplitable() method and treat the whole file as a single record. Interested readers can find the details in the attached files:

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.imageio.ImageIO;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.sun.image.codec.jpeg.JPEGCodec;
import com.sun.image.codec.jpeg.JPEGImageEncoder;

public class SobelProcessing extends Configured implements Tool
{

    public static class Map extends MapReduceBase implements
            Mapper<NullWritable, BytesWritable, Text, Text>
    {

        private JobConf conf;

        @Override
        public void configure(JobConf conf)
        {
            this.conf = conf;
        }

        public void map(NullWritable key, BytesWritable value,
                OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException
        {
            // "map.input.file" holds the HDFS path of the image being processed
            String filename = conf.get("map.input.file");
            String output_dir = conf.get("output.dir");
            filename = getFileName(filename);
            FileSystem fs = FileSystem.get(conf);
            FSDataOutputStream dos = fs.create(new Path(output_dir + filename + ".jpg"));

            // decode the image from the record bytes; BytesWritable.getBytes()
            // may return a padded buffer, which the JPEG decoder ignores
            BufferedImage src = ImageIO.read(new ByteArrayInputStream(value.getBytes()));

            float sobscale = Float.valueOf(conf.get("sobscale"));
            int offsetval = Integer.valueOf(conf.get("offsetval"));

            int iw = src.getWidth();
            int ih = src.getHeight();
            BufferedImage dest = new BufferedImage(iw, ih, src.getType());

            // grayscale conversion using the standard luma weights
            int[][] gray = new int[iw][ih];

            for (int x = 0; x < iw; x++)
            {
                for (int y = 0; y < ih; y++)
                {
                    int rgb = src.getRGB(x, y);
                    int r = 0xFF & (rgb >> 16);
                    int g = 0xFF & (rgb >> 8);
                    int b = 0xFF & rgb;
                    gray[x][y] = (int) (0.299 * r + 0.587 * g + 0.114 * b);
                }
            }

            // slide the two 3x3 gradient masks over the interior pixels
            // (the one-pixel border is skipped)
            for (int x = 1; x < iw - 1; x++)
            {
                for (int y = 1; y < ih - 1; y++)
                {
                    int a = gray[x - 1][y - 1];
                    int b = gray[x][y - 1];
                    int c = gray[x + 1][y - 1];
                    int d = gray[x - 1][y];
                    int e = gray[x + 1][y];
                    int f = gray[x - 1][y + 1];
                    int g = gray[x][y + 1];
                    int h = gray[x + 1][y + 1];

                    int hor = (a + d + f) - (c + e + h);

                    if (hor < 0)
                        hor = -hor;

                    int vert = (a + b + c) - (f + g + h);

                    if (vert < 0)
                        vert = -vert;

                    int gc = (int) (sobscale * (hor + vert));
                    gc = (gc + offsetval);

                    // clamp to the valid 0..255 range (a negative offsetval
                    // could otherwise underflow)
                    if (gc < 0)
                        gc = 0;

                    if (gc > 255)
                        gc = 255;

                    int sobel = 0xff000000 | gc << 16 | gc << 8 | gc;
                    dest.setRGB(x, y, sobel);
                }
            }

            // encode the result as JPEG straight into HDFS; the Sun-proprietary
            // codec used here was common at the time, and ImageIO.write(dest,
            // "jpg", dos) is the modern equivalent
            JPEGImageEncoder encoder = JPEGCodec.createJPEGEncoder(dos);
            encoder.encode(dest);
            dos.close();
        }

        public String getFileName(String s)
        {
            // returns "/name" with the leading slash kept, so output.dir
            // should not end with a slash
            return s.substring(s.lastIndexOf("/"), s.lastIndexOf("."));
        }
    }

    public int run(String[] args) throws Exception
    {
        JobConf conf = new JobConf(getConf(), SobelProcessing.class);

        conf.set("sobscale", "1.0");
        conf.set("offsetval", "0");
        conf.set("output.dir", args[1]);

        conf.setJobName("SobelProcessing");
        conf.setMapperClass(Map.class);

        conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);

        conf.set("mapred.child.java.opts", "-Xmx256m");
        conf.setNumReduceTasks(0);

        WholeFileInputFormat.setInputPaths(conf, new Path(args[0]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args)
    {
        try
        {
            int res = ToolRunner.run(new Configuration(), new SobelProcessing(), args);
            System.exit(res);
        } catch (Exception e)
        {
            e.printStackTrace();
        }

    }
}
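
The WholeFileInputFormat referenced above ships with the attachment rather than the post itself. For readers who just want the idea, here is a minimal sketch of how such a class can be written against the old mapred API, following the common whole-file pattern: isSplitable() returns false so the image is never split, and a RecordReader hands the entire file to the Mapper as one BytesWritable record. Treat it as an illustration, not necessarily the exact code in the attachment.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable>
{
    @Override
    protected boolean isSplitable(FileSystem fs, Path filename)
    {
        return false; // never split: each image must reach one Mapper intact
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> getRecordReader(
            InputSplit split, JobConf job, Reporter reporter) throws IOException
    {
        return new WholeFileRecordReader((FileSplit) split, job);
    }

    // Reads the whole file as a single (NullWritable, BytesWritable) record.
    static class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable>
    {
        private final FileSplit fileSplit;
        private final JobConf conf;
        private boolean processed = false;

        WholeFileRecordReader(FileSplit fileSplit, JobConf conf)
        {
            this.fileSplit = fileSplit;
            this.conf = conf;
        }

        public boolean next(NullWritable key, BytesWritable value) throws IOException
        {
            if (processed)
                return false;

            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try
            {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally
            {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }

        public NullWritable createKey() { return NullWritable.get(); }
        public BytesWritable createValue() { return new BytesWritable(); }
        public long getPos() { return processed ? fileSplit.getLength() : 0; }
        public float getProgress() { return processed ? 1.0f : 0.0f; }
        public void close() throws IOException { }
    }
}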

Result:

Source code

Source code (New API)

2009-03-13 23:22:21 | Comments (58)

A new role...

The three snapshots above are of my dear niece, who arrived in this world on February 20. She will probably grow up to be a lively, outgoing girl ^^ and thanks to her I have taken on a new role: "uncle".

When I first saw her I kept thinking, how can she be so tiny? Haha, "adorable" is the only word for it.

Back in Hsinchu there are plenty of matters waiting to be handled. I have not pushed myself hard enough lately... well, time to keep at it! I have to keep approaching things from a positive angle.

2009-02-23 00:52:33 | Comments (4)

Average Length of URL?

From before the Lunar New Year until now, a junior schoolmate and I have been busy porting our earlier work to an online version, and I still need more time. ><"

During the conversion, a question suddenly occurred to me: what is the average length of a URL across the whole web?

I suspect only the large search engines (Google, Yahoo, Cuil) could give an answer close to the truth.

Below is a small MapReduce program that computes such a result:

URLList

http://l.yimg.com/f/a/tw/ivychang/708971_020409_420x80_0202_yahoo-elite.swf
http://l.yimg.com/tw.yimg.com/a/tw/ivychang/712756_1231_1231new350_100.swf
http://l.yimg.com/tw.yimg.com/a/tw/erinlin/721493_0123_350x200.swf
http://www.kriesi.at/wp-content/themes/dark_rainbow/js/Particles.swf
http://tw.promo.yahoo.com/2008auction/shpticket/images/top.swf
http://l.yimg.com/tw.yimg.com/a/tw/fanny/658216_101508_420x80_4.swf
http://l.yimg.com/f/a/tw/vikii/606895_shopping_center_20090203r.swf
http://l.yimg.com/f/a/tw/hedy/697827_e3_hp_012109.swf
http://l.yimg.com/tw.yimg.com/a/tw/ivychang/708334_0120_350x200_certificate_081224.swf
http://l.yimg.com/tw.yimg.com/a/tw/ivychang/708334_0120_350x100_linux_080826.swf
http://www.ysed.org.tw/3rd_upLoad/4156/index.swf

URLAvgLength

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class URLAvgLength extends Configured implements Tool {

	static enum Counter {
		URL_COUNT
	}

	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {

		private final static Text word = new Text("Len");

		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {

			String url = value.toString();
			reporter.incrCounter(Counter.URL_COUNT, 1);
			output.collect(word, new IntWritable(url.length()));
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {

		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {

			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}

			output.collect(key, new IntWritable(sum));
		}
	}

	public int run(String[] args) throws Exception {
		String input = "/usr/Ring/urllist/*";
		String output = "/usr/Ring/urlavglen";
		JobConf conf = new JobConf(getConf(), URLAvgLength.class);
		FileSystem fs = FileSystem.get(conf);
		fs.delete(new Path(output), true);

		conf.setJobName("URLAvgLength");
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class); // summing is associative, so the reducer doubles as a combiner
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		conf.setNumReduceTasks(1);

		TextInputFormat.setInputPaths(conf, new Path(input));
		TextOutputFormat.setOutputPath(conf, new Path(output));

		RunningJob running = JobClient.runJob(conf);
		Counters ct = running.getCounters();
		long count = ct.getCounter(Counter.URL_COUNT);

		// read back the single "Len<TAB>sum" line written by the reducer
		InputStream in = fs.open(new Path("hdfs://localhost:9000" + output + "/part-00000"));
		BufferedReader br = new BufferedReader(new InputStreamReader(in));
		String line = br.readLine();
		br.close();
		long total = Long.parseLong(line.split("\t")[1]);
		System.out.println("Avg:" + total / count); // integer division truncates the average
		return 0;
	}

	public static void main(String[] args) {
		try {
			int res = ToolRunner.run(new Configuration(), new URLAvgLength(),args);
			System.exit(res);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
}
Avg:67
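
For comparison, the same truncated average can be checked locally without Hadoop. A quick sanity-check sketch, assuming the URL list sits in a local urllist.txt with one URL per line:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class URLAvgLengthLocal {

    public static void main(String[] args) throws IOException {
        // Mirrors the MapReduce job: sum the URL lengths, divide by the count.
        BufferedReader br = new BufferedReader(new FileReader("urllist.txt"));
        long total = 0;
        long count = 0;
        String line;
        while ((line = br.readLine()) != null) {
            total += line.length();
            count++;
        }
        br.close();
        System.out.println("Avg:" + total / count); // integer division, as in the job above
    }
}

Both versions use integer division, which is why the job prints a whole number like Avg:67.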

2009-02-07 02:29:48 | Comments (1)

Memcached - a distributed memory object caching system

Memcached (pronounced mem-cache-dee) is a distributed memory object caching system. Everything has to be distributed these days. XD

Google makes that especially clear: Google File System, MapReduce, Bigtable... all of them distributed = =" (the latter two are built on top of GFS).

According to the memcached entry on Wikipedia, it was originally developed by Danga Interactive for LiveJournal. It works for most database-driven websites (this blog should probably switch to Memcached too XD, when I find the time); in other words, it lets us cache data that would otherwise be fetched from the database over and over. Note, however, that it provides no security or authentication features at all, so Memcached has to sit behind a firewall. Plenty of big-name companies use it, such as YouTube, Digg, Wikipedia, and Slashdot; Facebook runs it on more than 800 servers with 28 TB of memory dedicated to caching (Google presumably uses even more). And notably, Jimmy Lin, an assistant professor at UMD, was quick to integrate it with Hadoop and even wrote a technical report, "Low-Latency, High-Throughput Access to Static Global Resources within the Hadoop Framework". A true pioneer. Orz

There are already plenty of resources online about installing Memcached; a quick Google search will do, or see the related resources below.

I mainly use "spymemcached", a Java API; if you need a client for another language, see: Memcached Clients.

The following program prints the status of the specified Memcached servers:

import java.net.SocketAddress;
import java.util.Map;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;

public class MemcachedTest
{
	public static void main(String arg[]) throws Exception
	{
		long total_items = 0;
		// xxx.xxx.xxx.xxx is a placeholder; point it at a running memcached server
		MemcachedClient mc = new MemcachedClient(AddrUtil.getAddresses("xxx.xxx.xxx.xxx:11211"));
		
		Map<SocketAddress, Map<String, String>> stats = mc.getStats();

		for (Map.Entry<SocketAddress, Map<String, String>> e : stats.entrySet())
		{
			System.out.println("memcached server: " + e.getKey().toString());

			for (Map.Entry<String, String> s : e.getValue().entrySet())
			{
				System.out.println(" - " + s.getKey() + ": " + s.getValue());

				if (s.getKey().equals("curr_items"))
					total_items += Long.parseLong(s.getValue());

			}
		}

		System.out.println("Total number items in memcache: " + total_items);
		mc.shutdown();
	}
}
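
The program above only reads server statistics. Typical use is the cache-aside pattern mentioned earlier: check the cache first and fall back to the database on a miss. A minimal sketch with the same spymemcached client, where the key name and loadFromDatabase() are placeholders rather than a real API:

import java.util.concurrent.TimeUnit;

import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;

public class MemcachedCacheAside
{
    public static void main(String[] args) throws Exception
    {
        MemcachedClient mc = new MemcachedClient(AddrUtil.getAddresses("xxx.xxx.xxx.xxx:11211"));

        String key = "article:42"; // hypothetical cache key
        String value = (String) mc.get(key);

        if (value == null)
        {
            value = loadFromDatabase(); // stand-in for the real database query
            // cache for one hour; block briefly so we know the set succeeded
            mc.set(key, 3600, value).get(10, TimeUnit.SECONDS);
        }

        System.out.println(value);
        mc.shutdown();
    }

    private static String loadFromDatabase()
    {
        return "expensive query result";
    }
}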

Related news and resources

Scaling memcached at Facebook

How to install memcache on Debian Etch

2009-01-13 22:31:18 | Comments (2)

Java class File Format

While tidying up my computer yesterday, I found a long-forgotten Word file containing my old notes on the Java class file format.

I still remember smiling the first time I saw "0xCAFEBABE". A coffee babe? XDDD Yes, indeed: Java uses these four bytes as its file signature. How creative!

As I recall, the class format was substantially revised once, when "Java 1.4.x" jumped straight to "Java 5.0". The fashion back then was "two tigers": one was Java 5.0, code-named "Tiger", and the other was "Mac OS X 10.4", also code-named "Tiger". That lasted until last year, when "nature" took over and everything became either "Cloud" or "Air", too many to list XDD (Cloud Computing, Tag Cloud, Adobe AIR, MacBook Air...)

The source file behind the format discussed above is as follows:

Hello.java

public class Hello
{
	public static void main(String arg[])
	{
		String s = "Hello";
	}
}
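
After compiling Hello.java with javac, the signature can be checked directly. A small sketch that reads the fixed-size header at the front of every class file (u4 magic, u2 minor_version, u2 major_version, per the VM Spec):

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

public class ClassHeaderDump
{
    public static void main(String[] args) throws IOException
    {
        // Every class file starts with: u4 magic, u2 minor_version, u2 major_version.
        DataInputStream in = new DataInputStream(new FileInputStream("Hello.class"));
        int magic = in.readInt();
        int minor = in.readUnsignedShort();
        int major = in.readUnsignedShort();
        in.close();

        System.out.printf("magic: 0x%08X%n", magic);          // 0xCAFEBABE
        System.out.printf("version: %d.%d%n", major, minor);  // 49.0 when compiled with Java 5.0
    }
}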

Interested readers can parse it against "VM Spec The class File Format".

The class format changes in Java 5.0 can be downloaded from "JSR 202: JavaTM Class File Specification Update".

My notes: a byte-by-byte breakdown of Hello.class (posting the whole chart here would make it ridiculously long...)

2009-01-06 12:54:34 | Add Comment

Copyright (C) Ching-Shen Chen. All rights reserved.
