activemq-client:
org.apache.activemq.transport.tcp.TcpTransport
doConsume()-->transportListener.onCommand(command);
org.apache.activemq.ActiveMQConnection implements Connection, TopicConnection, QueueConnection,TransportListener......
new ActiveMQConnection()-->this.transport.setTransportListener(this);
------------------------------------------------------------------------------------------------------------------------
ActiveMQConnection[TransportListener].onCommand--- get dispatcher by *****ConsumerId***** --->ActiveMQDispatcher.dispatch(md->message)-->
ActiveMQConnectionConsumer and ActiveMQSession -->this.connection.addDispatcher(consumerInfo.getConsumerId(), this);
ActiveMQSession.dispatch-->ActiveMQSessionExecutor-->execute message-->:::::
ActiveMQMessageConsumer.dispatch-->MessageListener.onMessage(message);/*****/
or default ActiveMQSession.alwaysSessionAsync=true; this is Client side !!!!!!
ActiveMQSessionExecutor.wakeup---->taskRunner[PooledTaskRunner].wakeup()----*****--->executor.execute(runTask--ActiveMQSessionExecutor implements Task)
-->iterate()-->ActiveMQMessageConsumer.dispatch-->MessageListener.onMessage(message);/*****/
------------------------------------------------------------------------------------------------------------
But MessageListener.onMessage in the synchronized (a consumer : a unconsumedMessages.getMutex()) !!!??????
***** con: session : consumer--> 1:m:n , ConsumerId-->connectionId + ":" + sessionId + ":" + value;
ActiveMQSession-->createConsumer-->getNextConsumerId()
on ActiveMQMessageConsumer Constructor--> this.session.syncSendPacket(info); the info include the current *****ConsumerId*****,
send it to Broker, the Broker controls the message routine and dispatch path[? con ? session ? consumer]!!!!!!
After the dispatch action-->
ActiveMQMessageConsumer.afterMessageIsConsumed-->session.sendAck(ack[*****consumerId*****]); notify the Broker this is Consumed.
------------------------------------------------------------------------------------------------------------
ActiveMQConnectionFactory::
protected boolean dispatchAsync=true;
protected boolean alwaysSessionAsync=true;
useDedicatedTaskRunner --> per a session per a thread--> ActiveMQConnection.getSessionTaskRunner,TaskRunnerFactory.createTaskRunner,DedicatedTaskRunner
(ActiveMQConnection)connection).setAlwaysSessionAsync(false);/setDispatchAsync(false);
-------------------------------------------------------------------------------
asyncDispatch is Broker side !!!!!!!!!
http://activemq.apache.org/consumer-dispatch-async.html
-----------------------------------------------------------------------------------------------------------------------
org.springframework.jms.listener.DefaultMessageListenerContainer::::::-->
jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
//jmsReciever.setErrorHandler(errorHandler);
//jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
//jmsListener.setPubSubNoLocal(jmsConfig.isPubSubNoLocal());
jmsListener.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory());
jmsReciever.setReceiveTimeout(6000);
//jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
//jmsListener.setSubscriptionDurable(jmsConfig.isSubscriptionDurable());
//jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
//jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
jmsListener.setMessageListener(listenerHandler);
jmsListener.setDestination(destination);
jmsListener.setDestinationName(destinationName);
//jmsReciever.setTaskExecutor(taskExecutor);
//jmsReciever.setBeanName thread_name_prefix
jmsListener.initialize();
jmsListener.start();
--------------------------------------------------------------------------
DefaultMessageListenerContainer::
doInitialize()-->scheduleNewInvoker: concurrentConsumers :AsyncMessageListenerInvoker to run.
AsyncMessageListenerInvoker:session:consumer -->run--> doReceiveAndExecute-->receiveMessage :wait:-->doInvokeListener-->listener.onMessage(message);
/*
*Specify the maximum number of concurrent consumers to create. Default is 1.
*......
*<b>Do not raise the number of concurrent consumers for a topic,
* unless vendor-specific setup measures clearly allow for it.</b>
*......
*/
setConcurrentConsumers
setMaxConcurrentConsumers
/**
* Configure the destination accessor with knowledge of the JMS domain used.
* Default is Point-to-Point (Queues).
* <p>This setting primarily indicates what type of destination to resolve
* if dynamic destinations are enabled.
* @param pubSubDomain "true" for the Publish/Subscribe domain ({@link javax.jms.Topic Topics}),
* "false" for the Point-to-Point domain ({@link javax.jms.Queue Queues})
* ......
*/
setPubSubDomain
-------------------------------------------------------------------------------------------------------------
JMS Session issue:
createSession:
if transacted==true, the AcknowledgeMode forced to Session.SESSION_TRANSACTED
Default is Session.AUTO_ACKNOWLEDGE
注意session非线程安全,建议在事务类型下,每个producer独占一个session以防止多线程环境下提交的混乱,甚至消息莫名丢失。
Reference:
Specification: JSR-343 Java Message Service (JMS) 2.0 ("Specification")
6.2. Sessions
6.2.1. Producer and consumer creation
6.2.5. Threading restrictions on a session
相关推荐
activemq-client-5.9.0.jar;activemq-client-5.9.0.jar
activemq-client-5.10.0.jara
activemq-client-5.8.0.jar
activemq-jms-pool-5.14.4.jar
activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载 activemq-all-5.8.0.jar 下载
标签:activemq-jms-pool-5.9.0.jar,activemq,jms,pool,5.9.0,jar包下载,依赖包
标签:activemq-jms-pool-5.9.1.jar,activemq,jms,pool,5.9.1,jar包下载,依赖包
标签:activemq-jms-pool-5.9.1-javadoc.jar,activemq,jms,pool,5.9.1,javadoc,jar包下载,依赖包
标签:activemq-jms-pool-5.10.0.jar,activemq,jms,pool,5.10.0,jar包下载,依赖包
activemq-pool-5.8.0-sources.jar
标签:activemq-jms-pool-5.10.0-javadoc.jar,activemq,jms,pool,5.10.0,javadoc,jar包下载,依赖包
apache-activemq-5.16.0.zip
赠送jar包:activemq-core-5.7.0.jar; 赠送原API文档:activemq-core-5.7.0-javadoc.jar; 赠送源代码:activemq-core-5.7.0-sources.jar; 包含翻译后的API文档:activemq-core-5.7.0-javadoc-API文档-中文...
赠送jar包:activemq-protobuf-1.1.jar; 赠送原API文档:activemq-protobuf-1.1-javadoc.jar; 赠送源代码:activemq-protobuf-1.1-sources.jar; 包含翻译后的API文档:activemq-protobuf-1.1-javadoc-API文档-...
apache-activemq-5.9.0-bin.zip
activemq-all-5.2.0.JAR包,欢迎下载。编写java中间件的时候会用到。这是activemq实现的jms中间件。希望能帮助到你。
标签:activemq-runtime-config-5.10.0.jar,activemq,runtime,config,5.10.0,jar包下载,依赖包
标签:activemq-runtime-config-5.9.1.jar,activemq,runtime,config,5.9.1,jar包下载,依赖包
apache-activemq-5.8.0-bin.zip