ActiveMQ producer flow control
Posted at 2010. 3. 2. 17:57 | Posted in OpenSource
Producer Flow Control (http://activemq.apache.org/producer-flow-control.html)
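When a producer uses Async Send, the broker applies flow control to it.
In 4.x, flow control was implemented with TCP flow control, which caused problems such as deadlocks when several producers shared the same connection.
Version 5.0 solves this by allowing flow control to be applied to each producer individually.
When using Async Send (http://activemq.apache.org/async-sends.html), ProducerWindowSize must be set.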
The ProducerWindowSize is the maximum number of bytes of data that a producer will transmit to a broker before waiting for acknowledgment messages from the broker that it has accepted the previously sent messages. In other words, this is how you configure the producer flow control window that is used for async sends.
--> The amount of data (in bytes) that a producer can keep sending without having received a producer ack.
ActiveMQConnectionFactory connectionFactory = ...
connectionFactory.setProducerWindowSize(1024000);
Flow control can be switched on or off in the conf file.
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
As a result, flow control prevents the broker from being flooded with data because of slow consumers.
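For comparison, the same kind of policy entry can leave flow control switched on, so that producers are throttled once a destination reaches its memory limit. The fragment below is only a sketch: it reuses the attributes from the example above, and the `queue=">"` pattern (intended to match every queue) is an assumed choice of destination.

```xml
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- Assumption: ">" is used as a catch-all queue pattern; narrow it as needed -->
      <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```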
Disable Flow Control
With flow control disabled, messages are kept pending and dispatching continues until all available disk space is used.
Message Cursors (http://activemq.apache.org/message-cursors.html) make this possible.
Message Cursors(http://activemq.apache.org/message-cursors.html)
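Versions before 5.0 kept references to every message awaiting dispatch in memory. This limited the number of messages that could be pending.
The common approach is to keep messages in storage and, when a consumer requests them, use a cursor to obtain the next dispatching point.
Version 5.0 takes a hybrid approach: while a consumer is connected and keeping up, messages are delivered to it directly, and when the consumer falls behind, the broker switches to cursor mode.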
If a Consumer becomes active after messages are pending from the store for it, or it's slower than the producer, then messages are paged in to the dispatch queue from a pending cursor:
The default message cursor type in ActiveMQ 5.0 is Store based. There are two additional cursor types:
VM Cursor and File based Cursor
The VM Cursor is how the 4.x versions work. It keeps all message references in memory.
It is very fast, but dangerous when consumers are slow.
The File Cursor is derived from the VM Cursor. When broker memory reaches its limit, it writes the messages to a file.
Non-persistent messages are passed directly to the cursor.
topic
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy topicPrefix="Test.DLQ." />
</deadLetterStrategy>
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
<pendingDurableSubscriberPolicy>
<vmDurableCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
Valid Subscriber types are vmCursor and fileCursor. The default is the store based cursor.
Valid Durable Subscriber cursor types are vmDurableCursor and fileDurableSubscriberCursor. The default is the store based cursor.
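The topic example uses the VM cursors. A file based variant might look like the sketch below, which only swaps in the file cursor element names listed above and keeps the same illustrative destination pattern and limits. It has not been checked against a particular broker version.

```xml
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
        <!-- Non-durable subscribers: spool pending messages to a file -->
        <pendingSubscriberPolicy>
          <fileCursor/>
        </pendingSubscriberPolicy>
        <!-- Durable subscribers: file based cursor as well -->
        <pendingDurableSubscriberPolicy>
          <fileDurableSubscriberCursor/>
        </pendingDurableSubscriberPolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```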
queue
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="org.apache.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
</deadLetterStrategy>
<pendingQueuePolicy>
<vmQueueCursor />
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
Valid Queue cursor types are vmQueueCursor and fileQueueCursor. The default is the store based cursor.
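For queues, the file based cursor can be selected in the same way. This is a minimal sketch that assumes the same `org.apache.>` destination pattern as the example above and changes only the cursor element.

```xml
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue="org.apache.>">
        <!-- Spool pending queue messages to a file when broker memory runs low -->
        <pendingQueuePolicy>
          <fileQueueCursor/>
        </pendingQueuePolicy>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```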
System usage
You can also slow down producers using the <systemUsage> configuration tag.
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
When the broker is full of messages, the default behavior is to block the send() call.
In that case, sendFailIfNoSpace can be set to true so that send() fails instead.
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
As of version 5.3.1, the sendFailIfNoSpaceAfterTimeout attribute can be used to make send() fail after a given period of time.
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
The timeout is in milliseconds.
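To show where these pieces sit in the conf file, the sketch below places a destination policy and a system usage block side by side inside the broker element. The `brokerName` value and the particular combination of attributes are assumptions made for illustration. The element and attribute names are the ones used in the snippets above.

```xml
<!-- Assumption: brokerName is a placeholder; the rest of the conf file is omitted -->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <!-- Per-destination limit: producers are throttled at 1mb -->
        <policyEntry queue="org.apache.>" producerFlowControl="true" memoryLimit="1mb"/>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
  <systemUsage>
    <!-- Broker-wide limit: send() fails after waiting 3000 ms for space -->
    <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
      <memoryUsage>
        <memoryUsage limit="20 mb"/>
      </memoryUsage>
    </systemUsage>
  </systemUsage>
</broker>
```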