基本上HBase的RPC設計是採用Hadoop的RPC並做一些更動而寫成的(HBase/RoadMaps),而如果要用一句話來解釋HBase/Hadoop的RPC設計可以這麼說:它是透過Dynamic Proxy Pattern + Reflection + NIO(Multiplexed, non-blocking I/O)所構成的,中間溝通的物件會透過序列化(serialization)的方式來傳遞,所以都需要實作Hadoop的Writable介面,下述是筆者利用HBase/Hadoop的RPC設計來自訂一個Hello, Java範例:
RPCInterface
自行定義一個「say()」方法供RPC呼叫。
package hbase.rpc;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
public interface RPCInterface extends HBaseRPCProtocolVersion
{
public String say();
}
Message
實作RPCInterface介面的Message類別,純粹回傳一個「Hello, Java」字串。
package hbase.rpc;
import java.io.IOException;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
public class Message implements RPCInterface
{
public String say()
{
return "Hello, Java";
}
@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException
{
return HBaseRPCProtocolVersion.versionID;
}
}
TestRPCServer
該程式會透過「HBaseRPC.addToMap()」來註冊自行定義的方法(Method Registry),而內部就是利用Reflection來取得該類別所擁有的方法(Method),它會給予每個方法一個特定的ID,而該ID是一個整數值。
由於HBase/Hadoop的Server是採用Multiplexed, non-blocking I/O方式而設計的,所以它可以透過一個Thread來完成處理,但是由於處理Client端所呼叫的方法是Blocking I/O,所以它的設計會將Client所傳遞過來的物件先放置在Queue,並在啟動Server時就先產生一堆Handler(Thread),該Handler會透過Polling的方式來取得該物件並執行對應的方法,下述範例預設為10個Handler(HMaster/HRegionServer預設都為25個,根據"hbase.regionserver.handler.count"設定)。
package hbase.rpc;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
public class TestRPCServer
{
private HBaseServer server;
private final HServerAddress address;
static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
public TestRPCServer()
{
this.address = new HServerAddress("localhost:56789");
}
public void start()
{
try
{
Message msg = new Message();
this.server = HBaseRPC.getServer(msg, address.getBindAddress(), address.getPort(), 10, true, new HBaseConfiguration());
this.server.start();
while (true)
{
Thread.sleep(3000);
}
} catch (Exception e)
{
e.printStackTrace();
}
}
public static void main(String[] args)
{
new TestRPCServer().start();
}
}
TestRPCClient
這裡的「HBaseRPC.getProxy()」就是採用Dynamic Proxy Pattern + Reflection來設計,有興趣的朋友可以去研究它的Source Code。
package hbase.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
public class TestRPCClient
{
protected RPCInterface server;
static{HBaseRPC.addToMap(RPCInterface.class, (byte)37);}
@SuppressWarnings("unchecked")
public TestRPCClient()
{
try
{
server = (RPCInterface) HBaseRPC.getProxy(RPCInterface.class, HBaseRPCProtocolVersion.versionID, new InetSocketAddress("localhost", 56789), new HBaseConfiguration());
} catch (Exception e)
{
e.printStackTrace();
}
}
public String call() throws IOException
{
return server.say();
}
public static void main(String[] args) throws IOException
{
TestRPCClient client = new TestRPCClient();
System.out.println(client.call());
}
}
玩玩看吧!