<style type="text/css">
Producer Flow Control生产者流量控制
In ActiveMQ 4.x flow control was implemented using TCP flow control. The underlying network connection of throttled consumers was suspended to enforce flow control limits. This strategy is very efficient but can lead to deadlocks if there are multiple producers
and consumers sharing the same connection.
在ActiveMQ 4.x版本中,流量控制是用TCP流量控制实现的。受节制的消费者底层的网络连接被挂起,以强制进行流量控制限制。这个策略非常高效,但是如果有多个生产者和消费者共享同一个连接的时候,可能会导致死锁。
As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection. By 'flow control' we mean that if the broker detects that the memory limit for the destination, or the temp- or file-store
limits for the broker, have been exceeded, then the flow of messages can be slowed down. The producer will be either blocked until resources are availableorwill receive a JMSException: this behaviour is configurable and described in the section
below on<systemUsage>.
It's worth noting that the default<systemUsage>settings will cause the producer toblockwhen thememoryLimitor<systemUsage>limits are reached: this blocking behaviour is sometimes misinterpreted as a 'hung producer',
when in fact the producer is simply diligently waiting until space is available.
- Messages that are sent synchronously will automatically use per producer flow control; this applies generally to persistent messages which are sent synchronouslyunlessyou enable theuseAsyncSendflag.
- Producers that useAsync Sends- generally speaking, producers of non-persistent messages - don't bother waiting for
any acknowledgement from the broker; so, if a memory limit has been exceeded, you willnotget notfied. If you do want to be aware of broker limits being exceeded, you will need to configure the ProducerWindowSize connection option so that even async
messages are flow controlled per producer.
使用异步发送的生产者 ——一般来说,就是发送非持久化消息的生产者 —— 不需要等候来自代理的任何确认消息;所以,如果内存限制被超过了,你不会被通知。如果你真的想什么时候代理的限制被超过了,你需要配置ProducerWindowSize这一连接选项,这样就算是异步消息也会对每一个生产者进行流量控制。
ActiveMQConnectionFactory connctionFactory = ...
The ProducerWindowSize is the maximum number of bytes of data that a producer will transmit to a broker before waiting for acknowledgment messages from the broker that it has accepted the previously sent messages.
Alternatively, if you're sending non-persisted messages (which are by default sent async), and want to be informed if the queue or topic's memory limit has been breached, then you can simply configure the connection factory to 'alwaysSyncSend'. While this is
going to be slower, it will ensure that your message producer is informed immediately of memory issues.
If you like, you can disable flow control for specific JMS queues and topics on the broker by setting theproducerFlowControlflag to false on the appropriate destination policy in the Broker configuration - e.g.
<policyEntry topic="FOO.>" producerFlowControl="false"/>
seeBroker Configuration.
Note that, since the introduction of the new file cursor in ActiveMQ 5.x, non-persisted messages are shunted into the temporary file store to reduce the amount of memory used for non-persistent messaging. As a result, you may find that a queue's memoryLimit
is never reached, as the cursor doesn't use very much memory. If you really do want to keep all your non-persistent messages in memory, and stop producers when the limit is reached, you should configure the<vmQueueCursor>.
注意,自从ActiveMQ 5.x中引入新的文件游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
The fragment above will ensure that all non-persistent queue messages are kept in memory, with each queue having a limit of 1Mb.
How Producer Flow Control works生产者流量控制是如何工作的
If you are sending a persistent message (so that a response of theOpenWireMessage is expected then the broker will send the producer
aProducerAckmessage. This informs the producer that the
previous sending window has been processed, so that it can now send another window. Its kinda like consumer acks but in reverse.
Advantage 优势
So a nice producer might wait for a producer ack before sending more data, to avoid flooding the broker (and forcing the broker to block the entire connection if a slow consumer occurs). To see how this works in source code, check out theActiveMQMessageProducercode.
Though a client can ignore the producer ACKs altogether and the broker should just stall the transport if it has to for slow consumer handling; though this does mean it'll stall the entire connection.
Configure Client-Side Exceptions配置客户端的异常
An alternative to the indefinite blocking of thesend()operation when no space is free on the broker is to instead configure that an exception to be thrown on the client-side. By configuring thesendFailIfNoSpaceproperty totrue,
the broker will cause thesend()operation to fail with ajavax.jms.ResourceAllocationException, which will propagate to the client. Below is an example of this configuration:
应对代理空间不足,而导致不确定的阻塞 send()操作的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端。下面是一个配置的示例:
<systemUsage sendFailIfNoSpace="true">
<memoryUsage limit="20 mb"/>
The advantage of this property is that the client can catch thejavax.jms.ResourceAllocationException, wait a bit and retry thesend()operation instead of just hanging indefinitely.
Starting in version 5.3.1 thesendFailIfNoSpaceAfterTimeoutproperty has been added. This property causes thesend()operation to fail with an exception on the client-side, but only after waiting the given amount of time. If space on the broker
is still not freed after the configured amount of time, only then does thesend()operation fail with an exception to the client-side. Below is an example:
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage limit="20 mb"/>
The timeout is defined in milliseconds so the example above waits for three seconds before failing thesend()operation with an exception to the client-side. The advantage of this property is that it will block for the configured amount of time instead
of failing immediately or blocking indefinitely. This property offers not only an improvement on the broker-side, but also an improvement for the client so it can catch the exception, wait a bit and retry thesend()operation.
Disabling Flow Control使流量控制无效
A common requirement is to disable flow control so that message dispatching continues until all available disk is used up by pending messages (whether persistent or non persistent messaging is configured). To do this enableMessage
System usage系统占用
You can also slow down producers via some attributes on the<systemUsage>element. Take a look at the following example:
<memoryUsage limit="64 mb" />
<storeUsage limit="100 gb" />
<tempUsage limit="10 gb" />
You can set limits of memory forNON_PERSISTENTmessages, disk storage forPERSITENTmessages and total usage for temporary messages, the broker will use before it slowdown producers.Using the default settings shown above, the broker
will block thesend()call until some messages are consumed and space becomes available on the broker.The default values are shown above, you will probably need to increase these values for your environment.
springboot整合 activeMq 生产者 发送消息 包含队列模式点对点发送消息 以及 主题模式一对多发送消息 这是生产者的demo producer; 需要配合消费者的demo consumer 使用
**ActiveMQ生产者详解** ActiveMQ是Apache组织开发的一个开源消息中间件,它遵循Java Message Service(JMS)规范,提供了高效、可靠的异步通信能力。在分布式系统中,ActiveMQ作为消息代理,允许应用程序之间通过...
在本项目中,我们探讨的是如何使用SpringBoot集成Apache ActiveMQ来构建一个生产者和消费者的应用。SpringBoot以其简洁的配置和快速启动特性,成为现代Java应用开发的首选框架之一,而ActiveMQ则是流行的消息中间件...
3. **生产者(Producer)** 生产者是创建和发送消息到消息队列的角色。在Java中,这通常涉及创建一个`ConnectionFactory`,然后通过这个工厂创建一个`Connection`对象,接着创建一个`Session`,在会话中创建`...
在分布式系统中,ActiveMQ作为消息代理,负责接收、存储和转发消息,从而实现生产者与消费者之间的解耦。 生产者和消费者是JMS中的核心概念。生产者是发送消息的应用,而消费者则是接收这些消息的应用。在ActiveMQ...
Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...
- **Amq_Producer.cpp**:这是单线程消息生产者的实现,可能包含创建连接、创建生产者对象、构建消息和发送消息的代码。 - **Amq_Producer_mt.cpp**:扩展了 Amq_Producer.cpp,增加了多线程支持,每个线程独立...
springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。...配合 producer 生产者demo使用。
**消息生产者**(Producer)是发送消息的组件,通常在业务处理完成后,将结果或者事件封装为消息发送到消息队列。在 Spring 中,可以通过 `JmsTemplate` 或者 `MessageProducer` 接口来实现。`springMQProducer.rar`...
- **生产者(Producer)**: 负责创建并发送消息到消息队列。 - **消费者(Consumer)**: 从消息队列中接收并处理消息。 - **主题(Topic)**: 支持发布/订阅模式,一个主题可以有多个订阅者,当生产者发送一条...
- 实现生产者代码,创建一个连接工厂,连接到ActiveMQ服务器,并创建一个Producer发送消息: ```java import org.apache.activemq.ActiveMQConnectionFactory; // 创建连接工厂 ActiveMQConnectionFactory ...
spring-boot-activemq-producer 源码
发送消息涉及到创建一个JMS生产者。首先,你需要创建一个Session,然后根据需求创建一个Destination(队列或主题)。接下来,创建一个MessageProducer,并使用它来发送消息: ```java MessageProducer producer = ...
在这个“ActiveMQ集群及生产者和消费者Java代码”压缩包中,我们可以探讨以下几个关键知识点: 1. **ActiveMQ集群**:ActiveMQ的集群能力允许多个服务器形成一个逻辑单元,提供高可用性和负载均衡。当一个消息代理...
在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种技术,它能有效地解耦各个服务...通过合理配置和使用消息队列,可以提高系统的响应速度,降低服务间的耦合,同时为故障恢复和流量控制提供了有效手段。
标题中的“activemq限流排队”指的是Apache ActiveMQ中的一种流量控制策略,它用于管理消息生产者和消费者之间的消息传递速率,以防止系统过载。ActiveMQ是Apache软件基金会开发的一个开源的消息中间件,它遵循Java...
1. **生产者(Producer)**:生产者是创建和发送消息到消息队列的实体。在这个Demo中,生产者将负责创建消息并将其发送到特定的队列或主题。 2. **消费者(Consumer)**:消费者是从消息队列接收和处理消息的实体。...
- 在Java中创建ActiveMQ生产者涉及到使用JMS API。 - 首先需要创建ConnectionFactory对象,这里使用ActiveMQ提供的ActiveMQConnectionFactory类。 - 然后通过ConnectionFactory创建连接(Connection)并启动。 -...
在这种模式下,生产者(publisher)发布消息到一个主题(topic),而消费者(subscriber)订阅该主题以接收这些消息。与点对点模型不同,发布/订阅模式中的消费者可以是多个,每个订阅者都能接收到所有发布的消息。 ...