Description
When using journaled JDBC persistence, we noticed that some messages in the DB table (activemq_msgs) are not always deleted after the messages are consumed. The adverse effect is that if the broker restarts (ours is hosted in a K8S pod, so the local journal files are lost as well), the broker considers every non-deleted message as still unconsumed and offers it up for consumption again. This was an issue for us in production.
This seems to be caused by the persistence adapter not using the right ID in the SQL statement that deletes the row.
This comes from the fact that the messageId sometimes has a "futureOrSequenceLong" of 0 instead of the correct ID (from the DB). The "futureOrSequenceLong" is only set on the message at the time the message is actually persisted, and with journal persistence this happens only periodically (every 5 minutes by default).
If a browse action occurs before the message is persisted, a copy of the message is taken, which also copies the wrong "futureOrSequenceLong". Upon consumption (assuming the broker has not restarted in the meantime), it is this copy of the messageId that is used when the time comes to remove the message from the DB.
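The sequence of events described above can be illustrated with a small standalone sketch. It does not use the real ActiveMQ classes: SketchMessageId and BrowseBeforePersistSketch are hypothetical stand-ins, and the copy() method only assumes, as described above, that copying a message id carries the current "futureOrSequenceLong" along with it.

```java
// Simplified stand-in for a message id; NOT the real org.apache.activemq.command.MessageId.
final class SketchMessageId {
    private final String key;
    private Object futureOrSequenceLong;

    SketchMessageId(String key) {
        this.key = key;
    }

    Object getFutureOrSequenceLong() {
        return futureOrSequenceLong;
    }

    void setFutureOrSequenceLong(Object value) {
        this.futureOrSequenceLong = value;
    }

    // Copies the current field values into an independent object (assumed copy semantics).
    SketchMessageId copy() {
        SketchMessageId c = new SketchMessageId(key);
        c.futureOrSequenceLong = this.futureOrSequenceLong;
        return c;
    }
}

final class BrowseBeforePersistSketch {
    // Returns the sequence held by a copy that was taken before the message reached the DB.
    static Object sequenceSeenByBrowseCopy(long dbSequence) {
        SketchMessageId original = new SketchMessageId("hypothetical-message-id");
        original.setFutureOrSequenceLong(0L);          // placeholder set when the message is produced
        SketchMessageId browsed = original.copy();     // browse happens before the periodic persist
        original.setFutureOrSequenceLong(dbSequence);  // persist later assigns the real DB sequence
        return browsed.getFutureOrSequenceLong();      // the copy never sees that update
    }

    public static void main(String[] args) {
        System.out.println(sequenceSeenByBrowseCopy(42L));
    }
}
```

In this sketch the copy keeps the placeholder value even after the original has been updated, which matches the behaviour observed on the broker.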
The removal code actually caters for the missing "futureOrSequenceLong" in JDBCMessageStore [257] with this line:
long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ? (Long) ack.getLastMessageId().getFutureOrSequenceLong() : persistenceAdapter.getStoreSequenceIdForMessageId(context, ack.getLastMessageId(), destination)[0];
When the value is null, the sequence is fetched from the DB directly.
But the issue is that in the described case, the "futureOrSequenceLong" is not null but has a value of 0L, so the DB lookup is skipped and 0 is used as the ID in the delete statement.
This is due to this line in JournalMessageStore.addMessage [142] (called when the message is originally produced):
message.getMessageId().setFutureOrSequenceLong(0l);
I actually think that creating a message on a JournalMessageStore should leave the "futureOrSequenceLong" null so that later condition checks can pick it up as such and properly get the sequence from the database.
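To make the difference between the two cases concrete, here is a minimal standalone sketch of the null check. SequenceResolutionSketch and resolve are hypothetical names, and dbLookup merely stands in for the call to persistenceAdapter.getStoreSequenceIdForMessageId; the value 42 is an arbitrary example sequence, not something taken from a real table.

```java
import java.util.function.LongSupplier;

final class SequenceResolutionSketch {
    // Same shape as the ternary in JDBCMessageStore quoted above:
    // trust the in-memory value unless it is null, otherwise ask the DB.
    static long resolve(Object futureOrSequenceLong, LongSupplier dbLookup) {
        return futureOrSequenceLong != null
                ? (Long) futureOrSequenceLong
                : dbLookup.getAsLong();
    }

    public static void main(String[] args) {
        LongSupplier dbLookup = () -> 42L; // hypothetical sequence stored in activemq_msgs

        // Current behaviour: the 0L placeholder is not null, so the DB is never consulted.
        System.out.println(resolve(0L, dbLookup));

        // Proposed behaviour: a null value falls through to the DB lookup.
        System.out.println(resolve(null, dbLookup));
    }
}
```

With the 0L placeholder the check returns 0 without touching the lookup, whereas a null value yields the sequence from the DB, which is why leaving the field null at produce time would let the existing removal code do the right thing.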