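In ActiveMQ, a TransportConnector mainly accepts client connections and carries the messages exchanged over them.
1: Initialization
When a TransportConnector is initialized it creates a TransportServer, which builds a ServerSocket from the configured URL and waits for client requests.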
    @Override
    public void run() {
        while (!isStopped()) {
            Socket socket = null;
            try {
                socket = serverSocket.accept();
                if (socket != null) {
                    if (isStopped() || getAcceptListener() == null) {
                        socket.close();
                    } else {
                        if (useQueueForAccept) {
                            socketQueue.put(socket);
                        } else {
                            handleSocket(socket);
                        }
                    }
                }
            } catch (SocketTimeoutException ste) {
                // expect this to happen
            } catch (Exception e) {
                if (!isStopping()) {
                    onAcceptError(e);
                } else if (!isStopped()) {
                    LOG.warn("run()", e);
                    onAcceptError(e);
                }
            }
        }
    }
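The accept-and-hand-off pattern in the loop above can be tried in isolation. The following is a minimal sketch, not the ActiveMQ implementation: the class MiniAcceptor and its methods port(), nextSocket() and stop() are hypothetical names, and the 200 ms accept timeout is an arbitrary choice. It shows why SocketTimeoutException is expected: the timeout lets the loop wake up regularly and re-check the stop flag.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified acceptor for illustration only.
class MiniAcceptor implements Runnable {
    private final ServerSocket serverSocket;
    // Accepted sockets are queued here, like the useQueueForAccept branch above.
    private final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<>();
    private volatile boolean stopped;

    MiniAcceptor(int port) throws IOException {
        serverSocket = new ServerSocket(port); // port 0 lets the OS pick a free port
        serverSocket.setSoTimeout(200);        // accept() gives up after 200 ms
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    // Waits up to timeoutMillis for a socket handed over by the accept loop.
    Socket nextSocket(long timeoutMillis) throws InterruptedException {
        return socketQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    void stop() throws IOException {
        stopped = true;
        serverSocket.close(); // also unblocks a pending accept()
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                Socket socket = serverSocket.accept();
                if (stopped) {
                    socket.close();
                } else {
                    socketQueue.put(socket);
                }
            } catch (SocketTimeoutException expected) {
                // No client arrived within the timeout; loop and re-check 'stopped'.
            } catch (Exception e) {
                if (!stopped) {
                    e.printStackTrace(); // unexpected failure while still running
                }
                return; // this sketch gives up instead of retrying
            }
        }
    }
}
```

Running the acceptor on its own thread and polling nextSocket() from a worker thread separates accepting connections from the slower work of setting each one up.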
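After the TransportServer accepts a client connection, a Transport is created from the Socket, and a TransportConnection is then created from that Transport.
Transport: mainly reads data from the socket and hands it to the TransportConnection for processing. TransportConnection implements CommandVisitor, the interface that declares one handler per command type: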
    public interface CommandVisitor {
        Response processAddConnection(ConnectionInfo info) throws Exception;
        Response processAddSession(SessionInfo info) throws Exception;
        Response processAddProducer(ProducerInfo info) throws Exception;
        Response processAddConsumer(ConsumerInfo info) throws Exception;
        Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) throws Exception;
        Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception;
        Response processRemoveProducer(ProducerId id) throws Exception;
        Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception;
        Response processAddDestination(DestinationInfo info) throws Exception;
        Response processRemoveDestination(DestinationInfo info) throws Exception;
        Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception;
        Response processMessage(Message send) throws Exception;
        Response processMessageAck(MessageAck ack) throws Exception;
        Response processMessagePull(MessagePull pull) throws Exception;
        Response processBeginTransaction(TransactionInfo info) throws Exception;
        Response processPrepareTransaction(TransactionInfo info) throws Exception;
        Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception;
        Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception;
        Response processRollbackTransaction(TransactionInfo info) throws Exception;
        Response processWireFormat(WireFormatInfo info) throws Exception;
        Response processKeepAlive(KeepAliveInfo info) throws Exception;
        Response processShutdown(ShutdownInfo info) throws Exception;
        Response processFlush(FlushCommand command) throws Exception;
        Response processBrokerInfo(BrokerInfo info) throws Exception;
        Response processRecoverTransactions(TransactionInfo info) throws Exception;
        Response processForgetTransaction(TransactionInfo info) throws Exception;
        Response processEndTransaction(TransactionInfo info) throws Exception;
        Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception;
        Response processProducerAck(ProducerAck ack) throws Exception;
        Response processMessageDispatch(MessageDispatch dispatch) throws Exception;
        Response processControlCommand(ControlCommand command) throws Exception;
        Response processConnectionError(ConnectionError error) throws Exception;
        Response processConnectionControl(ConnectionControl control) throws Exception;
        Response processConsumerControl(ConsumerControl control) throws Exception;
    }
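An interface with one process method per command type is the visitor pattern: each command object calls the handler that matches its own type, so the connection needs no instanceof chain. The following is a miniature sketch of that dispatch. All types in it (MiniVisitor, MiniCommand, AddConnection, KeepAlive, MiniConnection) are hypothetical stand-ins, and it returns String where ActiveMQ returns Response.

```java
// Hypothetical miniature of visitor-style dispatch; these are not the ActiveMQ types.
interface MiniVisitor {
    String processAddConnection(AddConnection info) throws Exception;
    String processKeepAlive(KeepAlive info) throws Exception;
}

interface MiniCommand {
    // Each command forwards itself to the visitor method for its own type.
    String visit(MiniVisitor visitor) throws Exception;
}

final class AddConnection implements MiniCommand {
    final String clientId;

    AddConnection(String clientId) {
        this.clientId = clientId;
    }

    @Override
    public String visit(MiniVisitor visitor) throws Exception {
        return visitor.processAddConnection(this);
    }
}

final class KeepAlive implements MiniCommand {
    @Override
    public String visit(MiniVisitor visitor) throws Exception {
        return visitor.processKeepAlive(this);
    }
}

// Plays the role of TransportConnection: it is the visitor, one handler per command.
class MiniConnection implements MiniVisitor {
    // Entry point for every command read from the transport.
    String service(MiniCommand command) throws Exception {
        return command.visit(this);
    }

    @Override
    public String processAddConnection(AddConnection info) {
        return "connected:" + info.clientId;
    }

    @Override
    public String processKeepAlive(KeepAlive info) {
        return "alive";
    }
}
```

With this shape, supporting a new command means adding one method to the visitor interface and one visit implementation, and the compiler flags every visitor that has not handled it yet.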
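3: Sending messages. Delivery of queue messages is handled by the Queue class. When a producer sends a message, the message is stored in a pending-message queue, and a thread loops over that queue continuously, dispatching the messages to consumers.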
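The pending-queue-plus-dispatch-thread idea can be sketched in a few lines. This is only an illustration of the pattern described above and not ActiveMQ's Queue class, which is considerably more involved. The class MiniQueue, its methods, and the round-robin choice of consumer are all assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch: producers enqueue, one thread drains and dispatches.
class MiniQueue {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final List<Consumer<String>> consumers = new CopyOnWriteArrayList<>();
    private final Thread dispatcher = new Thread(this::dispatchLoop, "mini-queue-dispatcher");
    private int next; // index of the consumer that receives the next message

    void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);
    }

    // Called by producers: the message only joins the pending queue here.
    void send(String message) {
        pending.add(message);
    }

    void start() {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("register at least one consumer first");
        }
        dispatcher.start();
    }

    void stop() throws InterruptedException {
        dispatcher.interrupt();
        dispatcher.join();
    }

    private void dispatchLoop() {
        try {
            while (true) {
                String message = pending.take(); // blocks until a message is pending
                Consumer<String> target = consumers.get(next);
                next = (next + 1) % consumers.size(); // round-robin (an assumption)
                target.accept(message);
            }
        } catch (InterruptedException stopRequested) {
            // stop() interrupted the thread; leave the loop
        }
    }
}
```

Because a single thread drains the pending queue, messages leave in the order they were sent, and a slow consumer delays everything queued behind it.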