Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Won't Fix
Affects Versions: 2.0.0, 1.2.2
Description
shaikasifullah reported losing tuples when restarting a Trident topology that uses windowing together with the opaque Kafka spout.
I think this is due to a bug in the Trident windowing implementation.
Trident doesn't use the regular acking mechanism to keep track of all tuples in a batch. Instead, the bolt executors in Trident send "coordinator" tuples downstream following each batch, indicating how many tuples were in the batch. These coordinator tuples are anchored to the initial "emit batch" tuple at the master batch coordinator (MBC). The next bolt executor in line checks if it received all the expected tuples, and fails the "emit batch" tree if not. Otherwise, the entire batch is considered acked when the coordinator tuple is acked, which happens as soon as it is received (purposefully ignoring the commit mechanism here).
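The bookkeeping described above can be sketched as follows. This is a simplified model with hypothetical names, not the actual TridentBoltExecutor code: a per-batch counter is compared against the expected count carried by the coordinator tuple.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of Trident's per-batch tracking: count the regular
// tuples seen for each transaction id, then compare against the expected
// count delivered by the upstream coordinator tuple.
public class BatchTracker {
    private final Map<Long, Integer> received = new HashMap<>();

    // Called for every regular tuple in a batch.
    public void onTuple(long txid) {
        received.merge(txid, 1, Integer::sum);
    }

    // Called when the coordinator tuple for the batch arrives.
    // Returns true if the batch is complete (the coordinator tuple can be
    // acked), false if tuples went missing (fail the "emit batch" tree).
    public boolean onCoordinatorTuple(long txid, int expectedCount) {
        int got = received.getOrDefault(txid, 0);
        received.remove(txid);
        return got == expectedCount;
    }
}
```

Note that a batch passing this check only means all of its tuples arrived at this executor; it says nothing about what happened to them afterwards, which is where the windowing bug comes in.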
The bolt executor notifies the wrapped bolt when a batch starts, and when it finishes. The expectation is that the bolt will emit any new tuples it wants anchored to the coordinator tuple before the bolt executor considers the batch finished. See https://github.com/apache/storm/blob/19fbfb9ac8f82719cf70fedf6a024acaeec4e804/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java#L127.
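The contract between the executor and the wrapped bolt might be sketched like this (all names here are hypothetical, chosen for illustration): only what the bolt emits before finishBatch returns ends up anchored to the batch's coordinator tuple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface standing in for the wrapped Trident bolt.
interface TridentBolt {
    void execute(String tuple);
    // Anything added to anchoredEmits before returning is anchored to
    // this batch's coordinator tuple.
    void finishBatch(List<String> anchoredEmits);
}

// Sketch of the executor side of the contract: forward the batch's
// tuples, call finishBatch, then ack the coordinator tuple - whether or
// not the bolt emitted anything.
public class ExecutorSketch {
    private final TridentBolt bolt;

    public ExecutorSketch(TridentBolt bolt) {
        this.bolt = bolt;
    }

    // Returns the tuples anchored to this batch's coordinator tuple.
    public List<String> onBatch(List<String> batch) {
        for (String t : batch) {
            bolt.execute(t);
        }
        List<String> anchored = new ArrayList<>();
        bolt.finishBatch(anchored);
        // The coordinator tuple is acked here, regardless of whether
        // `anchored` is empty.
        return anchored;
    }
}
```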
The windowing mechanism in Trident is implemented via a processor (see https://github.com/apache/storm/blob/19fbfb9ac8f82719cf70fedf6a024acaeec4e804/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java#L147). The processor collects received tuples grouped by batch, and only passes them to the WindowManager once a batch is considered complete. At that point it also checks whether any triggers have fired (e.g. due to a timeout), and emits any resulting windows.
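A minimal model of that behavior (hypothetical names, and a count-based trigger instead of a time-based one, for simplicity): tuples are buffered per batch, handed to the window when the batch finishes, and emitted only when the trigger happens to fire, which need not line up with any batch boundary.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the window processor: finishBatch moves the
// current batch into the window buffer, but a window is emitted only
// when the trigger condition is met.
public class WindowSketch {
    private final List<String> pendingBatch = new ArrayList<>();
    private final List<String> window = new ArrayList<>();
    private final int triggerCount;

    public WindowSketch(int triggerCount) {
        this.triggerCount = triggerCount;
    }

    public void receive(String tuple) {
        pendingBatch.add(tuple);
    }

    // Called from finishBatch. Returns the emitted window, or an empty
    // list if the trigger has not fired yet.
    public List<String> finishBatch() {
        window.addAll(pendingBatch);
        pendingBatch.clear();
        if (window.size() >= triggerCount) {
            List<String> out = new ArrayList<>(window);
            window.clear();
            return out; // emitted tuples get anchored to this batch only
        }
        return List.of(); // nothing emitted: earlier tuples sit in memory
    }
}
```

When finishBatch returns an empty list, the batch is still acked, even though its tuples exist only in the in-memory window buffer.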
The issue here is that there is no correlation between the finished batch and which tuples the window processor chooses to emit during the finishBatch call. Unless it emits exactly the tuples from the received batch, there is a risk of losing the at-least-once property, since the bolt executor will ack the coordinator tuple immediately following finishBatch.
Just to give a concrete example:
1. The MBC starts txid 1 by emitting an "emit batch" tuple.
2. The spout executor receives the tuple, emits tuples 1-10, then emits a coordinator tuple containing an expected count of 10 tuples.
3. The bolt executor receives tuples 1-10.
4. The bolt executor receives the coordinator tuple from the upstream spout, containing an expected count of 10 tuples.
5. The bolt executor calls finishBatch.
6. The window processor is configured with a 10-second window, and decides not to emit the 10 tuples. Since nothing is emitted, no new tuples are anchored to the coordinator tuple.
7. The bolt executor acks the coordinator tuple at the MBC.
8. The MBC sees that the "emit batch" tuple has been acked, and starts the commit process. At this point Trident is free to assume the 10 tuples have been correctly processed and may e.g. write to Zookeeper that the Kafka spout should pick up at offset 10 the next time it starts.
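The steps above can be played out in a toy simulation (all names hypothetical). The point it illustrates: the spout offset is committed even though the batch's tuples were only buffered in memory, so a worker crash at that moment loses them for good.

```java
import java.util.ArrayList;
import java.util.List;

// Toy end-to-end run of the scenario above: the window holds the batch
// back, the coordinator tuple is acked anyway, and the committed offset
// advances past tuples that were never emitted downstream.
public class LostTupleScenario {

    // Returns the committed offset after one batch of `batchSize` tuples
    // passes through a window whose trigger does not fire.
    public static long runScenario(int batchSize) {
        List<String> windowBuffer = new ArrayList<>(); // in-memory only
        long committedOffset = 0;

        // MBC starts txid 1; spout emits tuples for offsets 0..batchSize-1.
        List<String> batch = new ArrayList<>();
        for (int i = 1; i <= batchSize; i++) {
            batch.add("tuple-" + i);
        }

        // finishBatch: the window trigger has not fired, so the tuples are
        // buffered and nothing is emitted or anchored.
        windowBuffer.addAll(batch);

        // The executor acks the coordinator tuple regardless, so the MBC
        // commits and the spout offset advances.
        committedOffset = batchSize;

        // A worker crash here drops windowBuffer; on restart the spout
        // resumes at committedOffset and the buffered tuples are gone.
        windowBuffer.clear();
        return committedOffset;
    }

    public static void main(String[] args) {
        System.out.println(runScenario(10));
    }
}
```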