`
Donald_Draper
  • 浏览: 950743 次
社区版块
存档分类
最新评论

netty 抽象通道初始化

阅读更多
netty 管道线定义-ChannelPipeline:http://donald-draper.iteye.com/blog/2388453
netty 默认Channel管道线初始化:http://donald-draper.iteye.com/blog/2388613
netty 默认Channel管道线-添加通道处理器:http://donald-draper.iteye.com/blog/2388726
netty 默认Channel管道线-通道处理器移除与替换:http://donald-draper.iteye.com/blog/2388793
netty 默认Channel管道线-Inbound和Outbound事件处理:http://donald-draper.iteye.com/blog/2389148
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
引言:
前一篇文章,我们看了通道接口的定义,先来回顾一下:
通道Channel,关联一个事件循环,即通道注册的事件循环EventLoop;一个Channel管道ChannelPipeline,用于存放通道处理器;一个字节buf分配器ByteBufAllocator,用于分配字节buf;还有一些获取通道状态的方法,如是否注册到事件循环,通道是否打开,是否可写;另外还有获取通道配置ChannelConfig,通道元数据ChannelMetadata的方法;最重要的是,关联一个Unsafe,用于通道的地址绑定,连接操作以及断开,通道的读写,注册到事件循环以及反注册。
今天我们来看一下通道接口的抽象实现AbstractChannel :
package io.netty.channel;

import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

/**
 * A skeletal {@link Channel} implementation.
 *
 * <p>This excerpt shows only the shared exception constants and the instance fields; the rest of the
 * class body is elided.
 */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");// closed-channel exception attributed to AbstractUnsafe.flush0()
    private static final ClosedChannelException ENSURE_OPEN_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "ensureOpen(...)");// closed-channel exception attributed to AbstractUnsafe.ensureOpen(...)
    private static final ClosedChannelException CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "close(...)");// closed-channel exception attributed to AbstractUnsafe.close(...)
    private static final ClosedChannelException WRITE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "write(...)");// closed-channel exception attributed to AbstractUnsafe.write(...)
    private static final NotYetConnectedException FLUSH0_NOT_YET_CONNECTED_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new NotYetConnectedException(), AbstractUnsafe.class, "flush0()");// not-yet-connected exception attributed to AbstractUnsafe.flush0()

    private final Channel parent;// parent channel
    private final ChannelId id;// id of this channel
    private final Unsafe unsafe;// Unsafe instance that performs the low-level channel operations
    private final DefaultChannelPipeline pipeline;// pipeline holding this channel's handlers
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);// shared void promise bound to this channel
    private final CloseFuture closeFuture = new CloseFuture(this);// future associated with closing this channel

    private volatile SocketAddress localAddress;// local socket address
    private volatile SocketAddress remoteAddress;// remote socket address
    private volatile EventLoop eventLoop;// event loop this channel is registered with
    private volatile boolean registered;// whether the channel is registered
    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
    ...
}

从上面可以看出,抽象通道内部关联一个硬件底层操作类Unsafe,一个事件循环,即通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器,默认为DefaultChannelPipeline。

我们来简单看一下几个通道关闭异常定义:
 private static final ClosedChannelException FLUSH0_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new ClosedChannelException(), AbstractUnsafe.class, "flush0()");// closed-channel exception attributed to AbstractUnsafe.flush0()

//ThrowableUtil
package io.netty.util.internal;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * Static helpers for manipulating and rendering {@link Throwable} stack traces.
 */
public final class ThrowableUtil {

    private ThrowableUtil() { }

    /**
     * Set the {@link StackTraceElement} for the given {@link Throwable}, using the {@link Class} and method name.
     * The previous stack trace is replaced by a single element that carries no file name and the line
     * number {@code -1}.
     *
     * @param cause  the throwable whose stack trace is overwritten
     * @param clazz  the class reported as the declaring class of the single element
     * @param method the method name reported by the single element
     * @return the same {@code cause} instance, for call chaining
     */
    public static <T extends Throwable> T unknownStackTrace(T cause, Class<?> clazz, String method) {
        // Install a one-element trace naming only the given class and method.
        cause.setStackTrace(new StackTraceElement[] { new StackTraceElement(clazz.getName(), method, null, -1)});
        return cause;
    }

    /**
     * Gets the stack trace from a Throwable as a String.
     *
     * <p>The trace is written straight into a character buffer, so the result does not depend on the
     * platform default charset and no characters are lost in a byte round trip.
     *
     * @param cause the {@link Throwable} to be examined
     * @return the stack trace as generated by {@link Throwable#printStackTrace(java.io.PrintWriter)} method.
     */
    public static String stackTraceToString(Throwable cause) {
        StringWriter out = new StringWriter();
        PrintWriter pout = new PrintWriter(out);
        cause.printStackTrace(pout);
        // Push anything still buffered in the PrintWriter into the StringWriter before reading it.
        pout.flush();
        return out.toString();
    }
}

我们来看一下unknownStackTrace方法的设置异常的堆栈元素这一句:
 // Replace the throwable's stack trace with a single element built from the given class and method name
 cause.setStackTrace(new StackTraceElement[] { new StackTraceElement(clazz.getName(), method, null, -1)});


//Throwable
/**
 * Sets the stack trace elements that will be returned by
 * {@link #getStackTrace()} and printed by {@link #printStackTrace()}
 * and related methods.
 *
 * <p>This method is designed for RPC frameworks and other advanced
 * systems. It lets the caller override the default stack trace, which
 * is either generated by {@link #fillInStackTrace()} when a throwable
 * is constructed or deserialized when a throwable is read from a
 * serialization stream.
 *
 * <p>If the stack trace of this {@code Throwable} {@linkplain
 * Throwable#Throwable(String, Throwable, boolean, boolean) is not
 * writable}, calling this method has no effect other than
 * validating its argument.
 *
 * @param   stackTrace the stack trace elements to be associated with
 * this {@code Throwable}.  The array is copied by this call, so later
 * changes to the caller's array have no effect on this
 * {@code Throwable}'s stack trace.
 *
 * @throws NullPointerException if {@code stackTrace} is
 *         {@code null} or if any of the elements of
 *         {@code stackTrace} are {@code null}
 *
 * @since  1.4
 */
public void setStackTrace(StackTraceElement[] stackTrace) {
    // Validate a private copy so the caller cannot change the array after the check.
    StackTraceElement[] copied = stackTrace.clone();
    int index = 0;
    while (index < copied.length) {
        if (copied[index] == null) {
            throw new NullPointerException("stackTrace[" + index + "]");
        }
        index++;
    }

    synchronized (this) {
        // Both fields null: immutable stack / out-of-protocol state, so the trace is left untouched.
        boolean notWritable = this.stackTrace == null && backtrace == null;
        if (!notWritable) {
            this.stackTrace = copied;
        }
    }
}

//StackTraceElement
public final class StackTraceElement implements java.io.Serializable {
    // Normally initialized by VM (public constructor added in 1.5)
    // Declaring class, method name, file name and line number of the frame
    private String declaringClass;
    private String methodName;
    private String fileName;
    private int    lineNumber;
    ...
}

简单看了一下,异常工具类,我们回到抽象通道,来看一下构造:

/**
 * Creates a new instance whose id comes from {@link #newId()}.
 *
 * @param parent
 *        the parent of this channel. {@code null} if there's no parent.
 */
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // Collaborators are created in a fixed order: id first, then the unsafe, then the pipeline.
    this.id = newId();
    this.unsafe = newUnsafe();
    this.pipeline = newChannelPipeline();
}

/**
 * Creates a new instance that uses a caller-supplied id.
 *
 * @param parent
 *        the parent of this channel. {@code null} if there's no parent.
 * @param id
 *        the id assigned to this channel; {@link #newId()} is not called.
 */
protected AbstractChannel(Channel parent, ChannelId id) {
    this.parent = parent;
    this.id = id;
    // Same creation order as the one-argument constructor: unsafe before pipeline.
    this.unsafe = newUnsafe();
    this.pipeline = newChannelPipeline();
}

上面两个构造的关键是以下几点:

1.
// Create the channel id
id = newId();


2.
// Create the low-level operations object (unsafe)
unsafe = newUnsafe();


3.
// Create the channel pipeline
pipeline = newChannelPipeline();


我们分别来看这几点:
1.
// Create the channel id
id = newId();


/**
 * Produces the {@link ChannelId} for channels built through the
 * {@link AbstractChannel#AbstractChannel(Channel)} constructor. The default is a fresh
 * {@link DefaultChannelId}; subclasses may override this method to hand out custom ids.
 *
 * @return the id to assign to the channel under construction
 */
protected ChannelId newId() {
    final ChannelId generated = DefaultChannelId.newInstance();
    return generated;
}

//DefaultChannelId
// Excerpt of the default ChannelId implementation: its fields and the two text renderings.
// The remainder of the class body is elided.
public final class DefaultChannelId implements ChannelId {

    private static final long serialVersionUID = 3884076183504074063L;

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelId.class);
    private static final byte[] MACHINE_ID;// machine id bytes
    private static final int PROCESS_ID_LEN = 4;// length of the process id field
    private static final int PROCESS_ID;// process id
    private static final int SEQUENCE_LEN = 4;// length of the sequence field
    private static final int TIMESTAMP_LEN = 8;// length of the timestamp field
    private static final int RANDOM_LEN = 4;// length of the random field
    private static final AtomicInteger nextSequence = new AtomicInteger();// counter that supplies sequence numbers
    private final byte[] data;// raw bytes of the id
    private final int hashCode;// hash code of the id

    private transient String shortValue;// cached short text form
    private transient String longValue;// cached long text form
    @Override
    public String asShortText() {
        // Computed on first use and cached; built from the last RANDOM_LEN entries of data.
        String shortValue = this.shortValue;
        if (shortValue == null) {
            this.shortValue = shortValue = ByteBufUtil.hexDump(data, data.length - RANDOM_LEN, RANDOM_LEN);
        }
        return shortValue;
    }

    @Override
    public String asLongText() {
        // Computed on first use and cached.
        String longValue = this.longValue;
        if (longValue == null) {
            this.longValue = longValue = newLongValue();
        }
        return longValue;
    }

    private String newLongValue() {
        StringBuilder buf = new StringBuilder(2 * data.length + 5);
        int i = 0;
        // One hex field per id component, in the order machine id, process id, sequence, timestamp, random.
        i = appendHexDumpField(buf, i, MACHINE_ID.length);
        i = appendHexDumpField(buf, i, PROCESS_ID_LEN);
        i = appendHexDumpField(buf, i, SEQUENCE_LEN);
        i = appendHexDumpField(buf, i, TIMESTAMP_LEN);
        i = appendHexDumpField(buf, i, RANDOM_LEN);
        assert i == data.length;
        // Drop the last character of the buffer (presumably a trailing separator added by appendHexDumpField).
        return buf.substring(0, buf.length() - 1);
    }
    ...
}

具体id的生成见附。
先来看第3点:
3.
// Create the channel pipeline
pipeline = newChannelPipeline();


/**
 * Builds the pipeline for this channel.
 *
 * @return a new {@link DefaultChannelPipeline} instance constructed with this channel
 */
protected DefaultChannelPipeline newChannelPipeline() {
    final DefaultChannelPipeline created = new DefaultChannelPipeline(this);
    return created;
}

默认Channel管道线为DefaultChannelPipeline。

再来看:创建底层操作类unsafe 
2.
// Create the low-level operations object (unsafe)
unsafe = newUnsafe();


 /**
  * Create a new {@link AbstractUnsafe} instance which will be used for the life-time of the {@link Channel}
  * <p>Abstract: each concrete channel subclass supplies the implementation.
  */
 protected abstract AbstractUnsafe newUnsafe();


在第二点中上述方法返回的是抽象Unsafe,我们来看一下AbstractUnsafe的实现:

抽象通道内部类抽象Unsafe由于篇幅问题,我们单用一篇文章来说:

抽象Unsafe解析:


从上面可以看出,通道构造主要是初始化通道所属父通道,通道id,底层操作类Unsafe,Channel管道线;默认的Channel管道线为DefaultChannelPipeline,底层操作类Unsafe为AbstractUnsafe(由子类通过newUnsafe方法创建)。


总结:

抽象通道AbstractChannel内部关联一个硬件底层操作类Unsafe,一个事件循环,即通道注册的事件循环EventLoop,一个Channel管道ChannelPipeline,用于存放通道处理器,默认为DefaultChannelPipeline。通道构造主要是初始化通道所属父通道,通道id,底层操作类Unsafe,Channel管道线;默认的Channel管道线为DefaultChannelPipeline,底层操作类Unsafe为AbstractUnsafe。

附:
//DefaultChannelId
package io.netty.channel;

import io.netty.buffer.ByteBufUtil;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.MacAddressUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.util.internal.MacAddressUtil.defaultMachineId;
import static io.netty.util.internal.MacAddressUtil.parseMAC;

/**
 * The default {@link ChannelId} implementation.
 *
 * <p>Each id is a byte array laid out as: machine id, process id (4 bytes), sequence number (4 bytes),
 * a timestamp-derived value (8 bytes) and a random int (4 bytes). The machine id and process id are
 * resolved once, in the static initializer.
 */
public final class DefaultChannelId implements ChannelId {

    private static final long serialVersionUID = 3884076183504074063L;

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelId.class);
    // Machine id bytes; assigned in the static initializer below.
    private static final byte[] MACHINE_ID;
    // Byte width of the process id field.
    private static final int PROCESS_ID_LEN = 4;
    // Process id; assigned in the static initializer below.
    private static final int PROCESS_ID;
    // Byte widths of the sequence, timestamp and random fields.
    private static final int SEQUENCE_LEN = 4;
    private static final int TIMESTAMP_LEN = 8;
    private static final int RANDOM_LEN = 4;

    // Counter incremented once for every id created.
    private static final AtomicInteger nextSequence = new AtomicInteger();

    /**
     * Returns a new {@link DefaultChannelId} instance.
     */
    public static DefaultChannelId newInstance() {
        return new DefaultChannelId();
    }

    // Resolves PROCESS_ID and MACHINE_ID: a value from the corresponding system property wins when it
    // parses; otherwise the value is auto-detected.
    static {
        int processId = -1;
        String customProcessId = SystemPropertyUtil.get("io.netty.processId");
        if (customProcessId != null) {
            try {
                processId = Integer.parseInt(customProcessId);
            } catch (NumberFormatException e) {
                // Malformed input; processId keeps its -1 marker and is reported just below.
            }

            if (processId < 0) {
                // Negative values are rejected as well as unparsable ones.
                processId = -1;
                logger.warn("-Dio.netty.processId: {} (malformed)", customProcessId);
            } else if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.processId: {} (user-set)", processId);
            }
        }

        if (processId < 0) {
            // No usable user-supplied value: detect the process id.
            processId = defaultProcessId();
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.processId: {} (auto-detected)", processId);
            }
        }

        PROCESS_ID = processId;

        byte[] machineId = null;
        String customMachineId = SystemPropertyUtil.get("io.netty.machineId");
        if (customMachineId != null) {
            try {
                machineId = parseMAC(customMachineId);
            } catch (Exception e) {
                logger.warn("-Dio.netty.machineId: {} (malformed)", customMachineId, e);
            }
            if (machineId != null) {
                // NOTE(review): unlike the other debug calls here, this one has no isDebugEnabled() guard.
                logger.debug("-Dio.netty.machineId: {} (user-set)", customMachineId);
            }
        }

        if (machineId == null) {
            // No usable user-supplied value: fall back to MacAddressUtil.defaultMachineId().
            machineId = defaultMachineId();
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.machineId: {} (auto-detected)", MacAddressUtil.formatAddress(machineId));
            }
        }

        MACHINE_ID = machineId;
    }

    /**
     * Detects the id of the current process.
     *
     * <p>Tries {@code ManagementFactory.getRuntimeMXBean().getName()} first and
     * {@code android.os.Process.myPid()} second, both through reflection. Anything from the first
     * {@code '@'} onward is cut off and the rest is parsed as an int. If that fails or yields a
     * negative number, a random int is used instead and a warning is logged.
     *
     * @return the detected process id, or a random value when detection fails
     */
    private static int defaultProcessId() {
        ClassLoader loader = null;
        String value;
        try {
            loader = PlatformDependent.getClassLoader(DefaultChannelId.class);
            // Invoke java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
            Class<?> mgmtFactoryType = Class.forName("java.lang.management.ManagementFactory", true, loader);
            Class<?> runtimeMxBeanType = Class.forName("java.lang.management.RuntimeMXBean", true, loader);

            Method getRuntimeMXBean = mgmtFactoryType.getMethod("getRuntimeMXBean", EmptyArrays.EMPTY_CLASSES);
            Object bean = getRuntimeMXBean.invoke(null, EmptyArrays.EMPTY_OBJECTS);
            Method getName = runtimeMxBeanType.getMethod("getName", EmptyArrays.EMPTY_CLASSES);
            value = (String) getName.invoke(bean, EmptyArrays.EMPTY_OBJECTS);
        } catch (Throwable t) {
            logger.debug("Could not invoke ManagementFactory.getRuntimeMXBean().getName(); Android?", t);
            try {
                // Invoke android.os.Process.myPid()
                Class<?> processType = Class.forName("android.os.Process", true, loader);
                Method myPid = processType.getMethod("myPid", EmptyArrays.EMPTY_CLASSES);
                value = myPid.invoke(null, EmptyArrays.EMPTY_OBJECTS).toString();
            } catch (Throwable t2) {
                logger.debug("Could not invoke Process.myPid(); not Android?", t2);
                // Both lookups failed; the empty string fails to parse below and triggers the random fallback.
                value = "";
            }
        }

        // Keep only the part before the first '@', if there is one.
        int atIndex = value.indexOf('@');
        if (atIndex >= 0) {
            value = value.substring(0, atIndex);
        }

        int pid;
        try {
            pid = Integer.parseInt(value);
        } catch (NumberFormatException e) {
            // value did not contain an integer.
            pid = -1;
        }

        if (pid < 0) {
            pid = PlatformDependent.threadLocalRandom().nextInt();
            logger.warn("Failed to find the current process ID from '{}'; using a random value: {}",  value, pid);
        }

        return pid;
    }

    // Raw id bytes, filled once in the constructor.
    private final byte[] data;
    // Hash of data, computed once in the constructor.
    private final int hashCode;

    // Text forms, computed on first request and cached.
    private transient String shortValue;
    private transient String longValue;

    /**
     * Builds a new id by writing the five fields into {@link #data} in order and caching its hash code.
     */
    private DefaultChannelId() {
        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;

        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;

        // processId
        i = writeInt(i, PROCESS_ID);

        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());

        // timestamp (kind of): bit-reversed nanoTime XORed with the wall-clock milliseconds
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());

        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;

        hashCode = Arrays.hashCode(data);
    }

    /**
     * Writes {@code value} into {@link #data} starting at index {@code i}, most significant byte first.
     *
     * @return the index just past the four bytes written
     */
    private int writeInt(int i, int value) {
        data[i ++] = (byte) (value >>> 24);
        data[i ++] = (byte) (value >>> 16);
        data[i ++] = (byte) (value >>> 8);
        data[i ++] = (byte) value;
        return i;
    }

    /**
     * Writes {@code value} into {@link #data} starting at index {@code i}, most significant byte first.
     *
     * @return the index just past the eight bytes written
     */
    private int writeLong(int i, long value) {
        data[i ++] = (byte) (value >>> 56);
        data[i ++] = (byte) (value >>> 48);
        data[i ++] = (byte) (value >>> 40);
        data[i ++] = (byte) (value >>> 32);
        data[i ++] = (byte) (value >>> 24);
        data[i ++] = (byte) (value >>> 16);
        data[i ++] = (byte) (value >>> 8);
        data[i ++] = (byte) value;
        return i;
    }

    /**
     * Returns the short text form: a hex dump of the last {@code RANDOM_LEN} bytes of the id, i.e. the
     * random field. The string is computed on first use and cached.
     */
    @Override
    public String asShortText() {
        String shortValue = this.shortValue;
        if (shortValue == null) {
            this.shortValue = shortValue = ByteBufUtil.hexDump(data, data.length - RANDOM_LEN, RANDOM_LEN);
        }
        return shortValue;
    }

    /**
     * Returns the long text form produced by {@link #newLongValue()}. The string is computed on first
     * use and cached.
     */
    @Override
    public String asLongText() {
        String longValue = this.longValue;
        if (longValue == null) {
            this.longValue = longValue = newLongValue();
        }
        return longValue;
    }

    /**
     * Renders all five fields as hex dumps separated by {@code '-'}.
     */
    private String newLongValue() {
        // Capacity follows the original sizing: 2 * data.length for the hex dumps plus 5 for the separators.
        StringBuilder buf = new StringBuilder(2 * data.length + 5);
        int i = 0;
        i = appendHexDumpField(buf, i, MACHINE_ID.length);
        i = appendHexDumpField(buf, i, PROCESS_ID_LEN);
        i = appendHexDumpField(buf, i, SEQUENCE_LEN);
        i = appendHexDumpField(buf, i, TIMESTAMP_LEN);
        i = appendHexDumpField(buf, i, RANDOM_LEN);
        assert i == data.length;
        // Every field appended a trailing '-'; drop the last one.
        return buf.substring(0, buf.length() - 1);
    }

    /**
     * Appends the hex dump of {@code length} bytes of {@link #data} starting at {@code i}, followed by
     * a {@code '-'} separator.
     *
     * @return the index just past the dumped bytes
     */
    private int appendHexDumpField(StringBuilder buf, int i, int length) {
        buf.append(ByteBufUtil.hexDump(data, i, length));
        buf.append('-');
        i += length;
        return i;
    }

    /** Returns the hash code computed from the id bytes at construction time. */
    @Override
    public int hashCode() {
        return hashCode;
    }

    /**
     * Orders ids. Two {@link DefaultChannelId}s are compared byte by byte as unsigned values, with the
     * shorter array first when one is a prefix of the other; any other {@link ChannelId} is compared
     * by its long text form.
     */
    @Override
    public int compareTo(final ChannelId o) {
        if (this == o) {
            // short circuit
            return 0;
        }
        if (o instanceof DefaultChannelId) {
            // lexicographic comparison
            final byte[] otherData = ((DefaultChannelId) o).data;
            int len1 = data.length;
            int len2 = otherData.length;
            int len = Math.min(len1, len2);

            for (int k = 0; k < len; k++) {
                byte x = data[k];
                byte y = otherData[k];
                if (x != y) {
                    // treat these as unsigned bytes for comparison
                    return (x & 0xff) - (y & 0xff);
                }
            }
            return len1 - len2;
        }

        return asLongText().compareTo(o.asLongText());
    }

    /** Two ids are equal when they are the same instance or both are {@link DefaultChannelId}s with equal bytes. */
    @Override
    public boolean equals(Object obj) {
        return this == obj || (obj instanceof DefaultChannelId && Arrays.equals(data, ((DefaultChannelId) obj).data));
    }

    /** Returns the short text form, see {@link #asShortText()}. */
    @Override
    public String toString() {
        return asShortText();
    }
}
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics