  1. ActiveMQ Classic
  2. AMQ-5927

Acknowledged messages marked for redelivery stop new messages being consumed

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Abandoned
    • Affects Version/s: 5.11.1
    • Fix Version/s: 5.x
    • Component/s: Broker
    • Labels: None
    • Environment: Ubuntu 12.04

      java version "1.7.0_75"
      Java(TM) SE Runtime Environment (build 1.7.0_75-b13)
      Java HotSpot(TM) 64-Bit Server VM (build 24.75-b04, mixed mode)

    • Patch Info: Patch Available

    Description

      Hi

      Consumers get stuck, unable to consume messages, after the underlying persistence store fails to store a message (e.g. because the disk is full).

      What I am seeing is that when a subscription is removed from a queue (the subscription is forcibly removed as a result of the persistence adapter failure), messages are taken off the subscription as part of its shutdown and are then stored in redeliveredWaitingDispatch.

      org.apache.activemq.broker.region.Queue:547

        List<MessageReference> unAckedMessages = sub.remove(context, this);
      

      org.apache.activemq.broker.region.Queue:583

        if (!qmr.isDropped()) {
            redeliveredWaitingDispatch.addMessageLast(qmr);
        }
      

      The only problem is that /some/ of these messages are actually acked, i.e. org.apache.activemq.broker.region.QueueMessageReference#isAcked returns true for them. I have put breakpoints in this section of the code and confirmed this.

      This later becomes a problem: if any acked message ends up in redeliveredWaitingDispatch, all message consumption from the queue stops.

      If you follow org.apache.activemq.broker.region.Queue#doDispatch:1957, you can see that redeliveredWaitingDispatch /always/ takes precedence. New messages that come in don't get dispatched until redeliveredWaitingDispatch is empty. The problem is that this /never/ gets emptied.

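      In org.apache.activemq.broker.region.Queue#doActualDispatch:2002, at line 2028, we only dispatch and remove the message if the QueueMessageReference is not acknowledged. An acked reference therefore fails the condition on every pass and is never removed from the list.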

        if (dispatchSelector.canSelect(s, node)
                && assignMessageGroup(s, (QueueMessageReference) node)
                && !((QueueMessageReference) node).isAcked()) {
            // Dispatch it.
            s.add(node);
            LOG.trace("assigned {} to consumer {}", node.getMessageId(), s.getConsumerInfo().getConsumerId());
            iterator.remove();
            target = s;
            break;
        }
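
      To make the stall easier to follow, here is a minimal, self-contained sketch of the behaviour described above. It is a simplified model and not the broker code: DispatchStallModel, Ref and deliveredAfterPasses are hypothetical names, and selectors, message groups and multiple consumers are left out. It only assumes what is stated above, namely that redelivered entries are tried first, that new messages are dispatched only once that list is empty, and that an entry is removed only if it is not acked.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Simplified, hypothetical model of the dispatch loop; not the real broker classes. */
class DispatchStallModel {

    /** Stand-in for QueueMessageReference: only the id and the acked flag matter here. */
    static final class Ref {
        final String id;
        final boolean acked;

        Ref(String id, boolean acked) {
            this.id = id;
            this.acked = acked;
        }
    }

    final List<Ref> redeliveredWaitingDispatch = new ArrayList<>();
    final Deque<Ref> newMessages = new ArrayDeque<>();
    final List<String> delivered = new ArrayList<>();

    /** Mirrors the quoted condition: an entry is dispatched and removed only if it is not acked. */
    private void dispatchRedelivered() {
        Iterator<Ref> it = redeliveredWaitingDispatch.iterator();
        while (it.hasNext()) {
            Ref ref = it.next();
            if (!ref.acked) {
                delivered.add(ref.id);
                it.remove();
            }
            // Acked entries are skipped and stay in the list.
        }
    }

    /** One dispatch pass: redelivered entries first, new messages only once that list is empty. */
    void doDispatch() {
        dispatchRedelivered();
        if (redeliveredWaitingDispatch.isEmpty()) {
            while (!newMessages.isEmpty()) {
                delivered.add(newMessages.poll().id);
            }
        }
    }

    /** Runs the given number of passes and returns how many messages reached the consumer. */
    static int deliveredAfterPasses(boolean includeAckedRef, int passes) {
        DispatchStallModel queue = new DispatchStallModel();
        queue.redeliveredWaitingDispatch.add(new Ref("redelivered-1", false));
        if (includeAckedRef) {
            queue.redeliveredWaitingDispatch.add(new Ref("redelivered-2", true));
        }
        queue.newMessages.add(new Ref("new-1", false));
        queue.newMessages.add(new Ref("new-2", false));
        for (int i = 0; i < passes; i++) {
            queue.doDispatch();
        }
        return queue.delivered.size();
    }

    public static void main(String[] args) {
        System.out.println("without acked ref: " + deliveredAfterPasses(false, 5));
        System.out.println("with acked ref:    " + deliveredAfterPasses(true, 5));
    }
}
```

      In this model, a single acked entry keeps the redelivered list non-empty no matter how many passes run, so the two new messages are never handed to the consumer.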
      

      I have a simple patch that will fix the issue, but I am not sure of the full implications of the patch. I am hoping one of the ActiveMQ developers could chime in here:

      diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
      index c0a237f..64717f5 100755
      --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
      +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
      @@ -580,7 +580,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                                   }
                               }
                           }
      -                    if (!qmr.isDropped()) {
      +                    if (!qmr.isDropped() && !qmr.isAcked()) {
                               redeliveredWaitingDispatch.addMessageLast(qmr);
                           }
                       }
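
      As a rough illustration of what the changed condition does, the sketch below applies both guards to the same list of references. It is not the broker code: RequeueFilterSketch, Ref, requeue and requeuedCount are hypothetical names, and the sample data is made up. It does not explain why acked references come back from sub.remove in the first place, so it says nothing about the wider implications of the patch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of the re-queue guard changed by the patch. */
class RequeueFilterSketch {

    /** Stand-in for QueueMessageReference with only the two flags the guard reads. */
    static final class Ref {
        final String id;
        final boolean dropped;
        final boolean acked;

        Ref(String id, boolean dropped, boolean acked) {
            this.id = id;
            this.dropped = dropped;
            this.acked = acked;
        }
    }

    /**
     * Returns the ids that would be added to redeliveredWaitingDispatch.
     * skipAcked = false models the original guard, true models the patched one.
     */
    static List<String> requeue(List<Ref> unAckedMessages, boolean skipAcked) {
        List<String> requeued = new ArrayList<>();
        for (Ref qmr : unAckedMessages) {
            boolean keep = !qmr.dropped && (!skipAcked || !qmr.acked);
            if (keep) {
                requeued.add(qmr.id);
            }
        }
        return requeued;
    }

    /** Made-up sample: one live reference, one acked reference, one dropped reference. */
    static List<Ref> sample() {
        List<Ref> refs = new ArrayList<>();
        refs.add(new Ref("live", false, false));
        refs.add(new Ref("acked", false, true));
        refs.add(new Ref("dropped", true, false));
        return refs;
    }

    static int requeuedCount(boolean skipAcked) {
        return requeue(sample(), skipAcked).size();
    }

    public static void main(String[] args) {
        System.out.println("original guard: " + requeue(sample(), false));
        System.out.println("patched guard:  " + requeue(sample(), true));
    }
}
```

      With the original guard the acked reference is re-queued alongside the live one; with the patched guard only the live reference is re-queued.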
      
      

      To reproduce this issue, I filled the disk completely by putting messages on a queue, then used dd to zero out any remaining space on the disk. I then consumed the messages from the queue. /Some/ messages will be consumed, but at some point the consumer will fail due to the out-of-space errors. It is at this point that some acked messages are put on redeliveredWaitingDispatch. This can be confirmed by adding a conditional breakpoint at org.apache.activemq.broker.region.Queue:584 with the condition qmr.isAcked() == true.

      Also see: AMQ-4052

          People

            Assignee: Unassigned
            Reporter: Ryan Doyle (rdoyle)
            Votes: 0
            Watchers: 4
