`
y806839048
  • 浏览: 1081592 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Dubbo服务调用过程 (编解码 + 服务提供方返回调用结果)

阅读更多

需求:

dubbo 的 消息体是怎么样的? 如何去定义消息体的。  另外我们都知道,当多个消费者对同一个dubbo 的provider 进行消费的时候,Dubbo 会将响应对象派发到线程池上,dubbo 是如何将响应对象从线程池线程传递到用户线程上。本文基于2.7.0 以下版本,2.7.0 版本,dubbo 协议的通信完全基于 completetableFuture 的方式调用。

(1):

首先预备一下知识。 Dubbo 的服务调用,再对dubbo 扩展和理解的时候,我们需要谨记下面的调用连过程,这样我们就知道扩展哪一层的SPI,最合适。层级是非常明确的。

 
 

(2).

我们来看看Dubbo 的消息头,对于编解码来说,我们需要知道dubbo 的消息头的组成有哪些,各个组成的部分有什么样的作用。下面的图来自与dubbo 的官网,也就是说,对于不同的协议,dubbo 的消息头的组成是不一样的,分别有 network (http 协议), tcp 协议,dubbo 协议等组成。 咱们来看看dubbo 的协议,dubbo 的协议通信层是netty,再进入到通信层之前,我们需要对dubbo 的头信息进行编码,那看看dubbo 的头信息有哪些。

 
 

那我们来具体的看看dubbo 的协议的消息头的组成部分。128位,一共16 个字节,每个自己都记录相关的元数据信息

0 ~ 7 : dubbo 魔数((short) 0xdabb) 高位,也就是 (short) 0xda。这个魔数是做什么使用的,是一种标识吗?其实在编码和解码的过程中会对比 这个魔数。 就像SUN 公司规定每个 class 文件都必须以一个 word(4 个字节 ) 来开始,这个数字就称为魔数。

8 ~ 15: dubbo 魔数((short) 0xdabb) 低位,也就是 (short) 0xbb。我有同样的疑惑,dubbo 高低位魔数是做什么使用的。

16: 数据包类型, 0 - Response, 1 - Request

17: 调用方式,  仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用,  这里我不太清楚何为双向调用? 我的想法是 dubbo 提供的callback 功能, 也就是说consumer 调用 provider, provider 有response 之后,会回调consumer 的相关函数。

18: 事件标识, 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包

19 ~ 23: 序列化器编号, 比如 2 是 Hessian2Serialization

24 ~ 31: 状态, 20 - OK, 30 - CLIENT_TIMEOUT, 31 - SERVER_TIMEOUT

32 ~ 95: 请求编号, 共8字节,运行时生成

96 ~ 127: 消息体长度, 运行时计算

 
 

下面来看看编码: encode

 

@Override publicvoidencode(Channel channel, ChannelBuffer buffer, Object msg)throwsIOException{

        if (msg instanceof Request) {

            // 对 Request 对象进行编码            encodeRequest(channel, buffer, (Request) msg);

        } else if (msg instanceof Response) {

            // 对 Response 对象进行编码,后面分析            encodeResponse(channel, buffer, (Response) msg);

        } else {

            super.encode(channel, buffer, msg);

        }

    }

然后进入 encodeRequest 方法,我们看看这个方法的过程:

// 创建消息头字节数组,长度为 16 byte[] header = new byte[HEADER_LENGTH];

// 设置魔数 Bytes.short2bytes(MAGIC, header);

// 设置数据包类型(Request/Response)和序列化器编号 header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

// 设置通信方式(单向/双向) if (req.isTwoWay()) {

            header[2] |= FLAG_TWOWAY;

        }

// 设置事件标识 if (req.isEvent()) {

            header[2] |= FLAG_EVENT;

        }

// 获取 buffer 当前的写位置 int savedWriteIndex = buffer.writerIndex();

// 更新 writerIndex,为消息头预留 16 个字节的空间 buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);

        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);

        // 创建序列化器,比如 Hessian2ObjectOutput        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);

        if (req.isEvent()) {

            // 对事件数据进行序列化操作            encodeEventData(channel, out, req.getData());

        } else {

            // 对请求数据进行序列化操作            encodeRequestData(channel, out, req.getData(), req.getVersion());

        }

// 获取写入的字节数,也就是消息体长度 int len = bos.writtenBytes();

        checkPayload(channel, len);

        // 将消息体长度写入到消息头中        Bytes.int2bytes(len, header, 12);

请大家好好补补对byte 数组进行运算的知识。 方可理解dubbo 源码里面到底是如何对128 位,16个字节进行运算的。并要清楚了解 byte 的运算中 &,|, >>, <<等运算符。

以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header 数组中。然后对 Request 对象的 data 字段执行序列化操作,序列化后的数据最终会存储到 ChannelBuffer 中。序列化操作执行完后,可得到数据序列化后的长度 len,紧接着将 len 写入到 header 指定位置处。最后再将消息头字节数组 header 写入到 ChannelBuffer 中,整个编码过程就结束了。对data 的序列化过程如下,请看encodeRequestData方法,其大致过程如下:

// 依次序列化 dubbo version、path、version,  序列化调用方法名

out.writeUTF(version);

out.writeUTF(inv.getAttachment(Constants.PATH_KEY));

out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

out.writeUTF(inv.getMethodName());

// 将参数类型转换为字符串,并进行序列化

out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));

// 对运行时参数进行序列化 out.writeObject(encodeInvocationArgument(channel, inv, i));

// 序列化 attachments out.writeObject(inv.getAttachments());

下面再来看看 decode 的过程

默认情况下 Dubbo 使用 Netty 作为底层的通信框架。Netty 检测到有数据入站后,首先会通过解码器对数据进行解码

@Override protectedObjectdecode(Channel channel, ChannelBuffer buffer,intreadable,byte[] header)throwsIOException{

        // 检查魔数是否相等

   if (readable > 0 && header[0] != MAGIC_HIGH

                || readable > 1 && header[1] != MAGIC_LOW) {

            int length = header.length;

            if (header.length < readable) {

                header = Bytes.copyOf(header, readable);

                buffer.readBytes(header, length, readable - length);

            }

            for (int i = 1; i < header.length - 1; i++) {

                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {

                    buffer.readerIndex(buffer.readerIndex() - header.length + i);

                    header = Bytes.copyOf(header, i);

                    break;

                }

            }

            // 通过 telnet 命令行发送的数据包不包含消息头,所以这里            // 调用 TelnetCodec 的 decode 方法对数据包进行解码            return super.decode(channel, buffer, readable, header);

        }

        // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT        if (readable < HEADER_LENGTH) {

            return DecodeResult.NEED_MORE_INPUT;

        }

        // 从消息头中获取消息体长度        int len = Bytes.bytes2int(header, 12);

        // 检测消息体长度是否超出限制,超出则抛出异常        checkPayload(channel, len);

        int tt = len + HEADER_LENGTH;

        // 检测可读的字节数是否小于实际的字节数        if (readable < tt) {

            return DecodeResult.NEED_MORE_INPUT;

        }

        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {

            // 继续进行解码工作            return decodeBody(channel, is, header);

        } finally {

            if (is.available() > 0) {

                try {

                    StreamUtils.skipUnusedStream(is);

                } catch (IOException e) {

                    logger.warn(e.getMessage(), e);

                }

            }

        }

    }

上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody 方法进行后续的解码工作,ExchangeCodec 中实现了 decodeBody 方法,但因其子类 DubboCodec 覆写了该方法,所以在运行时 DubboCodec 中的 decodeBody 方法会被调用。

在DecodeableRpcInvocation class 中,会将dubbo version, path, method name, attachment 解码出来,最终得到一个具有完整调用信息的 DecodeableRpcInvocation 对象。

下面来看看  服务提供方返回调用结果

服务提供方调用指定服务后,会将调用结果封装到 Response 对象中,并将该对象返回给服务消费方。服务提供方也是通过 NettyChannel 的 send 方法将 Response 对象返回。当响应数据解码完成之后,dubbo 会将响应对象派发到线程池上,但是线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。

那么我们首先来看看用户线程现在在干什么?  用户线程在发送完请求后,就调用defaultFuture 的get 方法等待响应对象的到来,当响应对象到来之后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。

doReceive 方法,请大家自行查看DefaultFuture 方法,需要去理解下, ReentrantLock,还有 lock.newcondition, 的用法。 并且它与synchronized, wait,notify 的区别。

那么调用编号是怎么回事呢?

一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错。答案是通过调用编号。DefaultFuture 被创建时,会要求传入一个 Request 对象。此时 DefaultFuture 可从 Request 对象中获取调用编号,并将 <调用编号, DefaultFuture 对象> 映射关系存入到静态 Map 中,即 FUTURES。线程池中的线程在收到 Response 对象后,会根据 Response 对象中的调用编号到 FUTURES 集合中取出相应的 DefaultFuture 对象,然后再将 Response 对象设置到 DefaultFuture 对象中。最后再唤醒用户线程,这样用户线程即可从 DefaultFuture 对象中获取调用结果了。整个过程大致如下图:

 

 
 

注意: 本文的大部分内容来自于dubbo 官网对 dubbo 源码的分析,甚是奇妙,我也想编辑下来,最为以后的代码设计的参照。 

问个问题,请问 synchronized, ReentrantLock, ReadWriteLock , 之间的区别?

信号之间的通知是如何的,比如 notify, wait, ReentrantLock 的 condition 的使用。 谢谢大家时间,希望有所得!

 

参考:

https://www.jianshu.com/p/99a0bc93eeb6

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics