`
sdyjmc
  • 浏览: 5370 次
最近访客 更多访客>>
社区版块
存档分类
最新评论

Apache Mina Hello world

 
阅读更多

public class TotalCallable implements Callable<Integer> {

private NioSocketConnector connector;
private ConnectFuture cf;

public TotalCallable(String ip, int port, Parameter parameter) {

                connector = new NioSocketConnector();
		// 超时设置
		int connectTimeout = p.getConnectTimeout();
		if (connectTimeout == 0) {
			connectTimeout = Parameter.TIMEOUT_CONNECT;
		}
		connector.setConnectTimeoutMillis(connectTimeout * 1000L);
		DefaultIoFilterChainBuilder chain = connector.getFilterChain();
		chain.addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
		SocketSessionConfig cfg = connector.getSessionConfig();
		cfg.setUseReadOperation(true);
		// 建立连接
		cf = connector.connect(new InetSocketAddress(ip, port));
		// 等待连接创建完成
		cf.awaitUninterruptibly();

	public Integer call() {
		IoSession session = cf.getSession();
		try {
			long s = System.currentTimeMillis();

			int readTimeout = p.getSearchTimeout();
			if (readTimeout == 0) {
				readTimeout = Parameter.TIMEOUT_SEARCH;
			}
			// 发送
			session.write(p).awaitUninterruptibly();
			// 接收
			ReadFuture readFuture = session.read();
			if (readFuture.awaitUninterruptibly(readTimeout, TimeUnit.SECONDS)) {
				Object msg = readFuture.getMessage();
				if (msg instanceof Object[]) {
					Object[] rt = (Object[]) msg;
					if (rt.length > 1 && rt[0] instanceof Integer) {
						Integer total = (Integer) rt[0];
						return total;
					}
				}
			} else {
				// 读超时
			}
			long e = System.currentTimeMillis();
			e = System.currentTimeMillis();
			log.info("完成数据接收:" + (e - s));
		} catch (Exception e) {
			log.error("ip " + ip + " port : " + port);
			log.error(e.getMessage(), e);
			// 重新建立连接
			cf = null;
		} finally {
			// 断开
			session.close(true);
			session.getService().dispose();
			connector.dispose();
		}
		return 0;
	}
}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics