Hadoop IPC RPC类中对请求的客户端缓存类ClientCache,是一个用HashMap进行对象缓存的类,但是对缓存操作时都使用synchronized关键字来加锁,如果使用ConcurrentHashMap进行进行缓存,在存取时会有更好的性能。ConcurrentHashMap是基于分段的锁分离技术实现,而且使用JUC中的显示锁来保证同步,多线程方面性能比HashMap有明显的优势。
/* Cache a client using its socket factory as the hash key */ private class ClientCache { private Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>(); /** * Construct & cache an IPC client with the user-provided SocketFactory * if no cached client exists. * * @param conf Configuration * @return an IPC client */ private synchronized Client getClient(Configuration conf, SocketFactory factory) { // Construct & cache client. The configuration is only used for timeout, // and Clients have connection pools. So we can either (a) lose some // connection pooling and leak sockets, or (b) use the same timeout for all // configurations. Since the IPC is usually intended globally, not // per-job, we choose (a). Client client = clients.get(factory); if (client == null) { client = new Client(ObjectWritable.class, conf, factory); clients.put(factory, client); } else { client.incCount(); } return client; } /** * Construct & cache an IPC client with the default SocketFactory * if no cached client exists. * * @param conf Configuration * @return an IPC client */ private synchronized Client getClient(Configuration conf) { return getClient(conf, SocketFactory.getDefault()); } /** * Stop a RPC client connection * A RPC client is closed only when its reference count becomes zero. */ private void stopClient(Client client) { synchronized (this) { client.decCount(); if (client.isZeroReference()) { clients.remove(client.getSocketFactory()); } } if (client.isZeroReference()) { client.stop(); } } }
上面代码同使用synchronized同步读写操作都是放到方法前,这样会对整个类实例加锁,而不只是对缓存对象加锁,对该Client的引用计数使用基本int类型进行++和--操作再使用synchronized关键字保证同步,不如使用原子操作类使用方便和有更好的性能。
private synchronized Client getClient(Configuration conf, SocketFactory factory) { // Construct & cache client. The configuration is only used for timeout, // and Clients have connection pools. So we can either (a) lose some // connection pooling and leak sockets, or (b) use the same timeout for all // configurations. Since the IPC is usually intended globally, not // per-job, we choose (a). Client client = clients.get(factory); if (client == null) { client = new Client(ObjectWritable.class, conf, factory); clients.put(factory, client); } else { client.incCount(); } return client; } Client类: private int refCount = 1; synchronized void incCount() { refCount++; } synchronized void decCount() { refCount--; }
相关推荐
Hadoop_RPC详细分析.doc,这是你多关注的
hadoop rpc实例,
使用Hadoop的RPC机制创建一个协议接口、通信服务端、通信客户端程序 目标:通过该任务,理解分布式系统中远程过程调用协议,掌握分布式系统中客户机与服务器的通信机制。
Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...
rpc架构与hadoop分享
1.java接口操作Hadoop文件系统(文件上传下载删除创建...可举一反三) 2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客...
java操作hadoop的RPC源码,附带所需全部jar包,欢迎下载学习。
org.apache.hadoop.security.AccessControlException: Permission denied: user=xudsa, access=WRITE, inode="/uploaddemo1.txt":hadoop:supergroup:-rw-r--r--
Hadoop自己的Rpc框架使用Demo。可以在自己的项目中用Hadoop的Rpc框架了。
NULL 博文链接:https://ouyida3.iteye.com/blog/1144326
NULL 博文链接:https://wmwork2010.iteye.com/blog/632016
传智播客吴超老师RPC小例子
关于hadoop的分布式缓存的源码,用于大家的学习,改进hadoop的分布式缓存
接收Call调用负责接收来自RPCClient的调用请求,编码成Call对象后放入到Call队列中。这一过程由Listener线程完成。具体步骤:处理Call调用负责处理Call队列中的每个调用请求,由Handler线程完成:交互过程如下图所示...
Hadoop里的RPC机制过程,代码示例rpc调用过程
hadoop-2.6.0 Window客户端,解压到目录,设置环境变量即可使用。java调用实例:// windows环境下需要配置Hadoop的客户端 System.setProperty("hadoop.home.dir", "E:/hadoop-2.6.0/"); conf = new Configuration...
NULL 博文链接:https://zqhxuyuan.iteye.com/blog/1879292
赠送jar包:hadoop-yarn-client-2.6.5.jar; 赠送原API文档:hadoop-yarn-client-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-client-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-yarn-client-2.6.5.pom;...
在windows11平台,基于hadoop3.3.4源码包自编译,按需自取,内含hadoop.dll等文件以及全部源码等
NULL 博文链接:https://demon3780.iteye.com/blog/1453862