ActiveMQ 메모ActiveMQ 메모

Posted at 2010. 9. 10. 16:08 | Posted in OpenSource
Slow Consumer Handlinghttp://activemq.apache.org/slow-consumer-handling.html
  • queue 최대값(the maximum number of matched messages)을 지정해 놓았을 때, 최대값에 다다르면 오래된 것이 discard 된다?
  • prefetch size, pending limit 등 조절하여 메시지 discard가 있는지 확인해 보자. 위의 글로는 메시지가 discard되는 것으로 보이나, 문맥상 consumer에 prefetch될 메시지를 discard한다는 뜻 같기도 함.(영어 실력이 딸려서..)
  • PendingMessageLimitStrategy를 사용할 수 있으며, 다음과 같이 설명하고 있다.
  • You can configure the PendingMessageLimitStrategy implementation class on the destination map so that different regions of your topic namespace can have different strategies for dealing with slow consumers. For example you may want to use this strategy for prices which are very high volume but for orders and trades which are lower volume you might not wish to discard old messages.

    The strategy calculates the maximum number of pending messages to be kept in RAM for a consumer (above its prefetch size). A value of zero means keep no messages around other than the prefetch amount. A value greater than zero will keep up to that amount of messages around, discarding the older messages as new messages come in. A value of -1 disables the discarding of messages.


What is the prefetch limit forhttp://activemq.apache.org/what-is-the-prefetch-limit-for.html

So ActiveMQ uses a prefetch limit on how many messages can be streamed to a consumer at any point in time. Once the prefetch limit is reached, no more messages are dispatched to the consumer until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed). The actual prefetch limit value can be specified on a per consumer basis.

Its a good idea to have large values of the prefetch limit if you want high performance and if you have high message volumes. If you have very few messages and each message takes a very long time to process you might want to set the prefetch value to 1 so that a consumer is given one message at a time. Specifying a prefetch limit of zero means the consumer will poll for more messages, one at a time, instead of the message being pushed to the consumer.


Pooled Connections and prefetch

Consuming messages from a connection pool can be problematic due to prefetch. Unconsumed prefetched messages are only released when a connection is closed, but with a pooled connection the connection close is deferred (for reuse) till the connection pool closes. This leaves prefetched messages unconsumed till the connection is reused. This feature can present as missing or out-of-sequence messages when there is more than one connection in the pool.
One solution is to use pooled connections for producers and a non-pooled connection for consumers. This might have performance impacts on the consumer side, if multiple threads try to consume messages at a fast rate. Alternatively, reduce the pool size to 1 for consumers. A third alternative is to reduce the prefetchSize to 1 or 0 with the pooled connection factory. When using Spring JMS and MessageDrivenPojo, you cannot use a prefetch of 0, so use 1 instead.



  • The ActiveMQ broker auto-creates the physical resources associated with a destination on demand.
  • broker가 시작할 때 미리 만들어 둘 수도 있다. http://activemq.apache.org/configure-startup-destinations.html
  • 클라이언트에서 destination을 만들었다고 해서 서버에 만들어 지는 것은 아니다. 해당 큐 또는 토픽으로 메시지가 들어오면 만들어진다. 따라서 클라이언트가 destination 객체를 많이 만들었다고 해서 서버의 리소스를 사용하지 않는다.

Ordered Queue 
  • Exclusive Consumer - http://activemq.apache.org/exclusive-consumer.html
    • 요약하면 queue에 하나의 consumer만 붙게하는 것.
    • queue:consumer=1:1 을 broker가 보장해준다.
  • Message Group - http://activemq.apache.org/message-groups.html
    • Parallel Exclusive Consumer 같은 것.
    • 메시지가 어떤 그룹에 속하지는 구분하기 위해 표준 JMS 헤더는 JMSXGroupID 를 사용.
    • 같은 그룹은 같은 consumer에게 가는 것을 보장한다. consumer fail시 다른 consumer가 선택된다.
    • 그룹 ID를 HTTP session ID에 비유하면 broker는 HTTP load balancer가 된다.
    • broker는 메시지가 들어오면 그룹 ID를 검사하고, 해당 그룹에 연결된 consumer가 있는지 확인하여 전달.
    • hash를 사용하므로 많은 수의 그룹을 사용할 수 있다. (Since there could be a massive number of individual message groups we use hash buckets rather than the actual JMSXGroupID string)
    • consumer는 자신이 close되거나 외부에서 그룹을 close할때까지 메시지를 받는다.
    • Getting notified of ownership changes of message groups: 네트워크 오류 등으로 failover처리가 되는 경우 consumer가 변경될 수 있는데, 이 때 consumer는 특정 메시지 그룹의 메시지를 처음 받는 것인지 확인 할 수가 있다. JMSXGroupFirstForConsumer boolean값을 확인하면 된다. consumer가 가진 캐시나 자원을 초기화 하는 작업 등을 할 수 있다.
    • Adding new consumers: consumer들이 모두 연결되지 않은 상태에서는 첫번째 consumer가 모든 그룹의 메시지를 처리하려고 할 것이다. 이 상황을 피하기 위해 destination policy에서 consumersBeforeDispatchStarts 와 timeBeforeDispatchStarts 를 사용할 수 있다. 모든 consumer가 연결될 때까지 dispatching을 지연시킨다.
      ☞ consumer는 메시지 그룹별로 타이머를 걸어 특정 시간 이후로 메시지가 없을 때 그룹 close를 시키면 부드럽게 load balancing이 될 것이다.



Web console - http://host:8161/admin ,  http://host:8161/admin/queues.jsp


읽어볼 거리: 성능 튜닝 - http://www.edwardkim.pe.kr/?p=737
//

ActiveMQ clusteringActiveMQ clustering

Posted at 2010. 3. 2. 21:19 | Posted in OpenSource

We support reliable high performance load balancing of messages on a queue across consumers.
consumer가 죽은 경우, unacknowledged 메시지는 다른 consumer에게로 재전송된다.
빠른 consumer는 되는 대로 가져가고 느린 consumer는 느린 대로 가져간다.

Broker clusters
JMS 컨텍스트에서 가장 기본적인 클러스터링 모델은 JMS broker 집합에 JMS client가 접속하면 그중 하나에 연결되며, broker가 down된 경우 자동으로 다른 broker에 연결된다는 것이다.
activemq는 JMS client에서 failover:// 프로토콜을 사용함으로서 이를 실현한다.
Failover Transport Reference 페이지에서 상세를 볼 수 있다.

Discovery of brokers
클라이언트가 자동으로 감지하여 broker에 연결할 수 있도록 auto-discovery of brokers using static discovery or dynamic discovery 를 지원한다.

Networks of brokers
client/server 또는 hub/spoke 스타일의 topology를 가지고, 많은 클라이언트와 많은 broker들을 가진다고 했을 때, producer만 동작하고 consumer가 없을 경우 메시지는 계속 쌓이기만 할 것이다. 이를 피하기 위해 producer와 연결된 broker로부터 consumer와 연결된 broker로 메시지를 옮기는 store and forward 방식의 broker의 network(Networks of Brokers)를 제공한다. 이는 broker의 네트워크를 만드는 분산 queue와 topic(distributed queues and topics)을 제공한다.
Networks of brokers에서 클라이언트는 어떤 broker라도 연결할 수 있으며 클러스터로 묶여 있으므로 failover도 된다.
또한, Networks of brokers는 필요한 만큼의 broker를 추가할 수도 있어 거대한 수의 클라이언트 네트워크로 확장할 수도 있다.
broker가 방화벽을 통과하여 네트워크 사이를 연결할 수도 있다.
그러나, broker 링으로 구성된 네트워크에서 하나의 broker의 고장은 전체 망을 고장나게 할 수도 있다.

Master Slave
기본적으로 모든 메시지는 master/slave cluster사이에서 복제된다(replicated). 따라서 master가 고장인 경우 잃어버리는 메시지 없이 자동으로 slave로 failover 된다.
master/slave cluster는 논리적으로 하나의 메시지 broker로 동작한다.
master - slaves(stand-by) 구조.


The Failover Transport(http://activemq.apache.org/failover-transport-reference.html)

configuration syntax
failover:(uri1,...,uriN)?transportOptions
or
failover:uri1,...,uriN

기본적으로 uri는 리스트에서 랜덤하게 선택된다.
만약, 첫번째를 master로 쓰고 나머지는 slave(backup)로 사용하려면 randomize 옵션을 끈다.
failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false
※ 옵션 상세는 http://activemq.apache.org/failover-transport-reference.html 참조

Notes
If you use failover, and a broker dies at some point, your sends will block by default. Using TransportListener can help with this regard. It is best to set the Listener directly on the ActiveMQConnectionFactory so that it is in place before any request that may require an network hop.

Pure Master Slave(http://activemq.apache.org/pure-master-slave.html)
shared file system이나 shared database 에 의존하지 않고 fully 복제된 토폴로지를 제공한다.

* slave broker는 master의 모든 메시지를 consume한다.
* master broker는 slave에 메시지를 성공적으로 넘긴 후에 client에 응답을 한다. transaction도 마찬가지.
* 클라이언트는 failover transport를 사용하여 ..

//

ActiveMQ 참고 사이트ActiveMQ 참고 사이트

Posted at 2010. 3. 2. 18:16 | Posted in OpenSource
//

ActiveMQ producer flow controlActiveMQ producer flow control

Posted at 2010. 3. 2. 17:57 | Posted in OpenSource

Producer Flow Control (http://activemq.apache.org/producer-flow-control.html)

Producer가 Async Send할 경우 broker가 flow control 한다.
4.x 버전에서는 TCP flow control로 구현되었으며, 여러 producer가 같은 connection을 공유할 때 데드락등의 문제가 있었다.
5.0 버전에서는 producer 개별로 flow control 가능하게 하여 위의 문제를 해결하였다.
Async Send(http://activemq.apache.org/async-sends.html)시 ProducerWindowSize를 설정해야 한다.
The ProducerWindowSize is the maximum number of bytes of data that a producer will transmit to a broker before waiting for acknowledgment messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends.
--> producer ack 받지 않고도 계속하여 보낼 수 있는 data 크기(byte)

ActiveMQConnectionFactory connctionFactory = ...
connctionFactory.setProducerWindowSize(1024000);

conf 파일에서 flow control 옵션을 켜고 끌 수 있다.
<destinationPolicy>
      <policyMap>
        <policyEntries>

          <policyEntry topic="FOO.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy/>
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
              <lastImageSubscriptionRecoveryPolicy/>
            </subscriptionRecoveryPolicy>
          </policyEntry>

        </policyEntries>
      </policyMap>
</destinationPolicy>

flow control의 결과로 느린 consumer로 인해 데이터가 넘치는 것을 방지한다.

Disable Flow Control
모든 가용한 디스크를 사용할 때까지 메시지를 pending하고 dispatching을 계속 하도록 한다.
Message Cursors(http://activemq.apache.org/message-cursors.html) 가 이를 가능하게 한다.

Message Cursors(http://activemq.apache.org/message-cursors.html)
5.0 이전 버전은 dispatch되야 하는 모든 메시지의 참조를 메모리에 보관했다. 이로 인해 pending될 수 있는 메시지 수에 제한이 가해지게 되었다.
일반적인 접근방식은 storage에 메시지를 저장하고 consumer의 요청이 오면 커서를 사용하여 다음 dispatching point를 얻는다.
5.0버전은 hybrid 접근방식을 취해 consumer가 연결된 경우 direct로 전달하고 consumer속도가 떨어지게 되면 cursor를 사용하는 모드로 변경한다.



If a Consumer becomes active after messages are pending from the store for it, or it's slower than the producer, then messages are paged in to the dispatch queue from a pending cursor:


The default message cursor type in ActiveMQ 5.0 is Store based. 여기에 2가지 추가 커서 타입이 있다:
VM Cursor 와 File based Cursor

VM Cursor는 4.X 버전이 동작하는 방식이다. 메시지의 참조를 모두 메모리에 저장한다.
매우 빠르지만, 속도가 느린 consumer인 경우 위험하다.


FileCursor는 VM Corsor에서 파생되었다.broker의 메모리가 한계에 다다르면 그 정보를 파일에 쓴다.


non-persistent 메시지의 경우 cursor로 바로 패스된다.

topic
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <deadLetterStrategy>
              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
            </deadLetterStrategy>
            <pendingSubscriberPolicy>
            <vmCursor />
            </pendingSubscriberPolicy>
            <pendingDurableSubscriberPolicy>
                <vmDurableCursor/>
            </pendingDurableSubscriberPolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>
Valid Subscriber types are vmCursor and fileCursor. The default is the store based cursor.
Valid Durable Subscriber cursor types are vmDurableCursor and fileDurableSubscriberCursor. The default is the store based cursor

queue
<destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue="org.apache.>">
            <dispatchPolicy>
              <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <deadLetterStrategy>
              <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
            </deadLetterStrategy>
            <pendingQueuePolicy>
            <vmQueueCursor />
            </pendingQueuePolicy>
          </policyEntry>
        </policyEntries>
      </policyMap>
</destinationPolicy>
Valid Queue cursor types are vmQueueCursor and fileQueueCursor. The default is the store based cursor

System usage
You can also slowdown producers using the <systemUsage> configuration tag.
<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage limit="20 mb"/>
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="1 gb" name="foo"/>
    </storeUsage>
    <tempUsage>
      <tempUsage limit="100 mb"/>
    </tempUsage>
  </systemUsage>
</systemUsage>

메시지가 가득찬 경우 default 설정은 send() call을 block시킨다.
이런경우 sendFailIfNoSpace 가 true로 설정하여 send() 이 실패하도록 할 수 있다.
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
</systemUsage>
</systemUsage>

5.3.1 버전에서는 sendFailIfNoSpaceAfterTimeout 속성을 이용하여 일정시간 이후에 send() fail을 발생시킬 수 있다.
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
</systemUsage>
</systemUsage>
millisecond 이다.
//