`

netty源码分析之ReplayingDecoder

阅读更多

           在看了FrameDecoder后,我们接下来再看ReplayingDecoder,我们首先分析下为什么我们已经有了功能非常强大的FrameDecoder了,还行ReplayingDecoder呢,就从下面的例子看起。

         假设消息结构是这样的:

          

VERSION        - 1 byte
TYPE           - 1 byte
PAYLOAD_LENGTH - 4 bytes
PAYLOAD        - N bytes (depends on the value read at PAYLOAD_LENGTH)

    如果我们用FrameDecoder来实现的的话,我们可能要反复读多次这个帧才能被解析出来,然后程序写起来也是比较麻烦的,而且性能也比较低下。

     我们再来看一下消息结构体:

  

public class Envelope {
    private Version version;
    private Type type;
    private byte[] payload;
 
    public Envelope() {
    }
 
    public Envelope(Version version, Type type, byte[] payload) {
        this.version = version;
        this.type = type;
        this.payload = payload;
    }
 
    // getters & setters
}
         我们再来看一下消息编码:

 

        

public class Encoder extends OneToOneEncoder {
    // ...
    public static ChannelBuffer encodeMessage(Envelope message)
            throws IllegalArgumentException {
        // verify that no fields are set to null
 
        // version(1b) + type(1b) + payload length(4b) + payload(nb)
        int size = 6 + message.getPayload().length;
 
        ChannelBuffer buffer = ChannelBuffers.buffer(size);
        buffer.writeByte(message.getVersion().getByteValue());
        buffer.writeByte(message.getType().getByteValue());
        buffer.writeInt(message.getPayload().length);
        buffer.writeBytes(message.getPayload());
 
        return buffer;
    }
 
    @Override
    protected Object encode(ChannelHandlerContext channelHandlerContext,
                            Channel channel, Object msg) throws Exception {
        if (msg instanceof Envelope) {
            return encodeMessage((Envelope) msg);
        } else {
            return msg;
        }
    }
 
    // ..
         这个里面比较重要的是要定义一下解码的状态:

 

         

public enum DecodingState {
    VERSION,
    TYPE,
    PAYLOAD_LENGTH,
    PAYLOAD,
}
         我们再来看一下编码的实现:

 

            

@Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
                            ChannelBuffer buffer, DecodingState state)
            throws Exception {
 
        switch (state) {
            case VERSION:
                this.message.setVersion(Version.fromByte(buffer.readByte()));
                checkpoint(DecodingState.TYPE);
            case TYPE:
                this.message.setType(Type.fromByte(buffer.readByte()));
                checkpoint(DecodingState.PAYLOAD_LENGTH);
            case PAYLOAD_LENGTH:
                int size = buffer.readInt();
                if (size <= 0) {
                    throw new Exception("Invalid content size");
                }
                byte[] content = new byte[size];
                this.message.setPayload(content);
                checkpoint(DecodingState.PAYLOAD);
            case PAYLOAD:
                buffer.readBytes(this.message.getPayload(), 0,
                                 this.message.getPayload().length);
 
                try {
                    return this.message;
                } finally {
                    this.reset();
                }
            default:
                throw new Exception("Unknown decoding state: " + state);
        }
    }
 
    private void reset() {
        checkpoint(DecodingState.VERSION);
        this.message = new Envelope();
    }
          这个里面的switch语句我们要特别注意一下,这个里面的checkpoint是当失败后下一次解码的执行点。  不多解释了,这个还是比较好理解的,基本上展现了这个解码器的用法。

 

分享到:
评论
1 楼 hubiao0629 2014-11-20  

相关推荐

Global site tag (gtag.js) - Google Analytics