[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

RE: How to read messages from a queue in parallel

Hi Peter,

If you could use rabbitmq queues instead of SEDA ones I think your problem with messages being lost on restarts would be solved.
As a note, by default Camel doesn't set the persistence header on rabbit messages, it has to be set explicitly (we do it in our abstract route builder):

exchange.getIn().setHeader("rabbitmq.DELIVERY_MODE", 2);

This will allow the messages to survive a broker restart.
If you have rabbit queues instead of SEDA, then it won't matter that the messages get ack'd on the first queue when the consumer is finished with it as it will be published on the subsequent queue in a persistent and durable manner. Then the only other way to lose messages will be when the rabbit cluster loses mirroring between nodes (can happen when the network has a bad day), then all nodes that re-join the cluster will be wiped clean (can be worked around with backups).

Hope that helps, not sure if I would know of any other ways how to achieve what you're looking for.


-----Original Message-----
From: Peter Billen [mailto:peter.billen@xxxxxxxxx] 
Sent: 19 December 2018 13:07
To: users@xxxxxxxxxxxxxxxx
Subject: Re: How to read messages from a queue in parallel

Hi Valid, Camelers,

I got the required parallelism working with dynamic routing and seda queues as following:

        .process(exchange -> exchange.getIn().setHeader("queue",
exchange.getIn().getHeader("group", Integer.class) % 10))
    for (int i = 0; i < 10; ++i) {
        from("seda:q" + i)

- In the first route, I consume a single rabbitMQ with a single consumer.
The messages are forward to 'its' seda queue, calculated as a modulo hash of the group to which the message belongs. Here, theree are 10 seda queues available for processing in parallel.

- In the second routes, I consume each of the 10 seda queues and process.

This seems to work fine. All messages belonging to the same group are executed sequentially, while messages belonging to different groups are executed in parallel. Success!

This has one big problem though. In the first route, the RabbitMQ messages are acknowledged once they are put on the seda queue. This means that messages currently in the seda queue will be lost when the service restarts, as they are not present in the RabbitMQ broker anymore! I would like to ACK the RabbitMQ messages *after execution*, and not after propagation to the seda queue.

Is this somehow possible? I tried to change the first route to:

        .process(exchange -> exchange.getIn().setHeader("queue",
exchange.getIn().getHeader("group", Integer.class) % 10))

But this kills all concurrency, as expected. Is there a way to delay ACK of the RabbitMQ messages to after execution of the message in the seda queue?

As a heads-up: I am trying to implement but then for RabbitMQ, while introducing parallelism (based on the module hash of the group) locally in my service. In case you are interested, you can find my playground project on
It's Docker based, so no external dependencies is needed. The full code related to this thread can be found on


On Tue, Dec 18, 2018 at 2:39 PM Peter Billen <peter.billen@xxxxxxxxx> wrote:

> Hi Valdis,
> Many thanks for your feedback, appreciate it.
> I believe I was looking for the static or dyanamic routing:
>    - Aggregation is not suitable for my use case, as I don't have a 
> deterministic message flow. I also want to process messages in a 
> real-time manner.
>    - Static/dynamic message or routing might be suitable, though the 
> groups should be treated as a dynamic set. In my use case, I have `m` 
> groups and `n` available executor threads. Here, `m` can be any 
> (large) number while `n` is usually small, say 8. I have to try it out 
> with static or dynamic routing, it would be great if I could apply a 
> module hash to map an arbitrary group identifier to an executor thread.
> It's important that the RabbitMQ ACK/NACK happens *after* the 
> successful execution on the executor thread; that is, not when it has 
> been routed by the router. I don't want to risk dropping messages, for 
> example during a service restart.
> I'll experiment and come back with my results.
> Thanks.
> On Mon, Dec 17, 2018 at 4:41 PM Valdis Andersons 
> <valdis.andersons@xxxxxx>
> wrote:
>> Hi Peter,
>> Given you have a header already identifying the message categories, 
>> you could try to look into the Aggregator EIP:
>> GHwtSGwaGgMpw&s=33&u=http%3a%2f%2fcamel%2eapache%2eorg%2faggregator2%
>> 2ehtml
>> It will allow you to group all the messages by the header into 'batches'
>> for each of the groups. Each batch can then be processed by a single 
>> consumer and it would allow you to have multiple consumers taking 
>> care of the parallel processing of different groups. This is very 
>> suitable for periodic batch processing of messages, but requires 
>> finite batch sizes and/or finite time limits.
>> If you have a fixed set of group headers but not a deterministic size 
>> of the messages per group or a non-deterministic time period then a 
>> Message Router might be suitable:
>> GHwtSblZzhbrw&s=33&u=http%3a%2f%2fcamel%2eapache%2eorg%2fmessage-rout
>> er%2ehtml
>> You can directly check for the header in the choice expression and 
>> then route the different group messages to their respective group 
>> queues where you can have single consumers processing them in 
>> sequence in a continuous way. The separate queues would allow to 
>> fulfil the parallel processing per group requirement.
>> There is also a Dynamic Router option, but I personally wouldn't have 
>> any experience with it, but if none of the above suit your needs it 
>> might be something worth looking into:
>> GHwtXC1PGhZrw&s=33&u=http%3a%2f%2fcamel%2eapache%2eorg%2fdynamic-rout
>> er%2ehtml
>> Thanks,
>> Valdis
>> -----Original Message-----
>> From: Peter Billen [mailto:peter.billen@xxxxxxxxx]
>> Sent: 17 December 2018 11:08
>> To: users@xxxxxxxxxxxxxxxx
>> Subject: How to read messages from a queue in parallel
>> Hi all,
>> I am reading from a RabbitMQ queue as following:
>> from("rabbitmq://localhost/?queue=camel&autoAck=false&concurrentConsu
>> mers=1&
>> threadPoolSize=1&prefetchEnabled=true&prefetchCount=50")
>> Some remarks about the configuration parameters:
>> - I set `autoAck` to false to be able to acknowledge manually in my 
>> implementation.
>> - I set `concurrentConsumers` and `threadPoolSize` to 1 to guarantee 
>> that I consume the messages in the same order as they were added to the queue.
>> - I set `prefetchCount` to 50 to have at most 50 inflight messages in 
>> memory.
>> Now, I want to process these 50 messages in an asynchronous fashion 
>> and manually acknowledge when done. Each message has a `group 
>> identifier` header set. Messages from the same group will be 
>> processed sequentially, while messages from other groups will be processed concurrently.
>> I tried to start with the following:
>> from("rabbitmq://...")
>> .process(new AsyncProcessor() {
>>     @Override
>>     public boolean process(final Exchange exchange, final 
>> AsyncCallback
>> callback) {
>>         System.out.println("ASYNC");
>>         // TODO: (1) read group identifier (2) submit task to 
>> executor responsible for that particular group (3) call 
>> callback.done() in the task once done
>>         return false;
>>     }
>>     @Override
>>     public void process(final Exchange exchange) {
>>         throw new UnsupportedOperationException();
>>     }
>> })
>> The problem is here that only the first message is given to 
>> `process(exchange, callback)`. Is there a way to also receive the 
>> other inflight messages?
>> Note that I do *not* want to increase the number of RabbitMQ 
>> consumers, as this would skew with the message order. It is important 
>> that messages from the same group will be executed sequentially, 
>> hence the necessity to have one single RabbitMQ consumer.
>> Thanks!
>> Vhi Group DAC (Vhi) is a holding company for insurance and healthcare 
>> services, which include Vhi Healthcare DAC, Vhi Insurance DAC, Vhi 
>> Health Services DAC and Vhi Investments DAC. Vhi Healthcare DAC 
>> trading as Vhi Healthcare and Vhi Insurance DAC trading as Vhi 
>> Insurance are regulated by the Central Bank of Ireland. Vhi 
>> Healthcare is tied to Vhi Insurance DAC for health insurance in Ireland which is underwritten by Vhi Insurance DAC.
>> Vhi Healthcare is tied to Zurich Life Assurance plc for Vhi Life Term 
>> Insurance and Vhi Mortgage Protection which is underwritten by Zurich 
>> Life Assurance plc. Vhi Healthcare is tied to Collinson Insurance 
>> Services Limited for MultiTrip Travel Insurance, Backpacker Travel 
>> Insurance and Vhi Dental Insurance which are underwritten by Great 
>> Lakes Insurance SE, UK branch and for Vhi Canada Cover and Vhi 
>> International Health Insurance which are underwritten by Astrenska 
>> Insurance Limited. For more information about the Vhi Group please go 
>> to: 
>> sGHwtSG2OTpc-g&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>> Tá Vhi Group DAC (Vhi) ina chuideachta sealbhaíochta le haghaidh 
>> seirbhísí árachais agus seirbhísí cúram sláinte, lena n-áirítear Vhi 
>> Healthcare DAC, Vhi Insurance DAC, Vhi Health Services DAC agus Vhi 
>> Investments DAC. Déanann Banc Ceannais na hÉireann rialáil ar Vhi 
>> Healthcare DAC, ag trádáil dó mar Vhi Healthcare, agus ar Vhi 
>> Insurance DAC, ag trádáil dó mar Vhi Insurance. Tá Vhi Healthcare 
>> ceangailte le Vhi Insurance DAC le haghaidh árachas sláinte in 
>> Éirinn, rud atá frithgheallta ag Vhi Insurance DAC. Tá Vhi Healthcare 
>> ceangailte le Zurich Life Assurance plc le haghaidh Árachais Saoil de 
>> chuid Vhi agus Árachas Cosanta Morgáiste de chuid Vhi atá 
>> frithgheallta ag Zurich Life Assurance plc. Tá Vhi Healthcare 
>> ceangailte le Collinson Insurance Services Limited le haghaidh 
>> Árachas Taistil Ilturais agus Turasóirí Mála Droma agus Árachas 
>> Fiaclóireachta de chuid Vhi atá frithgheallta ag Great Lakes 
>> Insurance SE, UK branch agus le haghaidh Clúdach Cheanada de chuid 
>> Vhi agus Árachas Sláinte Idirnáisiúnta de chuid Vhi atá frithgheallta 
>> ag Astrenska Insurance Limited. Chun tuilleadh faisnéise a fháil faoi 
>> Ghrúpa Vhi, tabhair cuairt
>> ar: 
>> sGHwtSG2OTpc-g&s=33&u=https%3a%2f%2fwww%2evhi%2eie%2fabout-vhi
>> This e-mail and any files transmitted with it contain information 
>> which may be confidential and which may also be privileged and is 
>> intended solely for the use of the individual or entity to whom it is 
>> addressed. Unless you are the intended recipient you may not copy or 
>> use it, or disclose it to anyone else. Any opinions expressed are 
>> that of the individual and not necessarily that of the Vhi Group. If 
>> you have received this e-mail in error please notify the sender by return.