- 浏览: 954526 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 抽象BootStrap定义:http://donald-draper.iteye.com/blog/2392492
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
引言:
上面一篇文章我们看了ServerBootstrap,先来回顾一下:
服务端Bootstrap虽然继承与抽象Bootstrap,但他有自己的child通道选项及属性集,事件循环组和通道处理器,这些是用于配置,当Server通道接收客户端的请求,创建与客户端交互的通道。当构造Server引导配置时,如果传递一个事件循环,则Server通道监听器和客户端交互的通道公用一个事件循环组,否则parentGroup事件循环组用于监听器ServerChannel接受连接,childGroup事件循环组用于处理与客户端交互的通道相关事件和IO操作。
Server引导配置绑定socket地址,首先初始化通道,对于Server引导配置,这个通道为NioServerSocketChannel,初始化通道,即初始化Server通道;初始化Server通道,首先将Server引导配置的父类抽象Bootstrap的选项和属性配置给Server通道,然后添加ServerBootstrapAcceptor到Server通道内部的Channel管道内,然后将Server通道注册到事件循环组parentGroup中,然后通过Server通道#bind方法完成实际socket地址;Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。
今天我们来看客户端Bootstrap:
由于客户端引导配置Bootstrap,不像Server引导配置需要创建交互通道,所以可以看到客户端引导配置没有在配置
子事件循环。在客户端实例中有这么几句:
我们需要关注的是这一句:
引导配置连接远端socket地址:
从上面来了,连接远端socket地址,实际委托给doResolveAndConnect方法:
下面我们先看一下初始化通道方法,再来看实际连接完成方法doResolveAndConnect0。
//初始化通道
我们来理一下,客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
再来看其他方法:
总结:
客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
附:
在客户端引导配置有一个地址解析组配置
我们来简单看一下:
//DefaultAddressResolverGroup,默认地址解析组
//DefaultNameResolver,默认命名解析器
//命名解析器InetNameResolver
先把InetNameResolver的父类SimpleNameResolver看一下,在看socket地址解析器InetSocketAddressResolver
//NameResolver
回到InetNameResolver的asAddressResolver方法的这一句
//socket地址解析器,InetSocketAddressResolver
来看地址解析方法,
//InetSocketAddress
再看另外两个地址解决方法,
这个命名Resolver为DefaultNameResolver,可以回到DefaultNameResolver的定义在上面就不说了,
来看InetSocketAddressResolver的父类AbstractAddressResolver:
//AddressResolver接口定义,这个就不说,看看就行
netty ServerBootStrap解析:http://donald-draper.iteye.com/blog/2392572
引言:
上面一篇文章我们看了ServerBootstrap,先来回顾一下:
服务端Bootstrap虽然继承与抽象Bootstrap,但他有自己的child通道选项及属性集,事件循环组和通道处理器,这些是用于配置,当Server通道接收客户端的请求,创建与客户端交互的通道。当构造Server引导配置时,如果传递一个事件循环,则Server通道监听器和客户端交互的通道公用一个事件循环组,否则parentGroup事件循环组用于监听器ServerChannel接受连接,childGroup事件循环组用于处理与客户端交互的通道相关事件和IO操作。
Server引导配置绑定socket地址,首先初始化通道,对于Server引导配置,这个通道为NioServerSocketChannel,初始化通道,即初始化Server通道;初始化Server通道,首先将Server引导配置的父类抽象Bootstrap的选项和属性配置给Server通道,然后添加ServerBootstrapAcceptor到Server通道内部的Channel管道内,然后将Server通道注册到事件循环组parentGroup中,然后通过Server通道#bind方法完成实际socket地址;Server引导配置监听器实际为一个Inbound通道处理器,每当有客户端连接请求时,则创建一个与客户端交互的通道,将child通道选项及属性配置给通道,并将通道注册到childGroup事件循环组,然后将通道处理器添加到与客户端交互的通道内部的Channel管道中。
今天我们来看客户端Bootstrap:
package io.netty.bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolver; import io.netty.resolver.DefaultAddressResolverGroup; import io.netty.resolver.NameResolver; import io.netty.resolver.AddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; import java.util.Map.Entry; /** * A {@link Bootstrap} that makes it easy to bootstrap a {@link Channel} to use * for clients. 引导程序Bootstrap使客户端很容易启动一个通道。 * * The {@link #bind()} methods are useful in combination with connectionless transports such as datagram (UDP). * For regular TCP connections, please use the provided {@link #connect()} methods. 绑定方法#bind对无连接的报文通信UDP非常有用。对于Socket连接TCP,可以使用#connect连接方法。 */ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Bootstrap.class); private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE; //引导配置 private final BootstrapConfig config = new BootstrapConfig(this); @SuppressWarnings("unchecked") private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER; private volatile SocketAddress remoteAddress;//远端socket地址 public Bootstrap() { } private Bootstrap(Bootstrap bootstrap) { super(bootstrap); resolver = bootstrap.resolver; remoteAddress = bootstrap.remoteAddress; } }
由于客户端引导配置Bootstrap,不像Server引导配置需要创建交互通道,所以可以看到客户端引导配置没有在配置
子事件循环。在客户端实例中有这么几句:
EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap,用于配置客户端,一般为Socket通道 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加安全套接字处理器和通道处理器到 ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc(), ip, port)); } pipeline.addLast(new LoggingHandler(LogLevel.INFO)); pipeline.addLast(new EchoClientHandler()); } }); ... //连接socket地址 ChannelFuture f = bootstrap.connect(inetSocketAddress).sync(); ... }
我们需要关注的是这一句:
bootstrap.connect(inetSocketAddress).sync();
引导配置连接远端socket地址:
/** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(String inetHost, int inetPort) { return connect(InetSocketAddress.createUnresolved(inetHost, inetPort)); } /** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(InetAddress inetHost, int inetPort) { return connect(new InetSocketAddress(inetHost, inetPort)); } /** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doResolveAndConnect(remoteAddress, config.localAddress()); } /** * Connect a {@link Channel} to the remote peer. */ public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } validate(); return doResolveAndConnect(remoteAddress, localAddress); }
从上面来了,连接远端socket地址,实际委托给doResolveAndConnect方法:
/** * @see #connect() 连接操作 */ private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { //初始化通道,并注册通道到事件循环组 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } //初始化成功,则委托给doResolveAndConnect0完成实际连接操作 return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { //如果初始化和注册工作没有完成,添加任务结果监听器,待完成时,更新注册状态,完成实际连接操作 // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }
下面我们先看一下初始化通道方法,再来看实际连接完成方法doResolveAndConnect0。
//初始化通道
@Override @SuppressWarnings("unchecked") void init(Channel channel) throws Exception { //添加通道处理器到通道内存的Channel管道中 ChannelPipeline p = channel.pipeline(); p.addLast(config.handler()); //配置通道选项和属性 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } } }
//实际连接操作 private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { final EventLoop eventLoop = channel.eventLoop(); final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop); if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. //如果地址解析器不知道如何应对远端地址,或已经解决则连接远端地址,完成实际连接 doConnect(remoteAddress, localAddress, promise); return promise; } final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress); if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); promise.setFailure(resolveFailureCause); } else { //远端地址解析完毕,并支持,完成实际连接 // Succeeded to resolve immediately; cached? (or did a blocking lookup) doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; } //如果地址解析任务没有完成,添加监听器,待任务完成时,完成连接操作 // Wait until the name resolution is finished. resolveFuture.addListener(new FutureListener<SocketAddress>() { @Override public void operationComplete(Future<SocketAddress> future) throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
//连接远端地址 private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); //创建一个任务线程完成实际连接远端地址操作,实际委托给通道的连接方法,任务线程交由通道所在的事件循环去执行 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { //实际委托给通道的连接方法 channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }
我们来理一下,客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
再来看其他方法:
//配置远端socket地址 /** * The {@link SocketAddress} to connect to once the {@link #connect()} method * is called. */ public Bootstrap remoteAddress(SocketAddress remoteAddress) { this.remoteAddress = remoteAddress; return this; } /** * @see #remoteAddress(SocketAddress) */ public Bootstrap remoteAddress(String inetHost, int inetPort) { remoteAddress = InetSocketAddress.createUnresolved(inetHost, inetPort); return this; } /** * @see #remoteAddress(SocketAddress) */ public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) { remoteAddress = new InetSocketAddress(inetHost, inetPort); return this; } /** * Connect a {@link Channel} to the remote peer. 连接远端peer通道 */ public ChannelFuture connect() { validate(); SocketAddress remoteAddress = this.remoteAddress; if (remoteAddress == null) { throw new IllegalStateException("remoteAddress not set"); } return doResolveAndConnect(remoteAddress, config.localAddress()); } //验证引导配置是否有效 @Override public Bootstrap validate() { super.validate(); if (config.handler() == null) { throw new IllegalStateException("handler not set"); } return this; } //克隆配置 @Override @SuppressWarnings("CloneDoesntCallSuperClone") public Bootstrap clone() { return new Bootstrap(this); } /** * Returns a deep clone of this bootstrap which has the identical configuration except that it uses * the given {@link EventLoopGroup}. This method is useful when making multiple {@link Channel}s with similar * settings. */ public Bootstrap clone(EventLoopGroup group) { Bootstrap bs = new Bootstrap(this); bs.group = group; return bs; } //获取引导配置 @Override public final BootstrapConfig config() { return config; } //获取远端socket地址 final SocketAddress remoteAddress() { return remoteAddress; } //获取地址解析组 final AddressResolverGroup<?> resolver() { return resolver; }
总结:
客户端引导配置的连接操作,首先初始化通道,主要是配置通道的,选型和属性,将通道处理器添加到通道内部的Channel管道中,注册通道到事件循环组,然后委托通道完成实际的连接操作。
附:
在客户端引导配置有一个地址解析组配置
private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
我们来简单看一下:
//DefaultAddressResolverGroup,默认地址解析组
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.UnstableApi; import java.net.InetSocketAddress; /** * A {@link AddressResolverGroup} of {@link DefaultNameResolver}s. */ @UnstableApi public final class DefaultAddressResolverGroup extends AddressResolverGroup<InetSocketAddress> { public static final DefaultAddressResolverGroup INSTANCE = new DefaultAddressResolverGroup(); private DefaultAddressResolverGroup() { } //创建地址解析器 @Override protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) throws Exception { return new DefaultNameResolver(executor).asAddressResolver(); } }
//DefaultNameResolver,默认命名解析器
package io.netty.resolver; import io.netty.util.internal.SocketUtils; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Arrays; import java.util.List; /** * A {@link InetNameResolver} that resolves using JDK's built-in domain name lookup mechanism. * Note that this resolver performs a blocking name lookup from the caller thread. */ @UnstableApi public class DefaultNameResolver extends InetNameResolver { public DefaultNameResolver(EventExecutor executor) { super(executor); } @Override protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception { try { promise.setSuccess(SocketUtils.addressByName(inetHost)); } catch (UnknownHostException e) { promise.setFailure(e); } } @Override protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise) throws Exception { try { promise.setSuccess(Arrays.asList(SocketUtils.allAddressesByName(inetHost))); } catch (UnknownHostException e) { promise.setFailure(e); } } }
//命名解析器InetNameResolver
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.internal.UnstableApi; import java.net.InetAddress; import java.net.InetSocketAddress; /** * A skeletal {@link NameResolver} implementation that resolves {@link InetAddress}. */ @UnstableApi public abstract class InetNameResolver extends SimpleNameResolver<InetAddress> { private volatile AddressResolver<InetSocketAddress> addressResolver;//地址解析器 /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(String)} */ protected InetNameResolver(EventExecutor executor) { super(executor); } /** * Return a {@link AddressResolver} that will use this name resolver underneath. * It's cached internally, so the same instance is always returned. 这个方法时我们要找的 */ public AddressResolver<InetSocketAddress> asAddressResolver() { AddressResolver<InetSocketAddress> result = addressResolver; if (result == null) { synchronized (this) { result = addressResolver; if (result == null) { //如果内部地址解析器为空,则创建一个socket地址解析器 addressResolver = result = new InetSocketAddressResolver(executor(), this); } } } return result; } }
先把InetNameResolver的父类SimpleNameResolver看一下,在看socket地址解析器InetSocketAddressResolver
//SimpleNameResolver package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.util.List; import static io.netty.util.internal.ObjectUtil.*; /** * A skeletal {@link NameResolver} implementation. */ @UnstableApi public abstract class SimpleNameResolver<T> implements NameResolver<T> { private final EventExecutor executor; /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(String)} */ protected SimpleNameResolver(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); } /** * Returns the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(String)}. */ protected EventExecutor executor() { return executor; } @Override public final Future<T> resolve(String inetHost) { final Promise<T> promise = executor().newPromise(); return resolve(inetHost, promise); } @Override public Future<T> resolve(String inetHost, Promise<T> promise) { checkNotNull(promise, "promise"); try { doResolve(inetHost, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } @Override public final Future<List<T>> resolveAll(String inetHost) { final Promise<List<T>> promise = executor().newPromise(); return resolveAll(inetHost, promise); } @Override public Future<List<T>> resolveAll(String inetHost, Promise<List<T>> promise) { checkNotNull(promise, "promise"); try { doResolveAll(inetHost, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } /** * Invoked by {@link #resolve(String)} to perform the actual name resolution. */ protected abstract void doResolve(String inetHost, Promise<T> promise) throws Exception; /** * Invoked by {@link #resolveAll(String)} to perform the actual name resolution. */ protected abstract void doResolveAll(String inetHost, Promise<List<T>> promise) throws Exception; @Override public void close() { } }
//NameResolver
package io.netty.resolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.io.Closeable; import java.util.List; /** * Resolves an arbitrary string that represents the name of an endpoint into an address. */ @UnstableApi public interface NameResolver<T> extends Closeable { /** * Resolves the specified name into an address. * * @param inetHost the name to resolve * * @return the address as the result of the resolution */ Future<T> resolve(String inetHost); /** * Resolves the specified name into an address. * * @param inetHost the name to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the address as the result of the resolution */ Future<T> resolve(String inetHost, Promise<T> promise); /** * Resolves the specified host name and port into a list of address. * * @param inetHost the name to resolve * * @return the list of the address as the result of the resolution */ Future<List<T>> resolveAll(String inetHost); /** * Resolves the specified host name and port into a list of address. * * @param inetHost the name to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the list of the address as the result of the resolution */ Future<List<T>> resolveAll(String inetHost, Promise<List<T>> promise); /** * Closes all the resources allocated and used by this resolver. */ @Override void close(); }
回到InetNameResolver的asAddressResolver方法的这一句
//如果内部地址解析器为空,则创建一个socket地址解析器 addressResolver = result = new InetSocketAddressResolver(executor(), this);
//socket地址解析器,InetSocketAddressResolver
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; /** * A {@link AbstractAddressResolver} that resolves {@link InetSocketAddress}. */ @UnstableApi public class InetSocketAddressResolver extends AbstractAddressResolver<InetSocketAddress> { final NameResolver<InetAddress> nameResolver;//命名解决器,默认为DefaultNameResolver /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(java.net.SocketAddress)} * @param nameResolver the {@link NameResolver} used for name resolution */ public InetSocketAddressResolver(EventExecutor executor, NameResolver<InetAddress> nameResolver) { super(executor, InetSocketAddress.class); this.nameResolver = nameResolver; } @Override public void close() { nameResolver.close(); } }
来看地址解析方法,
//地址是否可以解析 @Override protected boolean doIsResolved(InetSocketAddress address) { return !address.isUnresolved(); }
//InetSocketAddress
/** * Checks whether the address has been resolved or not. * * @return <code>true</code> if the hostname couldn't be resolved into * an <code>InetAddress</code>. */ public final boolean isUnresolved() { return holder.isUnresolved(); }
再看另外两个地址解决方法,
@Override protected void doResolve(final InetSocketAddress unresolvedAddress, final Promise<InetSocketAddress> promise) throws Exception { // Note that InetSocketAddress.getHostName() will never incur a reverse lookup here, // because an unresolved address always has a host name. //委托给命令解决器 nameResolver.resolve(unresolvedAddress.getHostName()) .addListener(new FutureListener<InetAddress>() { @Override public void operationComplete(Future<InetAddress> future) throws Exception { if (future.isSuccess()) { promise.setSuccess(new InetSocketAddress(future.getNow(), unresolvedAddress.getPort())); } else { promise.setFailure(future.cause()); } } }); } @Override protected void doResolveAll(final InetSocketAddress unresolvedAddress, final Promise<List<InetSocketAddress>> promise) throws Exception { // Note that InetSocketAddress.getHostName() will never incur a reverse lookup here, // because an unresolved address always has a host name. //委托给命令解决器 nameResolver.resolveAll(unresolvedAddress.getHostName()) .addListener(new FutureListener<List<InetAddress>>() { @Override public void operationComplete(Future<List<InetAddress>> future) throws Exception { if (future.isSuccess()) { List<InetAddress> inetAddresses = future.getNow(); List<InetSocketAddress> socketAddresses = new ArrayList<InetSocketAddress>(inetAddresses.size()); for (InetAddress inetAddress : inetAddresses) { socketAddresses.add(new InetSocketAddress(inetAddress, unresolvedAddress.getPort())); } promise.setSuccess(socketAddresses); } else { promise.setFailure(future.cause()); } } }); }
这个命名Resolver为DefaultNameResolver,可以回到DefaultNameResolver的定义在上面就不说了,
来看InetSocketAddressResolver的父类AbstractAddressResolver:
package io.netty.resolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.TypeParameterMatcher; import io.netty.util.internal.UnstableApi; import java.net.SocketAddress; import java.nio.channels.UnsupportedAddressTypeException; import java.util.Collections; import java.util.List; import static io.netty.util.internal.ObjectUtil.checkNotNull; /** * A skeletal {@link AddressResolver} implementation. */ @UnstableApi public abstract class AbstractAddressResolver<T extends SocketAddress> implements AddressResolver<T> { private final EventExecutor executor;//事件执行器 private final TypeParameterMatcher matcher;//socket地址匹配器 /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(SocketAddress)} */ protected AbstractAddressResolver(EventExecutor executor) { this.executor = checkNotNull(executor, "executor"); matcher = TypeParameterMatcher.find(this, AbstractAddressResolver.class, "T"); } /** * @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(SocketAddress)} * @param addressType the type of the {@link SocketAddress} supported by this resolver */ protected AbstractAddressResolver(EventExecutor executor, Class<? extends T> addressType) { this.executor = checkNotNull(executor, "executor"); matcher = TypeParameterMatcher.get(addressType); } /** * Returns the {@link EventExecutor} which is used to notify the listeners of the {@link Future} returned * by {@link #resolve(SocketAddress)}. */ protected EventExecutor executor() { return executor; } //命名Resolver是否支持socket地址 @Override public boolean isSupported(SocketAddress address) { return matcher.match(address); } //socket地址是否可以Resolve @Override public final boolean isResolved(SocketAddress address) { if (!isSupported(address)) { throw new UnsupportedAddressTypeException(); } @SuppressWarnings("unchecked") final T castAddress = (T) address; //具体Resolve工作委托给doIsResolved方法 return doIsResolved(castAddress); } /** * Invoked by {@link #isResolved(SocketAddress)} to check if the specified {@code address} has been resolved * already. */ protected abstract boolean doIsResolved(T address); @Override public final Future<T> resolve(SocketAddress address) { if (!isSupported(checkNotNull(address, "address"))) { // Address type not supported by the resolver return executor().newFailedFuture(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return executor.newSucceededFuture(cast); } try { @SuppressWarnings("unchecked") final T cast = (T) address; final Promise<T> promise = executor().newPromise(); //具体Resolve工作委托给doIsResolved方法 doResolve(cast, promise); return promise; } catch (Exception e) { return executor().newFailedFuture(e); } } @Override public final Future<T> resolve(SocketAddress address, Promise<T> promise) { checkNotNull(address, "address"); checkNotNull(promise, "promise"); if (!isSupported(address)) { // Address type not supported by the resolver return promise.setFailure(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return promise.setSuccess(cast); } try { @SuppressWarnings("unchecked") final T cast = (T) address; //具体Resolve工作委托给doIsResolved方法 doResolve(cast, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } @Override public final Future<List<T>> resolveAll(SocketAddress address) { if (!isSupported(checkNotNull(address, "address"))) { // Address type not supported by the resolver return executor().newFailedFuture(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return executor.newSucceededFuture(Collections.singletonList(cast)); } try { @SuppressWarnings("unchecked") final T cast = (T) address; final Promise<List<T>> promise = executor().newPromise(); //具体Resolve工作委托给doResolveAll方法 doResolveAll(cast, promise); return promise; } catch (Exception e) { return executor().newFailedFuture(e); } } @Override public final Future<List<T>> resolveAll(SocketAddress address, Promise<List<T>> promise) { checkNotNull(address, "address"); checkNotNull(promise, "promise"); if (!isSupported(address)) { // Address type not supported by the resolver return promise.setFailure(new UnsupportedAddressTypeException()); } if (isResolved(address)) { // Resolved already; no need to perform a lookup @SuppressWarnings("unchecked") final T cast = (T) address; return promise.setSuccess(Collections.singletonList(cast)); } try { @SuppressWarnings("unchecked") final T cast = (T) address; //具体Resolve工作委托给doResolveAll方法 doResolveAll(cast, promise); return promise; } catch (Exception e) { return promise.setFailure(e); } } //下面几个方法待子类实现,见InetSocketAddressResolver的定义 /** * Invoked by {@link #resolve(SocketAddress)} to perform the actual name * resolution. */ protected abstract void doResolve(T unresolvedAddress, Promise<T> promise) throws Exception; /** * Invoked by {@link #resolveAll(SocketAddress)} to perform the actual name * resolution. */ protected abstract void doResolveAll(T unresolvedAddress, Promise<List<T>> promise) throws Exception; @Override public void close() { } }
//AddressResolver接口定义,这个就不说,看看就行
package io.netty.resolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import io.netty.util.internal.UnstableApi; import java.io.Closeable; import java.net.SocketAddress; import java.nio.channels.UnsupportedAddressTypeException; import java.util.List; /** * Resolves a possibility unresolved {@link SocketAddress}. */ @UnstableApi public interface AddressResolver<T extends SocketAddress> extends Closeable { /** * Returns {@code true} if and only if the specified address is supported by this resolved. */ boolean isSupported(SocketAddress address); /** * Returns {@code true} if and only if the specified address has been resolved. * * @throws UnsupportedAddressTypeException if the specified address is not supported by this resolver */ boolean isResolved(SocketAddress address); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * * @return the {@link SocketAddress} as the result of the resolution */ Future<T> resolve(SocketAddress address); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the {@link SocketAddress} as the result of the resolution */ Future<T> resolve(SocketAddress address, Promise<T> promise); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * * @return the list of the {@link SocketAddress}es as the result of the resolution */ Future<List<T>> resolveAll(SocketAddress address); /** * Resolves the specified address. If the specified address is resolved already, this method does nothing * but returning the original address. * * @param address the address to resolve * @param promise the {@link Promise} which will be fulfilled when the name resolution is finished * * @return the list of the {@link SocketAddress}es as the result of the resolution */ Future<List<T>> resolveAll(SocketAddress address, Promise<List<T>> promise); /** * Closes all the resources allocated and used by this resolver. */ @Override void close(); }
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1242netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 1982netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2370netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1268netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1260netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1536netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1769netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1334netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2758netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置后续
2017-09-18 08:36 2103netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 1955netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1019netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1815netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1169netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1151netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 896netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1245netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2129netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 998netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1796netty 管道线定义-ChannelPipeline:htt ...
相关推荐
netty服务器解析16进制数据
注:下载前请查看本人博客文章,看是否...里面包含模拟TCP客户端发送报文工具,硬件厂商提供的协议,服务端(springboot+netty)解析报文源码,源码里整合了redis,不需要可自行删除,如有需要客户端代码,可联系我。
mu ke 网 netty源码解析视频,你值得下载
netty源码解析视频教程,深入浅出netty源码视频
netty源码解析PDF,网络编程
[自己做个游戏服务器二] 游戏服务器的基石-Netty全解析,完整项目源码
Netty源码解析第1节 Netty深入剖析·使用Netty作为底层框架的计数·Netty是什么·异步事件驱动框架,用于快速开发高性能服务器和客户端·封装了JD
java Netty源码解析-服务启动过程
netty服务器son解析
java写的808协议解析程序,本人非作者,作者已经不更新了,只供参考
解析1078部标终端推流,并转发至rtmp流服务器.代码有点乱但是本人保证可用. 1078分了2014和2016.该版本是否都支持我已近刚忘记了.但是大同小异对不对~~就是解析的时候注意下就行了
Java Netty版完全符合JT808部标文档的开发规范。直接可以下载使用,不信拉倒。
Netty框架学习: 包含netty的各个面试知识点,以及粘包与半包的解析,以及实例和底层tcp协议的滑动窗口的解析;
基于Netty5的自定义协议,编码解码Demo,自定义名为luck的协议,通过自定义编解码获取客户端发送的数据
Java netty-socketio源码和所需要的jar包,包括依赖包
其中的实现类似如下:// inboud事件默认处理过程// 事件传播到下一个Handler// outboud事件默认处理过程// 事件传播到下一个Handle
最近部分需要做一个一个有关Netty框架的分享会,我就在网上找了好半天也没有一个线程的可用的Netty PPT,所以就动手写了一个,还有好多需要完善的地方,还在修改中....
Netty是Java世界知名的网络应用框架。本系列文章是Netty的源码导读。
小白一个,刚学习完Netty,打算做一个Demo练练手。 感谢以下项目提供的思路,向他们学习! 1, 2, 思路 1,启动: 1.1,需要一个Bootstrap作为客户端的启动类。 1.2,然后添加指定的处理器来解析包,添加管道...
使用netty进行rtsp服务端开发 一个使用netty写的rtsp服务器。 目前支持H264、H265、 AAC格式的流文件上传与存储。 H264、H265、AAC格式流文件的播放