Description
Hi
I am seeing an issue where consumers get stuck and are unable to consume messages after the underlying persistence adapter fails to store a message (e.g. because the disk is out of space).
When a subscription is removed from a queue (here, the subscription is forcibly removed because of the persistence adapter failure), the messages are taken off the subscription as part of the shutdown and then stored in redeliveredWaitingDispatch.
org.apache.activemq.broker.region.Queue:547
List<MessageReference> unAckedMessages = sub.remove(context, this);
org.apache.activemq.broker.region.Queue:583
if (!qmr.isDropped()) {
redeliveredWaitingDispatch.addMessageLast(qmr);
}
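To make this re-queue step concrete, here is a minimal Java sketch under a simplified model. `MessageRef` and `requeueUnpatched` are hypothetical stand-ins for QueueMessageReference and the loop around Queue:583, not ActiveMQ code. The sketch shows that filtering on the dropped flag alone lets an acked reference through into redeliveredWaitingDispatch.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, simplified model of the subscription-removal path in Queue.
// MessageRef stands in for QueueMessageReference; it is NOT the ActiveMQ class.
public class RequeueOnRemoveSketch {

    static final class MessageRef {
        final String id;
        final boolean acked;
        final boolean dropped;

        MessageRef(String id, boolean acked, boolean dropped) {
            this.id = id;
            this.acked = acked;
            this.dropped = dropped;
        }
    }

    // Mirrors the unpatched check "if (!qmr.isDropped())": only dropped refs are
    // filtered out, so refs that were already acked are still re-queued.
    static List<MessageRef> requeueUnpatched(List<MessageRef> unAckedMessages) {
        List<MessageRef> redeliveredWaitingDispatch = new LinkedList<>();
        for (MessageRef qmr : unAckedMessages) {
            if (!qmr.dropped) {
                redeliveredWaitingDispatch.add(qmr);
            }
        }
        return redeliveredWaitingDispatch;
    }

    public static void main(String[] args) {
        List<MessageRef> fromSub = new ArrayList<>();
        fromSub.add(new MessageRef("m1", false, false));
        fromSub.add(new MessageRef("m2", true, false));  // acked before the store failed
        fromSub.add(new MessageRef("m3", false, true));  // dropped, so filtered out
        for (MessageRef r : requeueUnpatched(fromSub)) {
            System.out.println(r.id + " acked=" + r.acked);
        }
    }
}
```

Running `main` prints `m1 acked=false` followed by `m2 acked=true`: the acked reference ends up in the re-queued list.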
The problem is that /some/ of these messages have actually been acked (org.apache.activemq.broker.region.QueueMessageReference#isAcked returns true). I have put breakpoints in this section of the code and confirmed this.
This becomes a problem later: if any acked messages end up in redeliveredWaitingDispatch, all message consumption from the queue stops.
If you follow org.apache.activemq.broker.region.Queue#doDispatch:1957, you can see that redeliveredWaitingDispatch /always/ takes precedence: newly arriving messages are not dispatched until redeliveredWaitingDispatch is empty. The problem is that in this situation it /never/ gets emptied.
In org.apache.activemq.broker.region.Queue#doActualDispatch:2002 (at line 2028), a message is only dispatched and removed from the list if its QueueMessageReference is not acked, so an acked reference is never removed:
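The precedence rule can be illustrated with a tiny hypothetical model. `nextSource` is an illustrative helper, not a method of Queue; it only captures the behaviour described for doDispatch, where the redelivered list is served first and new messages wait until it is empty.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of the precedence rule described for Queue#doDispatch.
// Names here are illustrative and are not ActiveMQ APIs.
public class DispatchPrecedenceSketch {

    // Returns the list the next dispatch pass would draw from: the redelivered list
    // whenever it is non-empty, otherwise the list of newly arrived messages.
    static Deque<String> nextSource(Deque<String> redeliveredWaitingDispatch, Deque<String> pending) {
        return redeliveredWaitingDispatch.isEmpty() ? pending : redeliveredWaitingDispatch;
    }

    public static void main(String[] args) {
        Deque<String> redelivered = new ArrayDeque<>();
        Deque<String> pending = new ArrayDeque<>();
        redelivered.add("acked-m2"); // an entry that never gets removed
        pending.add("new-m4");       // a fresh message from a producer

        // As long as "acked-m2" sits in the redelivered list, new-m4 is never considered.
        System.out.println(nextSource(redelivered, pending) == redelivered);
    }
}
```

Running `main` prints `true`. If the redelivered list can never drain, the pending list is never selected.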
if (dispatchSelector.canSelect(s, node)
        && assignMessageGroup(s, (QueueMessageReference) node)
        && !((QueueMessageReference) node).isAcked()) {
    // Dispatch it.
    s.add(node);
    LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
    iterator.remove();
    target = s;
    break;
}
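Here is a simplified sketch of why the list never drains. It assumes every consumer accepts every message, so selectors and message groups are left out. `Ref` and `dispatchPass` are hypothetical names; only the `!isAcked()` condition comes from the snippet above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the acked check in Queue#doActualDispatch.
// Ref stands in for QueueMessageReference; consumer selection is reduced to "always accept".
public class AckedSkipSketch {

    static final class Ref {
        final String id;
        final boolean acked;

        Ref(String id, boolean acked) {
            this.id = id;
            this.acked = acked;
        }
    }

    // One dispatch pass: non-acked refs are delivered and removed from the list;
    // acked refs fail the !isAcked() test and are left in place.
    static List<String> dispatchPass(List<Ref> list) {
        List<String> delivered = new ArrayList<>();
        Iterator<Ref> iterator = list.iterator();
        while (iterator.hasNext()) {
            Ref node = iterator.next();
            if (!node.acked) {
                delivered.add(node.id);
                iterator.remove();
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Ref> redeliveredWaitingDispatch = new ArrayList<>();
        redeliveredWaitingDispatch.add(new Ref("m1", false));
        redeliveredWaitingDispatch.add(new Ref("m2", true));
        for (int pass = 1; pass <= 3; pass++) {
            List<String> delivered = dispatchPass(redeliveredWaitingDispatch);
            System.out.println("pass " + pass + " delivered " + delivered
                    + ", remaining " + redeliveredWaitingDispatch.size());
        }
    }
}
```

After the first pass only the acked `m2` remains, and every later pass delivers nothing while the list size stays at 1.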
I have a simple patch that fixes the issue, but I am not sure of its full implications. I am hoping one of the ActiveMQ developers can chime in here:
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c0a237f..64717f5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -580,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                 }
             }
         }
-        if (!qmr.isDropped()) {
+        if (!qmr.isDropped() && !qmr.isAcked()) {
             redeliveredWaitingDispatch.addMessageLast(qmr);
         }
     }
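Here is an end-to-end sketch of what the proposed patch changes, under the same simplifying assumptions: one consumer that accepts everything, plus hypothetical `Ref`, `requeue`, `dispatchAll` and `sample` helpers that are not part of ActiveMQ. It compares the original filter with the patched one over a fixed number of dispatch passes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Hypothetical end-to-end model of the proposed patch. Ref, requeue, dispatchAll and
// sample are illustrative names, not ActiveMQ APIs; one consumer accepts everything.
public class PatchedRequeueSketch {

    static final class Ref {
        final String id;
        final boolean acked;
        final boolean dropped;

        Ref(String id, boolean acked, boolean dropped) {
            this.id = id;
            this.acked = acked;
            this.dropped = dropped;
        }
    }

    // patched == false mirrors "!qmr.isDropped()";
    // patched == true mirrors "!qmr.isDropped() && !qmr.isAcked()".
    static List<Ref> requeue(List<Ref> fromSub, boolean patched) {
        List<Ref> redelivered = new LinkedList<>();
        for (Ref qmr : fromSub) {
            if (!qmr.dropped && (!patched || !qmr.acked)) {
                redelivered.add(qmr);
            }
        }
        return redelivered;
    }

    // Runs a fixed number of dispatch passes. The redelivered list is served first and new
    // messages are only dispatched once it is empty; acked refs are skipped and stay put.
    static List<String> dispatchAll(List<Ref> redelivered, List<String> pending, int passes) {
        List<String> delivered = new ArrayList<>();
        for (int pass = 0; pass < passes; pass++) {
            if (!redelivered.isEmpty()) {
                Iterator<Ref> iterator = redelivered.iterator();
                while (iterator.hasNext()) {
                    Ref node = iterator.next();
                    if (!node.acked) {
                        delivered.add(node.id);
                        iterator.remove();
                    }
                }
            } else if (!pending.isEmpty()) {
                delivered.add(pending.remove(0));
            }
        }
        return delivered;
    }

    // Refs taken off a removed subscription: m2 was acked before the store failed.
    static List<Ref> sample() {
        List<Ref> fromSub = new ArrayList<>();
        fromSub.add(new Ref("m1", false, false));
        fromSub.add(new Ref("m2", true, false));
        return fromSub;
    }

    public static void main(String[] args) {
        for (boolean patched : new boolean[] {false, true}) {
            List<String> pending = new ArrayList<>();
            pending.add("m4"); // a new message that arrives after the failure
            List<String> delivered = dispatchAll(requeue(sample(), patched), pending, 5);
            System.out.println("patched=" + patched + " delivered=" + delivered);
        }
    }
}
```

In this model the unpatched run prints `patched=false delivered=[m1]` and then stalls. The patched run prints `patched=true delivered=[m1, m4]`, so the new message gets through. The model does not cover other broker code paths that might depend on acked references being re-queued.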
To reproduce this issue, I filled the disk completely by putting messages on a queue, then used dd to fill any remaining free space on the disk with zeros. I then consumed the messages from the queue. /Some/ messages are consumed, but at some point the consumer fails due to the out-of-space errors. It is at this point that some acked messages are put on the redeliveredWaitingDispatch queue. You can confirm this by adding a conditional breakpoint at org.apache.activemq.broker.region.Queue:584 with the condition qmr.isAcked() == true.
See also: AMQ-4052