`
Donald_Draper
  • 浏览: 950766 次
社区版块
存档分类
最新评论

netty 默认通道配置后续

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
引言
前一篇文章我们看了默认通道配置初始化,先来回顾一下:
默认通道配置内部关联一个通道,一个消息大小估算器(默认为DefaultMessageSizeEstimator),尝试写自旋次数默认为16,写操作失败时默认自动关闭通道,连接超时默认为30000ms,同时拥有一个字节buf分配器和一个接收字节buf分配器。通道配置构造,主要是初始化配置关联通道和接收字节buf分配器。如果系统属性io.netty.allocator.type配置为unpooled,则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator;对于Android平台,默认为UnpooledByteBufAllocator。默认接收字节buf分配器为AdaptiveRecvByteBufAllocator。接收字节buf分配器主要是控制下一次接收字节buf的容量:如果当前读取的字节数不小于上一次接收字节buf的容量(即buf被读满),则增加下一次接收buf的容量;如果读取的字节数明显小于该容量,则减少下一次接收buf的容量。

今天我们来看默认通道配置的其他方法:

现将默认通道配置的定义贴出来,以便理解:

/**
 * The default {@link ChannelConfig} implementation.
 * <p>
 * Holds the generic, transport-independent settings of a channel in volatile fields:
 * buffer allocators, message size estimator, connect timeout, write spin count,
 * auto-read / auto-close flags and the write buffer water mark.
 */
public class DefaultChannelConfig implements ChannelConfig {
    // Message size estimator used when none is configured explicitly.
    private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;

    private static final int DEFAULT_CONNECT_TIMEOUT = 30000; // Initial value of connectTimeoutMillis.
    // Atomic updater for the volatile int field "autoRead" (looked up by name via reflection).
    private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
   // Atomic (compare-and-set) updater for the volatile field "writeBufferWaterMark" (looked up by name).
    private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark");

    protected final Channel channel;// The channel this configuration is associated with.
   // Byte buffer allocator; starts as ByteBufAllocator.DEFAULT.
    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
    // Receive byte buffer allocator; assigned during construction.
    private volatile RecvByteBufAllocator rcvBufAllocator;
    // Message size estimator.
    private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
    // Connect timeout; the field name indicates milliseconds.
    private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
    private volatile int writeSpinCount = 16;// Write spin count (number of write attempts), 16 by default.
    @SuppressWarnings("FieldMayBeFinal")
    private volatile int autoRead = 1;// Auto-read flag kept as an int (1 = enabled) so AUTOREAD_UPDATER can swap it atomically.
    private volatile boolean autoClose = true;// Whether the channel is closed automatically when a write fails.
    // Write buffer water mark (low/high thresholds); replaced as a whole object.
    private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
    private volatile boolean pinEventExecutor = true;// Whether each event executor group pins a single event executor; enabled by default.
    
    /**
     * Creates a configuration for {@code channel} that uses a new
     * {@link AdaptiveRecvByteBufAllocator} as its receive buffer allocator.
     */
    public DefaultChannelConfig(Channel channel) {
        this(channel, new AdaptiveRecvByteBufAllocator());
    }

    /**
     * Creates a configuration for {@code channel} with the given receive buffer allocator.
     * The allocator is installed together with {@code channel.metadata()} before the
     * {@code channel} field itself is assigned.
     */
    protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
        setRecvByteBufAllocator(allocator, channel.metadata());
        this.channel = channel;
    }
}

来看一下写buf水位线(WriteBufferWaterMark)属性的设置:
/**
 * Returns the high threshold of the currently installed {@link WriteBufferWaterMark}.
 */
@Override
public int getWriteBufferHighWaterMark() {
    final WriteBufferWaterMark current = writeBufferWaterMark;
    return current.high();
}

/**
 * Replaces the high water mark while keeping the low water mark of the current
 * {@link WriteBufferWaterMark}. The new water mark object is published with a
 * compare-and-set that is retried until it succeeds.
 *
 * @throws IllegalArgumentException if the value is negative or smaller than the current low water mark
 */
@Override
public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
    if (writeBufferHighWaterMark < 0) {
        throw new IllegalArgumentException(
                "writeBufferHighWaterMark must be >= 0");
    }
    WriteBufferWaterMark current;
    WriteBufferWaterMark replacement;
    do {
        // Re-read the field on every attempt so validation uses the latest low water mark.
        current = writeBufferWaterMark;
        final int low = current.low();
        if (writeBufferHighWaterMark < low) {
            throw new IllegalArgumentException(
                    "writeBufferHighWaterMark cannot be less than " +
                            "writeBufferLowWaterMark (" + low + "): " +
                            writeBufferHighWaterMark);
        }
        replacement = new WriteBufferWaterMark(low, writeBufferHighWaterMark, false);
    } while (!WATERMARK_UPDATER.compareAndSet(this, current, replacement));
    return this;
}

/**
 * Returns the low threshold of the currently installed {@link WriteBufferWaterMark}.
 */
@Override
public int getWriteBufferLowWaterMark() {
    final WriteBufferWaterMark current = writeBufferWaterMark;
    return current.low();
}

/**
 * Replaces the low water mark while keeping the high water mark of the current
 * {@link WriteBufferWaterMark}. The new water mark object is published with a
 * compare-and-set that is retried until it succeeds.
 *
 * @throws IllegalArgumentException if the value is negative or greater than the current high water mark
 */
@Override
public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
    if (writeBufferLowWaterMark < 0) {
        throw new IllegalArgumentException(
                "writeBufferLowWaterMark must be >= 0");
    }
    WriteBufferWaterMark current;
    WriteBufferWaterMark replacement;
    do {
        // Re-read the field on every attempt so validation uses the latest high water mark.
        current = writeBufferWaterMark;
        final int high = current.high();
        if (writeBufferLowWaterMark > high) {
            throw new IllegalArgumentException(
                    "writeBufferLowWaterMark cannot be greater than " +
                            "writeBufferHighWaterMark (" + high + "): " +
                            writeBufferLowWaterMark);
        }
        replacement = new WriteBufferWaterMark(writeBufferLowWaterMark, high, false);
    } while (!WATERMARK_UPDATER.compareAndSet(this, current, replacement));
    return this;
}

/**
 * Installs a complete {@link WriteBufferWaterMark}, replacing both thresholds at once.
 * The argument is passed through {@code checkNotNull} before it is stored.
 */
@Override
public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
    final WriteBufferWaterMark checked = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
    this.writeBufferWaterMark = checked;
    return this;
}

/**
 * Returns the currently installed {@link WriteBufferWaterMark}.
 */
@Override
public WriteBufferWaterMark getWriteBufferWaterMark() {
    return this.writeBufferWaterMark;
}

再看其他属性的相关方法,都是set与get方法,没有什么好说的,简单过一下:
 /**
  * Returns a snapshot of every option handled by this class, mapped to its current value.
  */
 @Override
 @SuppressWarnings("deprecation")
 public Map<ChannelOption<?>, Object> getOptions() {
     // A null target asks the helper overload to create the map itself.
     final Map<ChannelOption<?>, Object> noExistingMap = null;
     return getOptions(
             noExistingMap,
             CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
             ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
             WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
             SINGLE_EVENTEXECUTOR_PER_GROUP);
 }

 /**
  * Adds the current value of each given option to {@code result} and returns that map.
  *
  * @param result  the map to fill, or {@code null} to have a new {@link IdentityHashMap} created
  * @param options the options whose values are looked up through {@link #getOption(ChannelOption)}
  * @return the filled map
  */
 protected Map<ChannelOption<?>, Object> getOptions(
         Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
     final Map<ChannelOption<?>, Object> target =
             result != null ? result : new IdentityHashMap<ChannelOption<?>, Object>();
     for (int i = 0; i < options.length; i++) {
         final ChannelOption<?> key = options[i];
         target.put(key, getOption(key));
     }
     return target;
 }

 /**
  * Applies every entry of {@code options} through {@link #setOption(ChannelOption, Object)}.
  * All entries are attempted even if an earlier one was rejected.
  *
  * @return {@code true} only if every option was accepted
  * @throws NullPointerException if {@code options} is {@code null}
  */
 @SuppressWarnings("unchecked")
 @Override
 public boolean setOptions(Map<ChannelOption<?>, ?> options) {
     if (options == null) {
         throw new NullPointerException("options");
     }

     boolean allApplied = true;
     for (Entry<ChannelOption<?>, ?> entry : options.entrySet()) {
         final ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
         final boolean applied = setOption(key, entry.getValue());
         // Non-short-circuit accumulation: the call above has already happened.
         allApplied &= applied;
     }
     return allApplied;
 }

 /**
  * Looks up the current value of one of the options handled by this class.
  *
  * @return the option's value, or {@code null} if the option is not handled here
  * @throws NullPointerException if {@code option} is {@code null}
  */
 @Override
 @SuppressWarnings({ "unchecked", "deprecation" })
 public <T> T getOption(ChannelOption<T> option) {
     if (option == null) {
         throw new NullPointerException("option");
     }

     final Object value;
     if (option == CONNECT_TIMEOUT_MILLIS) {
         value = Integer.valueOf(getConnectTimeoutMillis());
     } else if (option == MAX_MESSAGES_PER_READ) {
         value = Integer.valueOf(getMaxMessagesPerRead());
     } else if (option == WRITE_SPIN_COUNT) {
         value = Integer.valueOf(getWriteSpinCount());
     } else if (option == ALLOCATOR) {
         value = getAllocator();
     } else if (option == RCVBUF_ALLOCATOR) {
         final RecvByteBufAllocator recvAllocator = getRecvByteBufAllocator();
         value = recvAllocator;
     } else if (option == AUTO_READ) {
         value = Boolean.valueOf(isAutoRead());
     } else if (option == AUTO_CLOSE) {
         value = Boolean.valueOf(isAutoClose());
     } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
         value = Integer.valueOf(getWriteBufferHighWaterMark());
     } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
         value = Integer.valueOf(getWriteBufferLowWaterMark());
     } else if (option == WRITE_BUFFER_WATER_MARK) {
         value = getWriteBufferWaterMark();
     } else if (option == MESSAGE_SIZE_ESTIMATOR) {
         value = getMessageSizeEstimator();
     } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
         value = Boolean.valueOf(getPinEventExecutorPerGroup());
     } else {
         // Unknown option: nothing to report.
         value = null;
     }
     return (T) value;
 }

 /**
  * Validates {@code value} and routes it to the setter that belongs to {@code option}.
  *
  * @return {@code true} if the option is handled by this class, {@code false} otherwise
  */
 @Override
 @SuppressWarnings("deprecation")
 public <T> boolean setOption(ChannelOption<T> option, T value) {
     validate(option, value);

     if (option == CONNECT_TIMEOUT_MILLIS) {
         setConnectTimeoutMillis((Integer) value);
         return true;
     }
     if (option == MAX_MESSAGES_PER_READ) {
         setMaxMessagesPerRead((Integer) value);
         return true;
     }
     if (option == WRITE_SPIN_COUNT) {
         setWriteSpinCount((Integer) value);
         return true;
     }
     if (option == ALLOCATOR) {
         setAllocator((ByteBufAllocator) value);
         return true;
     }
     if (option == RCVBUF_ALLOCATOR) {
         setRecvByteBufAllocator((RecvByteBufAllocator) value);
         return true;
     }
     if (option == AUTO_READ) {
         setAutoRead((Boolean) value);
         return true;
     }
     if (option == AUTO_CLOSE) {
         setAutoClose((Boolean) value);
         return true;
     }
     if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
         setWriteBufferHighWaterMark((Integer) value);
         return true;
     }
     if (option == WRITE_BUFFER_LOW_WATER_MARK) {
         setWriteBufferLowWaterMark((Integer) value);
         return true;
     }
     if (option == WRITE_BUFFER_WATER_MARK) {
         setWriteBufferWaterMark((WriteBufferWaterMark) value);
         return true;
     }
     if (option == MESSAGE_SIZE_ESTIMATOR) {
         setMessageSizeEstimator((MessageSizeEstimator) value);
         return true;
     }
     if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
         setPinEventExecutorPerGroup((Boolean) value);
         return true;
     }
     // Not an option this class knows about.
     return false;
 }

 /**
  * Rejects a {@code null} option and otherwise lets the option validate the value itself.
  *
  * @throws NullPointerException if {@code option} is {@code null}
  */
 protected <T> void validate(ChannelOption<T> option, T value) {
     if (option != null) {
         option.validate(value);
         return;
     }
     throw new NullPointerException("option");
 }

 /**
  * Returns the configured connect timeout in milliseconds.
  */
 @Override
 public int getConnectTimeoutMillis() {
     return this.connectTimeoutMillis;
 }

 /**
  * Stores the connect timeout in milliseconds.
  *
  * @throws IllegalArgumentException if {@code connectTimeoutMillis} is negative
  */
 @Override
 public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
     if (connectTimeoutMillis >= 0) {
         this.connectTimeoutMillis = connectTimeoutMillis;
         return this;
     }
     final String message = String.format(
             "connectTimeoutMillis: %d (expected: >= 0)", connectTimeoutMillis);
     throw new IllegalArgumentException(message);
 }

 /**
  * {@inheritDoc}
  * <p>
  * The value is read from the receive buffer allocator, which therefore has to be a
  * {@link MaxMessagesRecvByteBufAllocator}.
  *
  * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
  * {@link MaxMessagesRecvByteBufAllocator}.
  */
 @Override
 @Deprecated
 public int getMaxMessagesPerRead() {
     try {
         // The explicit type argument makes the call site cast to MaxMessagesRecvByteBufAllocator.
         return this.<MaxMessagesRecvByteBufAllocator>getRecvByteBufAllocator().maxMessagesPerRead();
     } catch (ClassCastException cause) {
         throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
                 "MaxMessagesRecvByteBufAllocator", cause);
     }
 }

 /**
  * {@inheritDoc}
  * <p>
  * The value is forwarded to the receive buffer allocator, which therefore has to be a
  * {@link MaxMessagesRecvByteBufAllocator}.
  *
  * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
  * {@link MaxMessagesRecvByteBufAllocator}.
  */
 @Override
 @Deprecated
 public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
     try {
         // The explicit type argument makes the call site cast to MaxMessagesRecvByteBufAllocator.
         this.<MaxMessagesRecvByteBufAllocator>getRecvByteBufAllocator()
                 .maxMessagesPerRead(maxMessagesPerRead);
     } catch (ClassCastException cause) {
         throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
                 "MaxMessagesRecvByteBufAllocator", cause);
     }
     return this;
 }

 /**
  * Returns the configured write spin count.
  */
 @Override
 public int getWriteSpinCount() {
     return this.writeSpinCount;
 }

 /**
  * Stores the write spin count.
  *
  * @throws IllegalArgumentException if {@code writeSpinCount} is zero or negative
  */
 @Override
 public ChannelConfig setWriteSpinCount(int writeSpinCount) {
     if (writeSpinCount > 0) {
         this.writeSpinCount = writeSpinCount;
         return this;
     }
     throw new IllegalArgumentException(
             "writeSpinCount must be a positive integer.");
 }

 /**
  * Returns the configured {@link ByteBufAllocator}.
  */
 @Override
 public ByteBufAllocator getAllocator() {
     return this.allocator;
 }

 /**
  * Stores the {@link ByteBufAllocator} to use.
  *
  * @throws NullPointerException if {@code allocator} is {@code null}
  */
 @Override
 public ChannelConfig setAllocator(ByteBufAllocator allocator) {
     if (allocator != null) {
         this.allocator = allocator;
         return this;
     }
     throw new NullPointerException("allocator");
 }

 /**
  * Returns the configured receive buffer allocator, cast (unchecked) to the type the caller expects.
  */
 @SuppressWarnings("unchecked")
 @Override
 public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
     final RecvByteBufAllocator current = rcvBufAllocator;
     return (T) current;
 }

 /**
  * Stores the receive buffer allocator after passing it through {@code checkNotNull}.
  */
 @Override
 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
     final RecvByteBufAllocator checked = checkNotNull(allocator, "allocator");
     rcvBufAllocator = checked;
     return this;
 }

 /**
  * Installs the {@link RecvByteBufAllocator} used by the channel to allocate receive buffers,
  * first seeding it with the channel's default messages-per-read value where applicable.
  *
  * @param allocator the allocator to set.
  * @param metadata supplies {@link ChannelMetadata#defaultMaxMessagesPerRead()}, which is applied when
  * {@code allocator} is of type {@link MaxMessagesRecvByteBufAllocator}.
  * @throws NullPointerException if {@code allocator} is {@code null}
  */
 private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
     if (allocator == null) {
         throw new NullPointerException("allocator");
     }
     if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
         final MaxMessagesRecvByteBufAllocator maxMessagesAllocator =
                 (MaxMessagesRecvByteBufAllocator) allocator;
         maxMessagesAllocator.maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
     }
     setRecvByteBufAllocator(allocator);
 }
/**
 * Returns the configured {@link MessageSizeEstimator}.
 */
@Override
public MessageSizeEstimator getMessageSizeEstimator() {
    return this.msgSizeEstimator;
}

/**
 * Stores the {@link MessageSizeEstimator} to use.
 *
 * @throws NullPointerException if {@code estimator} is {@code null}
 */
@Override
public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
    if (estimator != null) {
        msgSizeEstimator = estimator;
        return this;
    }
    throw new NullPointerException("estimator");
}

/**
 * Stores the flag behind the {@code SINGLE_EVENTEXECUTOR_PER_GROUP} option.
 */
private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
    final boolean requested = pinEventExecutor;
    this.pinEventExecutor = requested;
    return this;
}

/**
 * Returns the flag behind the {@code SINGLE_EVENTEXECUTOR_PER_GROUP} option.
 */
private boolean getPinEventExecutorPerGroup() {
    return this.pinEventExecutor;
}
 /**
  * Reports whether auto-read is enabled; the flag is stored as an int where {@code 1} means enabled.
  */
 @Override
 public boolean isAutoRead() {
     final int flag = autoRead;
     return flag == 1;
 }

 /**
  * Atomically swaps the auto-read flag and reacts only when the value actually changed:
  * enabling triggers {@code channel.read()}, disabling invokes {@link #autoReadCleared()}.
  */
 @Override
 public ChannelConfig setAutoRead(boolean autoRead) {
     final int newFlag = autoRead ? 1 : 0;
     final boolean wasAutoRead = AUTOREAD_UPDATER.getAndSet(this, newFlag) == 1;
     if (autoRead != wasAutoRead) {
         if (autoRead) {
             // Switched from off to on: start reading from the channel.
             channel.read();
         } else {
             // Switched from on to off: notify the hook.
             autoReadCleared();
         }
     }
     return this;
 }
 /**
  * Hook invoked once {@link #setAutoRead(boolean)} is called with {@code false} while
  * {@link #isAutoRead()} was {@code true} before.
  * <p>
  * Does nothing here; subclasses may override it.
  */
 protected void autoReadCleared() {
     // Intentionally empty.
 }

 /**
  * Returns the auto-close flag.
  */
 @Override
 public boolean isAutoClose() {
     return this.autoClose;
 }

 /**
  * Stores the auto-close flag.
  */
 @Override
 public ChannelConfig setAutoClose(boolean autoClose) {
     final boolean requested = autoClose;
     this.autoClose = requested;
     return this;
 }

从上面来看,默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。

再来看默认通道配置的两个分支ServerSocket和Socket通道配置的默认实现:
先来看ServerSocket通道配置,

package io.netty.channel.socket;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.NetUtil;

import java.net.ServerSocket;
import java.net.SocketException;
import java.util.Map;

import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;

/**
 * The default {@link ServerSocketChannelConfig} implementation.
 */
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    protected final ServerSocket javaSocket;//关联ServerSocket
    private volatile int backlog = NetUtil.SOMAXCONN;//接收的最大连接数

    /**
     * Creates a new instance.
     */
    public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;
    }

    @Override
    public Map<ChannelOption<?>, Object> getOptions() {
        return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T getOption(ChannelOption<T> option) {
        if (option == SO_RCVBUF) {
            return (T) Integer.valueOf(getReceiveBufferSize());
        }
        if (option == SO_REUSEADDR) {
            return (T) Boolean.valueOf(isReuseAddress());
        }
        if (option == SO_BACKLOG) {
            return (T) Integer.valueOf(getBacklog());
        }

        return super.getOption(option);
    }

    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_BACKLOG) {
            setBacklog((Integer) value);
        } else {
            return super.setOption(option, value);
        }

        return true;
    }

    @Override
    public boolean isReuseAddress() {
        try {
            return javaSocket.getReuseAddress();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
        try {
            javaSocket.setReuseAddress(reuseAddress);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public int getReceiveBufferSize() {
        try {
            return javaSocket.getReceiveBufferSize();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
        try {
            javaSocket.setReceiveBufferSize(receiveBufferSize);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
        javaSocket.setPerformancePreferences(connectionTime, latency, bandwidth);
        return this;
    }

    @Override
    public int getBacklog() {
        return backlog;
    }

    @Override
    public ServerSocketChannelConfig setBacklog(int backlog) {
        if (backlog < 0) {
            throw new IllegalArgumentException("backlog: " + backlog);
        }
        this.backlog = backlog;
        return this;
    }

    @Override
    public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
        super.setConnectTimeoutMillis(connectTimeoutMillis);
        return this;
    }

    @Override
    @Deprecated
    public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
        super.setMaxMessagesPerRead(maxMessagesPerRead);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
        super.setWriteSpinCount(writeSpinCount);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
        super.setAllocator(allocator);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
        super.setRecvByteBufAllocator(allocator);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
        super.setAutoRead(autoRead);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
        super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
        super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
        super.setWriteBufferWaterMark(writeBufferWaterMark);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
        super.setMessageSizeEstimator(estimator);
        return this;
    }
}

我们来看backlog(等待被接受的连接队列长度)的默认值NetUtil.SOMAXCONN:
public final class NetUtil {
	...
	/**
	 * The SOMAXCONN value of the current machine.  If failed to get the value,  {@code 200}  is used as a
	 * default value for Windows or {@code 128} for others.
	 */
	public static final int SOMAXCONN;
	static {
		// As a SecurityManager may prevent reading the somaxconn file we wrap this in a privileged block.
		//
		// See https://github.com/netty/netty/issues/3680
		SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
		    @Override
		    public Integer run() {
			// Determine the default somaxconn (server socket backlog) value of the platform.
			// The known defaults:
			// - Windows NT Server 4.0+: 200
			// - Linux and Mac OS X: 128
			int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
			File file = new File("/proc/sys/net/core/somaxconn");
			BufferedReader in = null;
			try {
			    // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
			    // try / catch block.
			    // See https://github.com/netty/netty/issues/4936
			    if (file.exists()) {
				in = new BufferedReader(new FileReader(file));
				somaxconn = Integer.parseInt(in.readLine());
				if (logger.isDebugEnabled()) {
				    logger.debug("{}: {}", file, somaxconn);
				}
			    } else {
				// Try to get from sysctl
				Integer tmp = null;
				if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
				    tmp = sysctlGetInt("kern.ipc.somaxconn");
				    if (tmp == null) {
					tmp = sysctlGetInt("kern.ipc.soacceptqueue");
					if (tmp != null) {
					    somaxconn = tmp;
					}
				    } else {
					somaxconn = tmp;
				    }
				}
			   ...
			     
		});
	    }
}


从上面来看与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。

再来看Socket通道配置:
package io.netty.channel.socket;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.internal.PlatformDependent;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;

import static io.netty.channel.ChannelOption.*;

/**
 * The default {@link SocketChannelConfig} implementation.
 * <p>
 * Socket-level options ({@code SO_RCVBUF}, {@code SO_SNDBUF}, {@code TCP_NODELAY},
 * {@code SO_KEEPALIVE}, {@code SO_REUSEADDR}, {@code SO_LINGER}, {@code IP_TOS}) are read from
 * and written to the wrapped {@link Socket}; {@code ALLOW_HALF_CLOSURE} is kept in a field of
 * this class; every other option is handled by {@link DefaultChannelConfig}.
 */
public class DefaultSocketChannelConfig extends DefaultChannelConfig
                                        implements SocketChannelConfig {

    /** The JDK socket that backs the socket-level options. */
    protected final Socket javaSocket;

    /** Value of the {@code ALLOW_HALF_CLOSURE} option; {@code false} until set. */
    private volatile boolean allowHalfClosure;

    /**
     * Creates a new instance and, where the platform allows it, turns on {@code TCP_NODELAY}.
     *
     * @throws NullPointerException if {@code javaSocket} is {@code null}
     */
    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;

        // Enable TCP_NODELAY by default if possible.
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                setTcpNoDelay(true);
            } catch (Exception ignored) {
                // Ignore.
            }
        }
    }

    // ------------------------------------------------------------------
    // Generic option access
    // ------------------------------------------------------------------

    /**
     * Returns the inherited options plus the socket specific ones.
     */
    @Override
    public Map<ChannelOption<?>, Object> getOptions() {
        final Map<ChannelOption<?>, Object> inherited = super.getOptions();
        return getOptions(
                inherited,
                SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
                ALLOW_HALF_CLOSURE);
    }

    /**
     * Resolves the socket specific options here and defers everything else to the parent.
     */
    @SuppressWarnings("unchecked")
    @Override
    public <T> T getOption(ChannelOption<T> option) {
        final Object value;
        if (option == SO_RCVBUF) {
            value = Integer.valueOf(getReceiveBufferSize());
        } else if (option == SO_SNDBUF) {
            value = Integer.valueOf(getSendBufferSize());
        } else if (option == TCP_NODELAY) {
            value = Boolean.valueOf(isTcpNoDelay());
        } else if (option == SO_KEEPALIVE) {
            value = Boolean.valueOf(isKeepAlive());
        } else if (option == SO_REUSEADDR) {
            value = Boolean.valueOf(isReuseAddress());
        } else if (option == SO_LINGER) {
            value = Integer.valueOf(getSoLinger());
        } else if (option == IP_TOS) {
            value = Integer.valueOf(getTrafficClass());
        } else if (option == ALLOW_HALF_CLOSURE) {
            value = Boolean.valueOf(isAllowHalfClosure());
        } else {
            return super.getOption(option);
        }
        return (T) value;
    }

    /**
     * Validates the value, applies the socket specific options here and defers
     * everything else to the parent.
     */
    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
            return true;
        }
        if (option == SO_SNDBUF) {
            setSendBufferSize((Integer) value);
            return true;
        }
        if (option == TCP_NODELAY) {
            setTcpNoDelay((Boolean) value);
            return true;
        }
        if (option == SO_KEEPALIVE) {
            setKeepAlive((Boolean) value);
            return true;
        }
        if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
            return true;
        }
        if (option == SO_LINGER) {
            setSoLinger((Integer) value);
            return true;
        }
        if (option == IP_TOS) {
            setTrafficClass((Integer) value);
            return true;
        }
        if (option == ALLOW_HALF_CLOSURE) {
            setAllowHalfClosure((Boolean) value);
            return true;
        }
        return super.setOption(option, value);
    }

    // ------------------------------------------------------------------
    // Getters delegated to the JDK Socket; a SocketException is rethrown
    // wrapped in a ChannelException
    // ------------------------------------------------------------------

    /** Reads {@code SO_RCVBUF} from the socket. */
    @Override
    public int getReceiveBufferSize() {
        final int size;
        try {
            size = javaSocket.getReceiveBufferSize();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return size;
    }

    /** Reads {@code SO_SNDBUF} from the socket. */
    @Override
    public int getSendBufferSize() {
        final int size;
        try {
            size = javaSocket.getSendBufferSize();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return size;
    }

    /** Reads {@code SO_LINGER} from the socket. */
    @Override
    public int getSoLinger() {
        final int linger;
        try {
            linger = javaSocket.getSoLinger();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return linger;
    }

    /** Reads the traffic class ({@code IP_TOS}) from the socket. */
    @Override
    public int getTrafficClass() {
        final int trafficClass;
        try {
            trafficClass = javaSocket.getTrafficClass();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return trafficClass;
    }

    /** Reads {@code SO_KEEPALIVE} from the socket. */
    @Override
    public boolean isKeepAlive() {
        final boolean keepAlive;
        try {
            keepAlive = javaSocket.getKeepAlive();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return keepAlive;
    }

    /** Reads {@code SO_REUSEADDR} from the socket. */
    @Override
    public boolean isReuseAddress() {
        final boolean reuse;
        try {
            reuse = javaSocket.getReuseAddress();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return reuse;
    }

    /** Reads {@code TCP_NODELAY} from the socket. */
    @Override
    public boolean isTcpNoDelay() {
        final boolean noDelay;
        try {
            noDelay = javaSocket.getTcpNoDelay();
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
        return noDelay;
    }

    // ------------------------------------------------------------------
    // Setters delegated to the JDK Socket; a SocketException is rethrown
    // wrapped in a ChannelException
    // ------------------------------------------------------------------

    /** Writes {@code SO_KEEPALIVE} to the socket. */
    @Override
    public SocketChannelConfig setKeepAlive(boolean keepAlive) {
        try {
            javaSocket.setKeepAlive(keepAlive);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    /** Forwards the performance preferences to the socket unchanged. */
    @Override
    public SocketChannelConfig setPerformancePreferences(
            int connectionTime, int latency, int bandwidth) {
        javaSocket.setPerformancePreferences(connectionTime, latency, bandwidth);
        return this;
    }

    /** Writes {@code SO_RCVBUF} to the socket. */
    @Override
    public SocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
        try {
            javaSocket.setReceiveBufferSize(receiveBufferSize);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    /** Writes {@code SO_REUSEADDR} to the socket. */
    @Override
    public SocketChannelConfig setReuseAddress(boolean reuseAddress) {
        try {
            javaSocket.setReuseAddress(reuseAddress);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    /** Writes {@code SO_SNDBUF} to the socket. */
    @Override
    public SocketChannelConfig setSendBufferSize(int sendBufferSize) {
        try {
            javaSocket.setSendBufferSize(sendBufferSize);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    /**
     * Writes {@code SO_LINGER} to the socket. A negative value disables lingering
     * (the socket is called with {@code false, 0}); any other value enables it with that timeout.
     */
    @Override
    public SocketChannelConfig setSoLinger(int soLinger) {
        final boolean enabled = soLinger >= 0;
        final int timeout = enabled ? soLinger : 0;
        try {
            javaSocket.setSoLinger(enabled, timeout);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    /** Writes {@code TCP_NODELAY} to the socket. */
    @Override
    public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
        try {
            javaSocket.setTcpNoDelay(tcpNoDelay);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    /** Writes the traffic class ({@code IP_TOS}) to the socket. */
    @Override
    public SocketChannelConfig setTrafficClass(int trafficClass) {
        try {
            javaSocket.setTrafficClass(trafficClass);
            return this;
        } catch (SocketException cause) {
            throw new ChannelException(cause);
        }
    }

    // ------------------------------------------------------------------
    // Half-closure flag (kept locally)
    // ------------------------------------------------------------------

    /** Returns the stored {@code ALLOW_HALF_CLOSURE} flag. */
    @Override
    public boolean isAllowHalfClosure() {
        return this.allowHalfClosure;
    }

    /** Stores the {@code ALLOW_HALF_CLOSURE} flag. */
    @Override
    public SocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) {
        this.allowHalfClosure = allowHalfClosure;
        return this;
    }

    // ------------------------------------------------------------------
    // Inherited setters, re-declared only to narrow the return type
    // ------------------------------------------------------------------

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
        super.setConnectTimeoutMillis(connectTimeoutMillis);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    @Deprecated
    public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
        super.setMaxMessagesPerRead(maxMessagesPerRead);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setWriteSpinCount(int writeSpinCount) {
        super.setWriteSpinCount(writeSpinCount);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setAllocator(ByteBufAllocator allocator) {
        super.setAllocator(allocator);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
        super.setRecvByteBufAllocator(allocator);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setAutoRead(boolean autoRead) {
        super.setAutoRead(autoRead);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setAutoClose(boolean autoClose) {
        super.setAutoClose(autoClose);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
        super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
        super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
        super.setWriteBufferWaterMark(writeBufferWaterMark);
        return this;
    }

    /** Delegates to the parent; returns this config typed as {@link SocketChannelConfig}. */
    @Override
    public SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
        super.setMessageSizeEstimator(estimator);
        return this;
    }
}


从上面来看,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
总结:
默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。默认ServerSocket通道配置,与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。默认Socket通道配置,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics