manzhizhen (Hangzhou)

Dubbo Source Code Analysis, Part 6: The Thread Pool Model and the Provider


A Dubbo service provider uses two kinds of thread pools: an IO thread pool and a service-invocation thread pool. For IO, Dubbo builds its transport on Mina, Grizzly, or Netty, so the IO thread pools are configured through those frameworks; with Netty, for example, they are the boss and worker pools. Dubbo chooses the "unbounded" CachedThreadPool here, which means every incoming request is accepted, but it still caps the number of IO worker threads, defaulting to the number of CPU cores + 1. This article uses the Netty transport as the example; see NettyServer#doOpen:

 

ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));

ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));

ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));

 

So what happens when a request reaches the Dubbo provider? Anyone who has used Dubbo can guess the outline even without reading the source: processing has three parts, namely request parsing, service invocation, and the response. Request parsing validates the request: decoding it (is the protocol correct?) and checking its legitimacy (does the provider expose this service? does the service require token validation to keep consumers from bypassing the registry and connecting directly?). Service invocation is the provider's actual handling of the call, and it runs on the second pool mentioned above, the service-invocation thread pool. Separating it from request parsing via a thread pool serves two purposes: it decouples the two stages, and it lets the provider return on timeout. To let users intercept this stage, Dubbo implements a Filter mechanism via its SPI; custom Filters can log and monitor service calls, provided the invocation pool has not already been saturated by requests. The response stage simply encodes the result and writes it back.
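The hand-off between the two pools can be sketched in plain java.util.concurrent terms. This is a minimal sketch with made-up names (IoVsCallPoolSketch, invokeService, and so on are not Dubbo classes): an "IO thread" parses the request, submits the invocation to a bounded call pool, and waits for the result to build the response.

```java
import java.util.concurrent.*;

// Sketch (hypothetical names, not Dubbo's classes): an "IO thread" decodes a
// request, then hands the invocation off to a bounded call pool so that a
// slow service method cannot stall network processing.
public class IoVsCallPoolSketch {
    // Bounded daemon pool standing in for Dubbo's service-invocation pool.
    static final ExecutorService CALL_POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Stand-in for the "service invocation" stage.
    static String invokeService(String decodedRequest) {
        return "result-of-" + decodedRequest;
    }

    // Stand-in for the IO stage: decode, hand off, then produce the response.
    static String handleOnIoThread(String rawRequest) throws Exception {
        String decoded = rawRequest.trim();                                  // "request parsing"
        Future<String> f = CALL_POOL.submit(() -> invokeService(decoded));   // hand-off to the call pool
        return f.get(1, TimeUnit.SECONDS);                                   // "response" stage
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handleOnIoThread("  ping  ")); // result-of-ping
    }
}
```

In real Dubbo the IO thread does not block on the result; the invocation pool writes the response back itself. Blocking here just keeps the sketch short.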

 

Let's look closely at request parsing, again with Netty as the reference. Anyone who has used Netty knows that custom stream handling and protocol codecs require implementing adapter classes, such as SimpleChannelHandler in Netty 3.x or ChannelInboundHandlerAdapter in Netty 4.x. Dubbo 2.5.3 depends on Netty 3.x, and NettyServer#doOpen shows exactly which Handlers Dubbo registers with Netty; they are the first line of defense for incoming request data:

 

@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    // Unbounded Netty boss pool: accepts new connections from consumers
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    // Unbounded Netty worker pool: handles data exchange on established connections
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    // Netty server bootstrap
    bootstrap = new ServerBootstrap(channelFactory);
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            // Decoding handler, {@link InternalDecoder}
            pipeline.addLast("decoder", adapter.getDecoder());
            // Encoding handler, {@link InternalEncoder}
            pipeline.addLast("encoder", adapter.getEncoder());
            // Entry point of processing after the data is decoded, {@link NettyHandler}
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // Bind the port
    channel = bootstrap.bind(getBindAddress());
}

 

From this we can see that if a JVM process exposes only one Dubbo service port, it holds exactly one NettyServer instance and one NettyHandler instance; if the application is both a consumer and a provider, there will be multiple NettyHandlers. The code also shows that Dubbo registers only three Handlers in Netty's pipeline, while Dubbo defines a ChannelHandler interface of its own to chain channel-related processing; the first of these Dubbo ChannelHandlers is invoked by NettyHandler. Interestingly, NettyServer itself is also a ChannelHandler. After Dubbo wraps the service instances from the Spring container in dynamic proxies, it exposes the service port via NettyServer#doOpen and then registers the service with the registry. Once that is done, consumers can connect to the provider (it is always the consumer that initiates the connection), and when the connection is established, the provider's NettyHandler#channelConnected creates a NettyChannel:

 

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    // Create or fetch a NettyChannel from channelMap
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        if (channel != null) {
            // Add to channels if absent; note the key is the remote consumer's address, i.e. IP + port
            channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel);
        }

        // The handler here is the NettyServer that created this NettyHandler
        handler.connected(channel);

    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

 

NettyHandler has two important fields: the current Netty Channel, and a map from each Netty Channel to Dubbo's internal NettyChannel:

 

private final org.jboss.netty.channel.Channel channel;

private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>();
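NettyChannel.getOrAddChannel, used above, follows the classic ConcurrentMap putIfAbsent idiom. A minimal sketch of that idiom, with plain strings standing in for the Netty/Dubbo channel types (ChannelMapSketch and its method names are hypothetical):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the getOrAddChannel idiom with strings in place of channel types:
// putIfAbsent guarantees that concurrent callers racing on the same key all
// end up observing a single mapped instance.
public class ChannelMapSketch {
    static final ConcurrentMap<String, String> MAP = new ConcurrentHashMap<>();

    static String getOrAdd(String nettyChannel) {
        String wrapped = MAP.get(nettyChannel);
        if (wrapped == null) {
            // Another thread may win the race; keep whichever value got in first.
            String candidate = "wrapped-" + nettyChannel;
            String prev = MAP.putIfAbsent(nettyChannel, candidate);
            wrapped = (prev != null) ? prev : candidate;
        }
        return wrapped;
    }

    public static void main(String[] args) {
        System.out.println(getOrAdd("ch1"));                    // wrapped-ch1
        System.out.println(getOrAdd("ch1") == getOrAdd("ch1")); // true: same instance
    }
}
```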

 

Just as Netty and Dubbo each have their own ChannelHandler, they each have their own Channel as well. channelConnected finally calls NettyServer#connected, which checks whether adding the new channel would exceed the provider's configured accepts limit; if so, it logs an error and closes the Channel, and the consumer naturally sees a connection-interrupted exception (see AbstractServer#connected for details). We can also see that every TCP connection between a consumer and the provider is stored in the NettyHandler's channels map. Recall from Part 5 of this series that a consumer and a provider share a single long-lived TCP connection by default, and that to raise throughput a consumer can set connections on its dubbo:reference to open extra connections to that provider. Symmetrically, the provider can cap how many connections it accepts, for example:

 

<dubbo:protocol name="dubbo" port="8888" threads="500" accepts="200"/>
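The accepts check in AbstractServer#connected amounts to a count-and-close guard. A minimal sketch under that assumption (AcceptsGuardSketch and its names are hypothetical, not Dubbo's code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of an "accepts"-style connection cap: track live connections by
// remote address and refuse any connection beyond the configured limit.
public class AcceptsGuardSketch {
    final int accepts;
    final Map<String, Boolean> channels = new ConcurrentHashMap<>();

    AcceptsGuardSketch(int accepts) { this.accepts = accepts; }

    /** Returns true if the connection is admitted, false if it must be closed. */
    boolean connected(String remoteAddress) {
        channels.put(remoteAddress, Boolean.TRUE);
        if (accepts > 0 && channels.size() > accepts) {
            channels.remove(remoteAddress); // reject: the caller then closes the socket
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        AcceptsGuardSketch guard = new AcceptsGuardSketch(2);
        System.out.println(guard.connected("10.0.0.1:1111")); // true
        System.out.println(guard.connected("10.0.0.2:2222")); // true
        System.out.println(guard.connected("10.0.0.3:3333")); // false: over accepts
    }
}
```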

 

Note that connections configured this way are not created on demand like threads in a JDK thread pool; they are all established when the consumer starts. If the consumer goes "too far", that is, its configured connection count exceeds the provider's accepts, the surplus connection attempts are refused by the provider and the consumer sees an exception like this:

 

[18/06/17 04:02:55:055 CST] DubboClientReconnectTimer-thread-2  WARN transport.AbstractClient:  [DUBBO] client reconnect to <provider IP:port> find error . url: dubbo://<provider IP:port>/......

com.alibaba.dubbo.remoting.RemotingException: Failed connect to server /<provider IP:port> from NettyClient <consumer IP> using dubbo version 2.5.3, cause: Connect wait timeout: 1500000ms.

at com.alibaba.dubbo.remoting.transport.AbstractClient.connect(AbstractClient.java:282)

at com.alibaba.dubbo.remoting.transport.AbstractClient$1.run(AbstractClient.java:145)

at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)

at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)

at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

 

Once the connection is up, the consumer can invoke the provider's services. When a request arrives, the provider runs it through the following Handlers in order:

NettyCodecAdapter$InternalDecoder#messageReceived: decodes the request.

NettyHandler#messageReceived: looks up the Dubbo Channel from the Netty Channel and starts the Dubbo Handler chain.

AbstractPeer#received: returns immediately if the service has been shut down; otherwise delegates to the next Handler.

MultiMessageHandler#received: for a batch request, feeds each message to the next Handler in turn.

AllChannelHandler#received: a crucial Handler, because it is the boundary between the IO thread pool and the service-invocation thread pool; it submits the invocation to the service-invocation pool and returns.

 

Let's take a closer look at AllChannelHandler#received:

 

public void received(Channel channel, Object message) throws RemotingException {
    // Fetch the service-invocation thread pool
    ExecutorService cexecutor = getExecutorService();
    try {
        cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
}

 

Note the exception handling around execute: the IO pool is unbounded (0 to Integer.MAX_VALUE threads), but the service-invocation pool is bounded, so the submission can fail with a RejectedExecutionException. That is exactly why Dubbo's logs sometimes contain fragments like this:

 

2017-06-16 22:01:03-WARN org.jboss.netty.channel.DefaultChannelPipeline-  [DUBBO] An exception was thrown by a user handler while handling an exception event ([id: 0xad26fbf0, /<consumer IP:port> => /<provider IP:port>] EXCEPTION: com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .), dubbo version: 2.5.3, current host: 127.0.0.1 - New I/O worker #95

 com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process caught event .

        at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:67)

        at com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate.caught(AbstractChannelHandlerDelegate.java:44)

        at com.alibaba.dubbo.remoting.transport.AbstractChannelHandlerDelegate.caught(AbstractChannelHandlerDelegate.java:44)

        at com.alibaba.dubbo.remoting.transport.AbstractPeer.caught(AbstractPeer.java:127)

        at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.exceptionCaught(NettyHandler.java:112)

        at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.exceptionCaught(NettyCodecAdapter.java:165)

        at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:525)

        at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:48)

        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)

        at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:148)

        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)

        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)

        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)

        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)

        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)

        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)

        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-<provider IP:port>, Pool Size: 300 (active: 274, core: 300, max: 300, largest: 300), Task: 145727238 (completed: 145726964), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://<provider IP:port>!

        at com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53)

        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)

        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)

        at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.caught(AllChannelHandler.java:65)

        ... 19 more
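The exhaustion in the log above is easy to reproduce in miniature: a pool backed by a SynchronousQueue rejects a submission as soon as every worker is busy, because a SynchronousQueue only accepts a task if a worker is already waiting for one. A self-contained sketch (RejectionDemo is a made-up name):

```java
import java.util.concurrent.*;

// Demonstrates why AllChannelHandler must catch rejection: a pool backed by a
// SynchronousQueue throws RejectedExecutionException the moment every worker
// thread is busy, since the queue never buffers tasks.
public class RejectionDemo {
    static boolean saturate() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the single worker thread.
        pool.execute(() -> { try { release.await(); } catch (InterruptedException ignored) {} });
        boolean rejected = false;
        try {
            pool.execute(() -> {}); // no free worker, no queue slot -> rejected
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        pool.shutdown();
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(saturate()); // true
    }
}
```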

 

If you don't configure it explicitly, the service-invocation pool defaults to 200 threads with a SynchronousQueue; see the FixedThreadPool#getExecutor implementation:

 

public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
            queues == 0 ? new SynchronousQueue<Runnable>() :
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                            : new LinkedBlockingQueue<Runnable>(queues)),
            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
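The queues parameter drives a three-way queue choice that is worth spelling out. The sketch below reproduces just that selection logic (QueueChoice is a made-up name) so its consequences can be inspected directly:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Reproduces only the queue-selection logic of FixedThreadPool#getExecutor:
// queues == 0 -> SynchronousQueue (direct hand-off, rejects when all threads busy),
// queues <  0 -> unbounded LinkedBlockingQueue (never rejects, may grow forever),
// queues >  0 -> bounded LinkedBlockingQueue of exactly that capacity.
public class QueueChoice {
    static BlockingQueue<Runnable> forQueues(int queues) {
        return queues == 0 ? new SynchronousQueue<Runnable>()
                : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues));
    }

    public static void main(String[] args) {
        System.out.println(forQueues(0).getClass().getSimpleName()); // SynchronousQueue
        System.out.println(forQueues(-1).remainingCapacity());       // Integer.MAX_VALUE
        System.out.println(forQueues(8).remainingCapacity());        // 8
    }
}
```

The default (queues = 0) trades buffering for fast failure: a saturated provider rejects immediately instead of silently queueing requests that would time out anyway.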

 

 

As you can see, the error log above is emitted by AbortPolicyWithReport; here is the AbortPolicyWithReport#rejectedExecution implementation:

 

@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    String msg = String.format("Thread pool is EXHAUSTED!" +
            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
    logger.warn(msg);
    throw new RejectedExecutionException(msg);
}

 

So the rejection is logged at WARN level and a RejectedExecutionException is thrown. The question is: can our business code detect this situation? Unfortunately, at this point the request has reached neither the Invoker's Filter chain nor the actual service invocation, so neither the Filters configured on the service interface nor AOP around it can handle the case. There are two ways to notice it: monitor Dubbo's log output, or rely on the consumer receiving a corresponding RpcException, because NettyCodecAdapter$InternalDecoder#exceptionCaught propagates the exception, which ultimately surfaces on the consumer side:

 

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
    ctx.sendUpstream(e);
}

 

On the consumer side, a Filter can catch this exception and log it:

 

com.alibaba.dubbo.rpc.RpcException: Failed to invoke remote method: doYouLoveMe, provider: dubbo://<provider IP:port>/......, cause: com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .

com.alibaba.dubbo.remoting.ExecutionException: class com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .

        at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:58)

        at com.alibaba.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(HeartbeatHandler.java:90)

        at com.alibaba.dubbo.remoting.transport.MultiMessageHandler.received(MultiMessageHandler.java:25)

        at com.alibaba.dubbo.remoting.transport.AbstractPeer.received(AbstractPeer.java:123)

        at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.messageReceived(NettyHandler.java:91)

        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)

        at com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalDecoder.messageReceived(NettyCodecAdapter.java:148)

        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)

        at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)

        at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)

        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:109)

        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312)

        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)

        at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.util.concurrent.RejectedExecutionException: Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-<provider IP:port>, Pool Size: 300 (active: 274, core: 300, max: 300, largest: 300), Task: 145731584 (completed: 145731286), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://<provider IP:port>!

        at com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport.rejectedExecution(AbortPolicyWithReport.java:53)

        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)

        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)

        at com.alibaba.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:56)

        ... 16 more
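Besides scanning logs, one out-of-band option on the provider side is to sample the invocation pool's own counters and alarm before it is fully exhausted. A sketch under that assumption (PoolWatchdog and nearlyExhausted are made-up names; in practice you would need a handle on Dubbo's actual executor, e.g. via JMX or a custom ThreadPool extension):

```java
import java.util.concurrent.*;

// Sketch of out-of-band saturation monitoring: since a rejected request never
// reaches any provider-side Filter, a watchdog thread can instead sample the
// pool's counters and raise an alarm when it nears exhaustion.
public class PoolWatchdog {
    /** True when active threads reach the given fraction of the maximum. */
    static boolean nearlyExhausted(ThreadPoolExecutor pool, double threshold) {
        return pool.getActiveCount() >= pool.getMaximumPoolSize() * threshold;
    }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        System.out.println(nearlyExhausted(pool, 0.9)); // false: pool is idle
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> { try { release.await(); } catch (InterruptedException ignored) {} });
        pool.execute(() -> { try { release.await(); } catch (InterruptedException ignored) {} });
        Thread.sleep(100);                              // let both workers start their tasks
        System.out.println(nearlyExhausted(pool, 0.9)); // true: 2 of 2 workers busy
        release.countDown();
        pool.shutdown();
    }
}
```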

 

If this happens frequently, the provider's processing capacity cannot keep up with its consumers. The simplest fix is to enlarge the provider's service-invocation pool, for example:

 

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />

 

To guarantee that a module's primary services always have invocation threads available (that is, to keep secondary services from hogging the pool), you can cap the concurrency of the secondary services, for example:

 

<dubbo:protocol threads="500" />

<dubbo:service interface="xxxxx" version="1.0.0" ref="xxx1" executes="100" >
    <dubbo:method name="findAllPerson" executes="50" />
</dubbo:service>

 

If the service-invocation pool has capacity, a ChannelEventRunnable is created and submitted to it, which hands the event to DecodeHandler. From there the Handler chain is:

DecodeHandler#received: decodes the message's data and passes it to the next Handler.

HeaderExchangeHandler#received: records the read timestamp for heartbeat detection (see HeartBeatTask), and once the next Handler completes the service invocation, writes the response to the Channel.

DubboProtocol#reply: since the dubbo protocol is in use this is DubboProtocol, which invokes the service directly through the Invoker.
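The chain above is a plain delegate chain: each handler performs one step and passes the message to the next. A minimal sketch of that style, with string transforms standing in for decode / exchange bookkeeping / invocation (HandlerChainSketch and its names are hypothetical, not Dubbo's classes):

```java
// Minimal sketch of Dubbo's handler-delegation style: each handler does one
// job (lowercasing stands in for decoding, wrapping stands in for building
// the reply) and delegates the rest to the next handler in the chain.
public class HandlerChainSketch {
    interface Handler { String received(String message); }

    static Handler decode(Handler next) {   // stands in for DecodeHandler
        return msg -> next.received(msg.toLowerCase());
    }
    static Handler exchange(Handler next) { // stands in for HeaderExchangeHandler
        return msg -> "reply(" + next.received(msg) + ")";
    }
    static Handler protocol() {             // stands in for DubboProtocol#reply
        return msg -> "invoked:" + msg;
    }

    // Build the chain outermost-first, exactly like the list above.
    static final Handler CHAIN = decode(exchange(protocol()));

    public static void main(String[] args) {
        System.out.println(CHAIN.received("PING")); // reply(invoked:ping)
    }
}
```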

 

Here are the relevant snippets from DubboProtocol:

 

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        // Select the Invoker based on the request parameters
        Invoker<?> invoker = getInvoker(channel, inv);
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
        // Execute the service invocation directly
        return invoker.invoke(inv);
    }

    throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    int port = channel.getLocalAddress().getPort();
    String path = inv.getAttachments().get(Constants.PATH_KEY);

    // The service key: interface name + version + group + provider port
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
    // Look up the DubboExporter by service key; exporterMap holds one DubboExporter per exported service
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    if (exporter == null)
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);

    // Return the invoker
    return exporter.getInvoker();
}

 

The code is straightforward. To let users hook in before and after the service invocation, Dubbo uses a Filter mechanism analogous to the Servlet Filter concept; Filters exist on both the consumer and the provider side. Dubbo also ships a set of built-in Filters for specific duties such as status checks and exception handling; we won't discuss Filter ordering here. To make clear which built-in Filters a request passes through, here is the Filter chain a provider executes after receiving a request (invocation order top to bottom):

invoker = {ProtocolFilterWrapper$1@3163} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
 invoker = {RegistryProtocol$InvokerDelegete@3172}
 filter = {EchoFilter@3173}     // The first Filter invoked; if the method name is $echo, it echoes back whatever was sent
 next = {ProtocolFilterWrapper$1@3174} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
  invoker = {RegistryProtocol$InvokerDelegete@3172}
  filter = {ClassLoaderFilter@3177}    // Switches the current thread's ClassLoader to the service interface's ClassLoader, switching back after the call
  next = {ProtocolFilterWrapper$1@3178} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
   invoker = {RegistryProtocol$InvokerDelegete@3172}
   filter = {GenericFilter@3180}     // Its logic only runs for Dubbo services invoked through GenericService
   next = {ProtocolFilterWrapper$1@3181} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
    invoker = {RegistryProtocol$InvokerDelegete@3172}
    filter = {ContextFilter@3183}    // Populates the RpcContext, clearing it after the call
    next = {ProtocolFilterWrapper$1@3184} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
     invoker = {RegistryProtocol$InvokerDelegete@3172}
     filter = {TraceFilter@3188}     // Its logic only runs for telnet invocations
     next = {ProtocolFilterWrapper$1@3189} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
      invoker = {RegistryProtocol$InvokerDelegete@3172}
      filter = {MonitorFilter@3192}     // When monitoring is enabled, counts concurrency and exceptions
      next = {ProtocolFilterWrapper$1@3193} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
       invoker = {RegistryProtocol$InvokerDelegete@3172}
       filter = {TimeoutFilter@3196}        // Measures call duration and logs a warning when it exceeds the provider's configured timeout
       next = {ProtocolFilterWrapper$1@3197} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
        invoker = {RegistryProtocol$InvokerDelegete@3172}
        filter = {ExceptionFilter@3199}      // How Dubbo handles exceptions; this Filter's implementation is well worth studying
        next = {ProtocolFilterWrapper$1@3200} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
         invoker = {RegistryProtocol$InvokerDelegete@3172}
         filter = {TokenFilter@3202}      // Validates the token; setting token=true on the provider forbids consumers from bypassing the registry and calling the provider directly. Note that the provider generates the token and publishes it to the registry
         next = {ProtocolFilterWrapper$1@3203} "com.alibaba.dubbo.registry.integration.RegistryProtocol$InvokerDelegete@5a44adf8"
          invoker = {RegistryProtocol$InvokerDelegete@3172}
          filter = {ProviderAFilter@3209}      // A user-defined Filter
          next = {RegistryProtocol$InvokerDelegete@3172}
           invoker = {JavassistProxyFactory$1@3210} "registry://<registry address>/com.alibaba.dubbo.registry.RegistryService?......"
           InvokerWrapper.invoker = {JavassistProxyFactory$1@3210} "registry://<provider address>/com.alibaba.dubbo.registry.RegistryService?......"
            wrapper = {Wrapper0@3215}
            this$0 = {JavassistProxyFactory@3216}
            proxy = {ServiceProviderImpl$$EnhancerByCGLIB$$14338e8a@3217} "com.manzhizhen.study.dubbo.ServiceProviderImpl@747af825"   // The CGLIB dynamic proxy of the Spring service bean
            type = {Class@418} "interface com.manzhizhen.study.dubbo.ServiceProvider"
            url = {URL@3218} "registry://<registry address>/com.alibaba.dubbo.registry.RegistryService?......"
           url = {URL@3211} "dubbo://<provider address>/com.manzhizhen.study.dubbo.ServiceProvider?......"
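A chain like this is assembled by folding the filter list from the end, so the first filter in the list becomes the outermost wrapper around the real invoker. A minimal sketch of that construction (FilterChainSketch and its interfaces are made-up names in the spirit of ProtocolFilterWrapper, not its actual code):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of building a filter chain by wrapping the invoker in each filter,
// iterating the list backwards so list order equals invocation order.
public class FilterChainSketch {
    interface Invoker { String invoke(String arg); }
    interface Filter { String invoke(Invoker next, String arg); }

    static Invoker build(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter f = filters.get(i);
            final Invoker next = last;
            last = arg -> f.invoke(next, arg); // wrap: f runs before everything built so far
        }
        return last;
    }

    public static void main(String[] args) {
        Filter a = (next, arg) -> "A[" + next.invoke(arg) + "]";
        Filter b = (next, arg) -> "B[" + next.invoke(arg) + "]";
        Invoker chain = build(arg -> "svc(" + arg + ")", Arrays.asList(a, b));
        System.out.println(chain.invoke("x")); // A[B[svc(x)]]
    }
}
```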

 


The last thing executed is the Wrapper's call into proxy. proxy points to the dynamic proxy of the service instance in the Spring container, built in whatever proxy style the Spring configuration dictates (CGLIB here rather than a JDK dynamic proxy), while Wrapper is a proxy class Dubbo generates with Javassist (see Part 2 of this series for Dubbo's proxying) that is responsible for invoking proxy. The Wrapper source is worth a read.
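The contract a Wrapper fulfills, dispatching "method name plus arguments" onto a concrete service object, can be shown with plain reflection. This is a sketch only (ReflectiveWrapperSketch and GreetService are hypothetical); Dubbo generates equivalent dispatch code with Javassist precisely to avoid this per-call reflection cost:

```java
import java.lang.reflect.Method;

// Sketch of the Wrapper contract via plain reflection: resolve the method by
// name and argument types, then invoke it on the target service instance.
public class ReflectiveWrapperSketch {
    static Object invokeMethod(Object proxy, String methodName, Object[] args) throws Exception {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) types[i] = args[i].getClass();
        Method m = proxy.getClass().getMethod(methodName, types);
        return m.invoke(proxy, args);
    }

    // Hypothetical service implementation standing in for the Spring bean.
    public static class GreetService {
        public String greet(String name) { return "hello " + name; }
    }

    public static void main(String[] args) throws Exception {
        Object out = invokeMethod(new GreetService(), "greet", new Object[]{"dubbo"});
        System.out.println(out); // hello dubbo
    }
}
```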
