`
ahua186186
  • 浏览: 554068 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

summercool-hsf &Netty3.X总结4--客户端同步调用service API

 
阅读更多
1.同步调用:

核心原理:利用JDK的动态代理类创建service代理对象,然后在InvocationHandler中调用channel发送数据, 同时利用信号量同步等待结果返回。

本质我的理解是发送数据时采用“future超时模式”把异步变同步等待数据返回,这里的同步是指每次发送、接收过程的同步,即每次发送都会等待数据响应,消息传输还是异步的。

核心本质:每个请求都持有一个InvokeResult(封装一个信号量),通过invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);等待超时实现

核心代码:invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);

相关核心代码:

创建代理:


	/**
	 * @Title: wrapSyncProxy4Service
	 * @Description: 为远程服务创建同步动态代理,返回代理对象
	 * @author 简道
	 * @param serviceInterface
	 *        远程服务接口
	 * @param dispatchStrategy
	 *        分发策略
	 * @return T 返回类型
	 */
	@SuppressWarnings("unchecked")
	private static <T> T wrapSyncProxy4Service(Class<T> serviceInterface, SyncDispatchStrategy dispatchStrategy) {
		if (serviceInterface == null) {
			throw new IllegalArgumentException("serviceInterface can not be null.");
		} else if (!serviceInterface.isInterface()) {
			throw new IllegalArgumentException("serviceInterface is required to be interface.");
		} else if (dispatchStrategy == null) {
			throw new IllegalArgumentException("dispatchStrategy is required to be interface.");
		}

		InvocationHandler requestHandler = new SyncServiceRequestHandler(serviceInterface.getSimpleName(),
				dispatchStrategy);

		// 创建代理
		T serviceProxy = (T) Proxy.newProxyInstance(getClassLoader(serviceInterface), new Class[] { serviceInterface },
				requestHandler);

		return serviceProxy;
	}


同步请求处理:

/**
 * @Title: SyncServiceRequestHandler.java
 * @Package org.summercool.hsf.proxy.strategy
 * @Description: 同步请求处理
 * @author 简道
 * @date 2011-9-30 下午3:10:10
 * @version V1.0
 */
public class SyncServiceRequestHandler implements InvocationHandler {
	String serviceName;
	SyncDispatchStrategy dispatchStrategy;

	public SyncServiceRequestHandler(String serviceName, SyncDispatchStrategy dispatchStrategy) {
		if (serviceName == null) {
			throw new IllegalArgumentException("serviceName can not be null.");
		} else if (dispatchStrategy == null) {
			throw new IllegalArgumentException("dispatchStrategy can not be null.");
		}

		this.serviceName = serviceName;
		this.dispatchStrategy = dispatchStrategy;
	}

	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

		RemoteServiceObject remoteServiceObject = new RemoteServiceObject();
		remoteServiceObject.setMethodName(method.getName());
		remoteServiceObject.setServiceName(serviceName);
		remoteServiceObject.setArgs(args);

		InvokeResult result = dispatchStrategy.dispatch(remoteServiceObject);

		if (result.size() > 0) {
			return result.getFirstValue();
		}

		return ReflectionUtil.getDefaultValue(method.getReturnType());
	}


消息分发:

@Override
	public InvokeResult dispatch(Object message) {
		if (message == null) {
			throw new IllegalArgumentException("Message can not be null.");
		} else if (!service.isAlived()) {
			throw new IllegalStateException("service is not alived.");
		}

		HsfChannel channel = getChannel(service.getGroups());
		Object retObj = write(message, channel);

		// 构建结果
		InvokeResult invokeResult = new InvokeResult();
		invokeResult.put(((HsfChannel) channel).getChannelGroup().getName(), retObj);

		return invokeResult;
	}



HsfChannel同步write:


	public Object writeSync(Object msg) {
		InvokeFuture<?> invokeFuture = writeAsync(msg);

		Object retObj = null;
		boolean invokeTimeout = false;
		Integer timeout = LangUtil.parseInt(service.getOption(HsfOptions.SYNC_INVOKE_TIMEOUT), 60000);
		if (invokeTimeout = (timeout != null && timeout > 0)) {
			// 等待返回,直到Response返回或超时
			retObj = invokeFuture.getResult(timeout, TimeUnit.MILLISECONDS);
		}

		if (!invokeTimeout) {
			// 一直等待,直到Response返回
			retObj = invokeFuture.getResult();
		}

		return retObj;
	}



public V getResult(long timeout, TimeUnit unit) {
		if (!isDone()) {
			try {
				if (!semaphore.tryAcquire(timeout, unit)) {
					setCause(new HsfTimeoutException("time out."));
				}
			} catch (InterruptedException e) {
				throw new HsfRuntimeException(e);
			}
		}
		// check exception
		if (cause != null) {
			if (cause instanceof HsfRemoteServiceException) {
				throw ((HsfRemoteServiceException) cause);
			}
			throw new HsfRuntimeException(cause);
		}
		//
		return this.result;
	}





思考其他同步模型思路:利用JAVA Object的 wait() notify()实现同步模型, 这种思路请求和响应信息必须存储在一个对象上才好实现同步,并且只有一个线程用来轮询发送。

理论和原理:

(1).wait()
等待对象的同步锁,需要获得该对象的同步锁才可以调用这个方法,否则编译可以通过,但运行时会收到一个异常:IllegalMonitorStateException。调用任意对象的 wait() 方法导致该线程阻塞,该线程不可继续执行,并且该对象上的锁被释放。
(2).notify()
唤醒在等待该对象同步锁的线程(只唤醒一个,如果有多个在等待),注意的是在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且不是按优先级。调用任意对象的notify()方法则导致因调用该对象的 wait()方法而阻塞的线程中随机选择的一个解除阻塞(但要等到获得锁后才真正可执行)。

伪代码:

public class RequestSendTask extends Thread
{

    public RequestSendTask(Connector connector1, RequestHolder requestholder)
    {
        super("request_send_task");
        shutdown = false;
        connector = connector1;
        requestHolder = requestholder;
        start();
    }

    public void run()
    {
        while(!shutdown) 
            try
            {
                execute();
            }
            catch(Exception exception)
            {
                logger.error(String.format("send request to [%s:%d] error.", new Object[] {
                    connector.getHost(), Integer.valueOf(connector.getPort())
                }), exception);
                closeAndClear();
            }
    }

    private void execute()
        throws Exception
    {
        do
        {
            if(shutdown)
                break;
            Request request = requestHolder.getRequest();
            if(request != null)
            {
                connector.getSession().write(request);
                requestHolder.resetRequestTime(request.getRequestId());
            }
        } while(true);
    }


 public Response waitForResponse(Request request)
        throws Exception
    {
        RequestWrap requestwrap = new RequestWrap(request);
        waitingObjs.put(Integer.valueOf(request.getRequestId()), requestwrap);
        requestQueue.put(Integer.valueOf(request.getRequestId()));
        if(requestwrap.getStatus() != 5)
            synchronized(requestwrap)
            {
                requestwrap.setStatus((byte)2);
        
  requestwrap.wait();
            }
        if(requestwrap.getStatus() == 3)
            return (Response)requestwrap.getObj();
        String s = (new StringBuilder()).append(request.getBeanName()).append(".").append(request.getMethod()).toString();
        switch(requestwrap.getStatus())
        {
        case 4: // '\004'
            throw new TimeoutException((new StringBuilder()).append("invoke ").append(s).append(" timeout.").toString());

        case 5: // '\005'
            throw new SessionCloseException((new StringBuilder()).append("invoke ").append(s).append(" session close.").toString());
        }
        throw new TransportException((new StringBuilder()).append("invoke ").append(s).append(" error.").toString());
    }

    public void receiveResponse(Response response)
    {
        RequestWrap requestwrap = (RequestWrap)waitingObjs.remove(Integer.valueOf(response.getRequestId()));
        if(requestwrap != null)
        {
            requestwrap.setObj(response);
            synchronized(requestwrap)
            {
                requestwrap.setStatus((byte)3);
        requestwrap.notify();
            }
        } else
        {
            logger.warn((new StringBuilder()).append("remove response, id: ").append(response.getRequestId()).toString());
        }
    }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics