`
chenzehe
  • 浏览: 532426 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Hadoop IPC RPC类中对请求的客户端缓存类ClientCache问题

 
阅读更多

        Hadoop IPC RPC类中对请求的客户端缓存类ClientCache,是一个用HashMap进行对象缓存的类,但是对缓存操作时都使用synchronized关键字来加锁,如果使用ConcurrentHashMap进行进行缓存,在存取时会有更好的性能。ConcurrentHashMap是基于分段的锁分离技术实现,而且使用JUC中的显示锁来保证同步,多线程方面性能比HashMap有明显的优势。

 

  /* Cache a client using its socket factory as the hash key */
  private class ClientCache {
    private Map<SocketFactory, Client> clients =
      new HashMap<SocketFactory, Client>();

    /**
     * Construct & cache an IPC client with the user-provided SocketFactory 
     * if no cached client exists.
     * 
     * @param conf Configuration
     * @return an IPC client
     */
    private synchronized Client getClient(Configuration conf,
        SocketFactory factory) {
      // Construct & cache client.  The configuration is only used for timeout,
      // and Clients have connection pools.  So we can either (a) lose some
      // connection pooling and leak sockets, or (b) use the same timeout for all
      // configurations.  Since the IPC is usually intended globally, not
      // per-job, we choose (a).
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
    }

    /**
     * Construct & cache an IPC client with the default SocketFactory 
     * if no cached client exists.
     * 
     * @param conf Configuration
     * @return an IPC client
     */
    private synchronized Client getClient(Configuration conf) {
      return getClient(conf, SocketFactory.getDefault());
    }

    /**
     * Stop a RPC client connection 
     * A RPC client is closed only when its reference count becomes zero.
     */
    private void stopClient(Client client) {
      synchronized (this) {
        client.decCount();
        if (client.isZeroReference()) {
          clients.remove(client.getSocketFactory());
        }
      }
      if (client.isZeroReference()) {
        client.stop();
      }
    }
  }

 

 上面代码同使用synchronized同步读写操作都是放到方法前,这样会对整个类实例加锁,而不只是对缓存对象加锁,对该Client的引用计数使用基本int类型进行++和--操作再使用synchronized关键字保证同步,不如使用原子操作类使用方便和有更好的性能。

private synchronized Client getClient(Configuration conf,
        SocketFactory factory) {
      // Construct & cache client.  The configuration is only used for timeout,
      // and Clients have connection pools.  So we can either (a) lose some
      // connection pooling and leak sockets, or (b) use the same timeout for all
      // configurations.  Since the IPC is usually intended globally, not
      // per-job, we choose (a).
      Client client = clients.get(factory);
      if (client == null) {
        client = new Client(ObjectWritable.class, conf, factory);
        clients.put(factory, client);
      } else {
        client.incCount();
      }
      return client;
}


Client类:
private int refCount = 1;
synchronized void incCount() {
    refCount++;
}

synchronized void decCount() {
    refCount--;
}

 

 

 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics