`
半点玻璃心
  • 浏览: 26669 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBASE 代码阅读笔记-1 - PUT-3-提交任务1(基于0.94.12)

阅读更多
终于把RS的定位问题搞清楚了些些,时间不等人,马上看看connection.processBatch中,step2是如何把任务提交到服务端的吧

之前已经看到,首先创建了一个Callable<MuiltyResponse>对象,而该对象的call方法实际上又创建了一个ServerCallable<MultiResponse> 对象,然后调用了它的withoutRetries方法。
这个方法很简单,调用了connect方法和multi方法
一个个开始啃吧,先看看connect,其中server是ServerCallable的成员,HRegionInterface类型
    // 备注【1】:这是ServerCallable默认的connect方法
    public void connect(final boolean reload) throws IOException {
        this.location = connection.getRegionLocation(tableName, row, reload);
        this.server = connection.getHRegionConnection(location.getHostname(),
                location.getPort());
    }
    // 备注【1】:这是createCallable的时候重写ServerCallable connect方法
    // 此前的代码中已经在step1获取到了rowkey对应的region信息,所以这里就不再重复获取了,这毕竟是一个重量级的操作
    public void connect(boolean reload) throws IOException {
        server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
    } 


HRegionInterface
        public HRegionInterface getHRegionConnection(final String hostname,
                                                     final int port)
                throws IOException {
            return getHRegionConnection(hostname, port, false);
        }
        @Override
        public HRegionInterface getHRegionConnection(final String hostname,final int port, final boolean master)
                throws IOException {
            return getHRegionConnection(hostname, port, null, master);
        }


        HRegionInterface getHRegionConnection(final String hostname, final int port,
                                              final InetSocketAddress isa, final boolean master)
                throws IOException {
            if (master) getMaster();//如果是链接master,就去找吧,这里先不深究了
            HRegionInterface server;
            String rsName = null;
            if (isa != null) {
                rsName = Addressing.createHostAndPortStr(isa.getHostName(),
                        isa.getPort());
            } else {
                rsName = Addressing.createHostAndPortStr(hostname, port);
            }
            ensureZookeeperTrackers();
            // See if we already have a connection (common case)
            server = this.servers.get(rsName);
            if (server == null) {
                // create a unique lock for this RS (if necessary)
                this.connectionLock.putIfAbsent(rsName, rsName);
                // get the RS lock
                synchronized (this.connectionLock.get(rsName)) {
                    // do one more lookup in case we were stalled above
                    server = this.servers.get(rsName);
                    if (server == null) {
                        try {
                            // Only create isa when we need to.
                            InetSocketAddress address = isa != null ? isa :
                                    new InetSocketAddress(hostname, port);
                            // definitely a cache miss. establish an RPC for this RS
                            // 前面都是构建地址,缓存判断、操作之类,这里是核心代码
                            server = HBaseRPC.waitForProxy(this.rpcEngine,
                                    serverInterfaceClass, HRegionInterface.VERSION,
                                    address, this.conf,
                                    this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
                            this.servers.put(Addressing.createHostAndPortStr(
                                    address.getHostName(), address.getPort()), server);
                        } catch (RemoteException e) {
                            LOG.warn("RemoteException connecting to RS", e);
                            // Throw what the RemoteException was carrying.
                            throw e.unwrapRemoteException();
                        }
                    }
                }
            }
            return server;
        }


这段代码在regionlocation的博文中已经帖过,主要是利用servername string做了一个hash缓存,如果已经存在则返回之,否则创建并缓存之

HBaseRPC.waitForProxy,参数比较多
后面5个就不说了,常见参数,先看看前三个吧
RpcEngine:注释说的很清楚,RPC实现。具体是什么后续跟进
protocol:HRegionInterface的子类
clientVersion:HRegionInterface.VERSION,当前为常量29。不多说了,啃代码。主代码目测也太简单了,直接循环尝试返回rpcClient.getProxy,然后处理异常。看来还得往下挖
    public static <T extends VersionedProtocol> T waitForProxy(RpcEngine rpcClient,
             Class<T> protocol,
             long clientVersion,
             InetSocketAddress addr,
             Configuration conf,
             int maxAttempts,//hbase.client.rpc.maxattempts,默认1,这个坑货配置文件里还没有
             int rpcTimeout,
             long timeout//hbase.rpc.timeout,默认60秒,亲,你们的客户端能忍受这么长的超时等待吗
    ) throws IOException {
        // HBase does limited number of reconnects which is different from hadoop.
        long startTime = System.currentTimeMillis();
        IOException ioe;
        int reconnectAttempts = 0;
        while (true) {
            try {
                return rpcClient.getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
            } catch (SocketTimeoutException te) {  // namenode is busy
                LOG.info("Problem connecting to server: " + addr);
                ioe = te;
            } catch (IOException ioex) {
                // We only handle the ConnectException.
                ConnectException ce = null;
                if (ioex instanceof ConnectException) {
                    ce = (ConnectException) ioex;
                    ioe = ce;
                } else if (ioex.getCause() != null
                        && ioex.getCause() instanceof ConnectException) {
                    ce = (ConnectException) ioex.getCause();
                    ioe = ce;
                } else if (ioex.getMessage().toLowerCase()
                        .contains("connection refused")) {
                    ce = new ConnectException(ioex.getMessage());
                    ioe = ce;
                } else {
                    // This is the exception we can't handle.
                    ioe = ioex;
                }
                if (ce != null) {
                    handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
                            addr, ce);
                }
            }
            // check if timed out
            if (System.currentTimeMillis() - timeout >= startTime) {
                throw ioe;
            }

            // wait for retry
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // IGNORE
            }
        }
    }


好吧,原来rpcClient.getProxy才是重头戏,一次次的浇熄哥接近终点的热情
	if (rpcEngine == null) {
                this.rpcEngine = HBaseRPC.getProtocolEngine(conf);
        }		
			
	public static synchronized RpcEngine getProtocolEngine(Configuration conf) {
        // check for a configured default engine
        Class<?> impl =
                conf.getClass(RPC_ENGINE_PROP, WritableRpcEngine.class);

        LOG.debug("Using RpcEngine: " + impl.getName());
        RpcEngine engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
        return engine;
    }

RPCEgine:WritableRpcEngine,你也可以通过hbase.rpc.engine配置一个全限定的类名来覆盖它
public <T extends VersionedProtocol> T getProxy(
            Class<T> protocol, long clientVersion,
            InetSocketAddress addr, Configuration conf, int rpcTimeout)
            throws IOException {
        if (this.client == null) { // client是一个HBaseClient实例,RPCEngine初始化的时候在setconf方法中注入// MY TODO
            throw new IOException("Client must be initialized by calling setConf(Configuration)");
        }
        // 真的是创建了一个代理呢
        T proxy =
                (T) Proxy.newProxyInstance(
                        protocol.getClassLoader(), new Class[]{protocol},
                        new Invoker(client, protocol, addr, User.getCurrent(), conf,
                                HBaseRPC.getRpcTimeout(rpcTimeout)));

    /*
     * TODO: checking protocol version only needs to be done once when we setup a new
     * HBaseClient.Connection.  Doing it every time we retrieve a proxy instance is resulting
     * in unnecessary RPC traffic.
     */ //检查是否服务端版本号与客户端版本号是否一致,否则只能说再见了
        long serverVersion = ((VersionedProtocol) proxy)
                .getProtocolVersion(protocol.getName(), clientVersion);
        if (serverVersion != clientVersion) {
            throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
                    serverVersion);
        }

        return proxy;
    }


这就拿到代理类了,Handler实现为Invoker,WritableRpcEngine的一个内部类

接着继续看Invoker吧,真的是代理哦,太简单了就是记录了时间,然后就没有然后了。几乎没有业务,实现放在了Invocation类中,由HBaseClient调用

      public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            final boolean logDebug = LOG.isDebugEnabled();
            long startTime = 0;
            if (logDebug) {
                startTime = System.currentTimeMillis();
            }

            HbaseObjectWritable value = (HbaseObjectWritable)
                    client.call(new Invocation(method, protocol, args), address,
                            protocol, ticket, rpcTimeout);
            if (logDebug) {
                // FIGURE HOW TO TURN THIS OFF!
                long callTime = System.currentTimeMillis() - startTime;
                LOG.debug("Call: " + method.getName() + " " + callTime);
            }
            return value.get();
        }

        public Invocation(Method method,
                      Class<? extends VersionedProtocol> declaringClass, Object[] parameters) {
        this.methodName = method.getName();
        this.parameterClasses = method.getParameterTypes();
        this.parameters = parameters;
        if (declaringClass.equals(VersionedProtocol.class)) {
            //VersionedProtocol is exempted from version check.
            clientVersion = 0;
            clientMethodsHash = 0;
        } else {
            try {
                Field versionField = declaringClass.getField("VERSION");
                versionField.setAccessible(true);
                this.clientVersion = versionField.getLong(declaringClass);
            } catch (NoSuchFieldException ex) {
                throw new RuntimeException("The " + declaringClass, ex);
            } catch (IllegalAccessException ex) {
                throw new RuntimeException(ex);
            }
            this.clientMethodsHash = ProtocolSignature.getFingerprint(
                    declaringClass.getMethods());
        }
    }

Invocation类其实主要是将要执行的类,方法,以及方法参数做了一层基于writable的封装,依赖HbaseObjectWritable类序列化和反序列化参数。这个比较漫长,也比较枯燥,专门拿一天写一篇来解析吧。

现在执行序列回到getHRegionConnection,我们拿到并缓存了一个HRegionInterface的一个代理,这个代理在执行的时候实际上是调用HbaseClient的Call方法。至于Call方法里面都做了些神马,后续再看。

既然连接准备好了,就该调用call方法了,call方法实际上是调用了server.multi方法,也就是HRegionInterface的multi方法。这下终于轮到HbaseClient出场了,隐藏得好深。

public Writable call(Writable param, InetSocketAddress addr,
                         Class<? extends VersionedProtocol> protocol,
                         User ticket, int rpcTimeout)
            throws InterruptedException, IOException {
        Call call = new Call(param);//param,这里就是封装好类名、方法名以及参数的invocation对象
        Connection connection = getConnection(addr, protocol, ticket, rpcTimeout, call);//这里开始连接,亲,你真的要连接了吗,不要骗人啊。
        connection.sendParam(call);                 // MS真的连接了,还把执行代理发了出去
        boolean interrupted = false;
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (call) {
            //如果请求没完成,连接没中断,线程没中断,没超时没错误,等吧。
            while (!call.done) {
                if (connection.shouldCloseConnection.get()) {
                    throw new IOException("Unexpected closed connection");
                }
                try {
                    call.wait(1000);                       // wait for the result
                } catch (InterruptedException ignored) {
                    // save the fact that we were interrupted
                    interrupted = true;
                }
            }

            if (interrupted) {
                // set the interrupt flag now that we are done waiting
                Thread.currentThread().interrupt();
            }

            if (call.error != null) {
                if (call.error instanceof RemoteException) {
                    call.error.fillInStackTrace();
                    throw call.error;
                }
                // local exception
                throw wrapException(addr, call.error);
            }
            return call.value;
        }
    }


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics