`
jean7155
  • 浏览: 61522 次
  • 性别: Icon_minigender_2
  • 来自: 上海
社区版块
存档分类
最新评论

Apache Mina: 自定义codec

阅读更多
本例子根据mina自带的例子:sumup改写。

1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。

2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。

3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。

2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java
public abstract class AbstractMessage implements Serializable {
	
    private int sequence;

	public int getSequence() {
		return sequence;
	}

	public void setSequence(int sequence) {
		this.sequence = sequence;
	}
}

AddMessage.java
public class AddMessage extends AbstractMessage {

	private static final long serialVersionUID = -735205238699949292L;
	
	private int value;
	
	public AddMessage(){
		
	}
	
	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	@Override
	public String toString() {
		return "AddMessage [value=" + value + ", getSequence()="
				+ getSequence() + "]";
	}

}

ResultMessage.java
public class ResultMessage extends AbstractMessage {

	private static final long serialVersionUID = 7431899532938146290L;
	
	private boolean ok;
	private int value;
	
	public ResultMessage(){
		
	}

	public boolean isOk() {
		return ok;
	}

	public void setOk(boolean ok) {
		this.ok = ok;
	}

	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	@Override
	public String toString() {
		return "ResultMessage [ok=" + ok + ", value=" + value
				+ ", getSequence()=" + getSequence() + "]";
	}

}


2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java
public class AddMessageEncoder<T extends AddMessage> implements MessageEncoder<T> {

	@Override
	public void encode(IoSession session, T message, ProtocolEncoderOutput out)
			throws Exception {
		
		IoBuffer buf = IoBuffer.allocate(16);
	    buf.setAutoExpand(true); // Enable auto-expand for easier encoding

	    // Encode a header
	    buf.putInt(message.getSequence());
	    buf.putInt(message.getValue());

	    buf.flip();
	    out.write(buf);
		
	}

}

ResultMessageEncoder.java
public class ResultMessageEncoder<T extends ResultMessage> implements MessageEncoder<T> {

	@Override
	public void encode(IoSession session, T message, ProtocolEncoderOutput out)
			throws Exception {
		
		IoBuffer buf = IoBuffer.allocate(16);
	    buf.setAutoExpand(true); // Enable auto-expand for easier encoding

	    
	    if(message.isOk()){
	    	buf.putShort((short) Constants.RESULT_OK);
	    	buf.putInt(message.getSequence());
	    	buf.putInt(message.getValue());
	    }else{
	    	
	    	buf.putShort((short)Constants.RESULT_ERROR);
	    	buf.putInt(message.getSequence());
	    	buf.putInt(0);
	    }

	    buf.flip();
	    out.write(buf);
		
	}

}


3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java
public class AddMessageDecoder implements MessageDecoder {
	

	@Override
	public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
		
		if (in.remaining() < 1) {
            return MessageDecoderResult.NEED_DATA;
        }

        // Return NOT_OK if not matches.
        return MessageDecoderResult.OK;
	}

	@Override
	public MessageDecoderResult decode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if (in.remaining() < 1) {
			return MessageDecoderResult.NEED_DATA;
	    }
		
		int sequence = in.getInt();
		int value = in.getInt();

        AddMessage m = new AddMessage();
        m.setSequence(sequence);
        m.setValue(value);
        
        out.write(m);
        
        return MessageDecoderResult.OK;

        
	}

	@Override
	public void finishDecode(IoSession session, ProtocolDecoderOutput out)
			throws Exception {
		// TODO Auto-generated method stub

	}

}

ResultMessageDecoder.java
public class ResultMessageDecoder implements MessageDecoder {

	@Override
	public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
		
		if (in.remaining() < 1) {
            return MessageDecoderResult.NEED_DATA;
        }
		
		int code = in.getShort();
		
		if(code==Constants.RESULT_OK){
			return MessageDecoderResult.OK;
		}

        // Return NOT_OK if not matches.
        return MessageDecoderResult.NOT_OK;
	}

	@Override
	public MessageDecoderResult decode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if (in.remaining() < 1) {
			return MessageDecoderResult.NEED_DATA;
	    }
		
		int code = in.getShort();
		int sequence = in.getInt();
		int value = in.getInt();

        ResultMessage m = new ResultMessage();
        if(code==Constants.RESULT_OK){
        	m.setOk(true);
        	m.setSequence(sequence);
            m.setValue(value);
        }else{
        	m.setOk(false);
        }
        	
        out.write(m);
        
        return MessageDecoderResult.OK;

        
	}

	@Override
	public void finishDecode(IoSession session, ProtocolDecoderOutput out)
			throws Exception {
		// TODO Auto-generated method stub

	}

}


4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.
public class DemoProtocolCodecFactory extends DemuxingProtocolCodecFactory {
	
	public DemoProtocolCodecFactory(boolean isServer){
		if(isServer){
			super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class);
			super.addMessageDecoder(AddMessageDecoder.class);
		}else{
			super.addMessageEncoder(AddMessage.class, AddMessageEncoder.class);
			super.addMessageDecoder(ResultMessageDecoder.class);
		}
	}

}


5) 创建server端服务和业务处理ServerIoHandler
Server.java
public class Server {
	
	public void init() throws IOException{

		IoAcceptor acceptor = new NioSocketAcceptor();
		
		acceptor.getFilterChain().addLast("logger", new LoggingFilter());
		acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DemoProtocolCodecFactory(Constants.IS_SERVER)));
		
        acceptor.setHandler(new ServerIoHandler());
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

        acceptor.setDefaultLocalAddress(new InetSocketAddress(Constants.PORT));
        acceptor.bind();// 启动监听  

	}
	

	public static void main(String[] args) throws IOException {
		Server server = new Server();
		server.init();
	}

}

ServerIoHandler.java
public class ServerIoHandler extends IoHandlerAdapter {
	
	private static final String SUM_KEY = "sum";

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		System.out.println(cause.getMessage());
		cause.printStackTrace();
		session.close(true);
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		AddMessage add = (AddMessage) message;
		
		int sum = ((Integer)session.getAttribute(SUM_KEY)).intValue();
		int value = add.getValue();
		
		long total = (long) sum + value;
		
		if(total>Integer.MAX_VALUE || total<Integer.MIN_VALUE){
			ResultMessage result = new ResultMessage();
			result.setSequence(add.getSequence());
			result.setOk(false);
			session.write(result);
		}else{
			sum = (int) total;
			session.setAttribute(SUM_KEY, sum);
			
			ResultMessage result = new ResultMessage();
			result.setSequence(add.getSequence());
			result.setOk(true);
			result.setValue(sum);
			session.write(result);
		}
		
		//System.out.println("total=" + total);
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status)
			throws Exception {
		session.close(true);
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		session.setAttribute(SUM_KEY, new Integer(0));

	}
}


6) 创建Client端连接和业务处理ClientIoHandler
Client.java
public class Client {
	
	public void init() throws InterruptedException{
		NioSocketConnector connector = new NioSocketConnector();

        // Configure the service.
        connector.setConnectTimeoutMillis(Constants.CONNECT_TIMEOUT);
        connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new DemoProtocolCodecFactory(!Constants.IS_SERVER)));
        connector.getFilterChain().addLast("logger", new LoggingFilter());

        connector.setHandler(new ClientIoHandler());

        IoSession session;
        for (;;) {
            try {
                ConnectFuture future = connector.connect(new InetSocketAddress(Constants.HOSTNAME, Constants.PORT));
                future.awaitUninterruptibly();
                session = future.getSession();
                break;
            } catch (RuntimeIoException e) {
                System.err.println("Failed to connect.");
                e.printStackTrace();
                Thread.sleep(5000);
            }
        }

        // wait until the summation is done
        session.getCloseFuture().awaitUninterruptibly();
        
        connector.dispose();
	}

	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
		Client client = new Client();
		client.init();

	}

}


ClientIoHandler.java
public class ClientIoHandler extends IoHandlerAdapter {
	
	private List<Integer> values = new ArrayList<Integer>();

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		session.close(true);
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		
		ResultMessage rm = (ResultMessage) message;
        if (rm.isOk()) {

        	if (rm.getSequence() == values.size() - 1) {
        		System.out.println("the sum is " + rm.getValue());
                session.close(true);
                //finished = true;
            }
        } else {
            System.out.println("Server error, disconnecting...");
            session.close(true);
            //finished = true;
        }
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		
		init();
		
	    for (int i = 0; i < values.size(); i++) {
	    	int _value = ((Integer) values.get(i)).intValue();
	    	
            AddMessage m = new AddMessage();
            m.setSequence(i);
            m.setValue(_value);
            
            session.write(m);
        }
	}

	
	private void init(){
		for(int i=0;i<3;i++){
			int _value = i*100 + 1;
			values.add(new Integer(_value));
		}
	}
}

7. Constants.java
public class Constants {
	public final static int PORT = 9123;
	public static final String HOSTNAME = "localhost";
    public static final long CONNECT_TIMEOUT = 30*1000L; // 30 seconds
	public final static boolean IS_SERVER = true;
    public static final int RESULT_OK = 0;

    public static final int RESULT_ERROR = 1;
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics