一.Server类图
二.详细描述
- Call:server端的Call对象,对应着client的一个Call对象,两者id相同。同client Call一样,server Call封装了每次方法调用的参数信息和调用结果。
//server端的Call对象,对应着client的一个Call对象,两者id相同 private static class Call { private int id; // cleint Call的id private Writable param; // client Call传过来的参数,实际上就是client Call的param,实际上也是RPC.Invocation private Connection connection; //server到client的连接 private ByteBuffer response; //server Call的结果,类似client Call的value public Call(int id, Writable param, Connection connection) { this.id = id; this.param = param; this.connection = connection; this.response = null; } public void setResponse(ByteBuffer response) { this.response = response; } }
- Connection:client向server发送消息时,server端nio接受后会创建一个到client的连接,用来向client发送消息。
public class Connection { private boolean rpcHeaderRead = false; // if initial rpc header is read private boolean headerRead = false; //if the connection header that private SocketChannel channel; private ByteBuffer data; private LinkedList<Call> responseQueue;//需要发往client端的Call private Socket socket; //初始化server Connction //channel client的SocketChannel public Connection(SelectionKey key, SocketChannel channel, this.channel = channel; this.socket = channel.socket(); this.responseQueue = new LinkedList<Call>(); }
- Listener:监听client发送过来的消息,分发给Reader线程处理,最终目的是把消息封装成Server call对象,放入callQueue。
//监听线程,NIO Reactor模式,读取client发送过来的消息 private class Listener extends Thread { private ServerSocketChannel acceptChannel = null; //the accept channel private Selector selector = null; //the selector that we use for the server private Reader[] readers = null; private InetSocketAddress address; //the address we bind at private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // bindserver socket到本机ip+指定port 50020 bind(acceptChannel.socket(), address, backlogLength); // create a selector; selector= Selector.open(); //线程池执行所有消息 readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } acceptChannel.register(selector, SelectionKey.OP_ACCEPT); } //3.1.创建Reader线程读取消息,最终目的把消息封装成Server call对象,放入callQueue private class Reader implements Runnable { private Selector readSelector = null; Reader(Selector readSelector) { this.readSelector = readSelector; } public void run() { nio方式处理消息{ doRead(key); } } }
- Handler:从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法,真正执行远程命令的地方),远程命令执行完后,决定是否立刻向client发送执行命令的结果(Server call的response)。
//4.从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法) private class Handler extends Thread { @Override public void run() { while (running) { //4.1 从callQueue里面取Call,调用call方法 final Call call = callQueue.take(); // pop the queue; maybe blocked here CurCall.set(call); value = call(call.connection.protocol, call.param, call.timestamp); CurCall.set(null); //4.2 为Server call赋值 setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); //4.3处理赋值完后的call responder.doRespond(call); } } }
- Responder:异步向client发送Server call的结果。
//5.异步发送Server call的response到client端 private class Responder extends Thread { private Selector writeSelector; Responder() throws IOException { writeSelector = Selector.open(); // create a selector } @Override public void run() { while (running) { Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); //5.异步写 if (key.isValid() && key.isWritable()) { doAsyncWrite(key); } } } } //4.3将赋值完的server call放入responder队列里 void doRespond(Call call) throws IOException { synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call); //如果真有一个call,同步发送 if (call.connection.responseQueue.size() == 1) { processResponse(call.connection.responseQueue, true); } //如果多个call,交给Responder线程异步发送 } } //5异步发送 private void doAsyncWrite(SelectionKey key) throws IOException { Call call = (Call)key.attachment(); processResponse(call.connection.responseQueue, false); } //向client写消息 private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException { Call call = responseQueue.removeFirst(); SocketChannel channel = call.connection.channel; channelWrite(channel, call.response); } }
- Server:IPC Server。
public abstract class Server { private BlockingQueue<Call> callQueue; // queued calls private Listener listener = null; private Responder responder = null; private int numConnections = 0; private Handler[] handlers = null; //1.初始化一个IPC server,指定RPC服务器地址和端口 protected Server(String bindAddress, int port, Invocation.class, ...) throws IOException { this.bindAddress = bindAddress; this.port = port; this.paramClass = paramClass; //创建listener线程 listener = new Listener(); this.port = listener.getAddress().getPort(); //创建responder线程 responder = new Responder(); } //2.启动IPC server public synchronized void start() { //启动istener线程 responder.start(); //启动responder线程 listener.start(); //创建Handler线程,启动Handler线程 handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } } //3.client第一次发送消息到server时触发doAccept public void run() { nio方式接受消息{ doAccept(key); } } //3.doAccept void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); //3.1.创建Reader线程读取消息 Reader reader = getReader(); reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); //3.2得根据client的SocketChannel创建一个server Connection,传递给Handler线程 c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); } } //3.1Reader线程读取消息 void doRead(SelectionKey key) throws InterruptedException { Connection c = (Connection)key.attachment(); c.readAndProcess(); } } //3.1读取channel public int readAndProcess() throws IOException, InterruptedException { int count = channelRead(channel, rpcHeaderBuffer); count = channelRead(channel, data); processOneRpc(data.array()); return count; } private void processOneRpc(byte[] buf) throws IOException, InterruptedException { processData(buf); } //3.1根据client channel内容(方法参数)创建一个server Call,放入callQueue private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // try to read an id Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param param.readFields(dis); Call call = new Call(id, param, this); callQueue.put(call); // queue the call; maybe blocked here } } //4.1 public abstract Writable call(Class<?> protocol, Writable param, long receiveTime , throws IOException; //4.2 为server call赋值(response) private void setupResponse(ByteArrayOutputStream response, Call call, Status status, Writable rv, String errorClass, String error) throws IOException { call.setResponse(ByteBuffer.wrap(response.toByteArray())); } }
三.关于Socket和ServerSocket的bind方法
- Java网络编程从入门到精通(25):创建ServerSocket对象 http://edu.xvna.com/html/68156_3.html
- 简单分析一下socket中的bind http://www.cnblogs.com/nightwatcher/archive/2011/07/03/2096717.html
相关推荐
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 01-hadoop职位需求状况.avi 02-hadoop课程安排.avi 03-hadoop应用场景.avi 04-hadoop对海量数据处理的解决思路.avi 05-hadoop版本选择和...
Hadoop 培训课程(2)HDFS 分布式文件系统与HDFS HDFS体系结构与基本概念*** HDFS的shell操作*** java接口及常用api*** ---------------------------加深拓展---------------------- RPC调用** HDFS的分布式存储架构的...
《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...
《Hadoop 2.X HDFS源码剖析》以Hadoop 2.6.0源码为基础,深入剖析了HDFS 2.X中各个模块的实现细节,包括RPC框架实现、Namenode实现、Datanode实现以及HDFS客户端实现等。《Hadoop 2.X HDFS源码剖析》一共有5章,其中...
1.2.1 Hadoop RPC接口 4 1.2.2 流式接口 20 1.3 HDFS主要流程 22 1.3.1 HDFS客户端读流程 22 1.3.2 HDFS客户端写流程 24 1.3.3 HDFS客户端追加写流程 25 1.3.4 Datanode启动、心跳以及执行名字节点指令...
hadoop入门级的代码 Java编写 eclipse可运行 包含 hdfs的文件操作 rpc远程调用的简单示例 map-reduce的几个例子:wordcount 学生平均成绩 手机流量统计
hadoop rpc 云计算 hdfs mapreduce
NameNode源码分析(RPC是基础) DataNode源码分析 FileSystem源码分析(如何与NameNode通信ClientProtocol) JobTracker源码分析 TaskTracker源码分析 网站日志分析项目(这个项目分析可以让你更加掌握好所学的知识...
02 - 全网最全的Hadoop集群搭建视频 03 - 深度揭秘世界级分布式文件系统 HDFS 架构设计 04 - 老司机带你自研RPC 05 - 老司机带你自研分布式文件系统 06 - 老司机带你自研分布式计算引擎 07 - Hive底层执行引擎深度...