Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
None
-
None
-
None
Description
Steps to reproduce the issue:
---------------------------------------
- Created the Kafka topic with single partition.
- Created the application with the following DAG:
BatchSequenceGenerator -> KafkaSinglePortExactlyOnceOutputOperator
- of partitions of KafkaSinglePortExactlyOnceOutputOperator = 2. Let's say KO1, KO2 are the two instances.
- Launched the app, after some time, manually killed the one of the instance of "KafkaSinglePortExactlyOnceOutputOperator" operator(KO2).
- During recovery, the instance comes up and after some time, it goes to the blocked state. App master killed this instance.
Observation:
----------------
- There is an infinite while loop in rebuildPartialWindow() method.
- While loop will break on the below 2 conditions:
a) # of trails for "polled records from Kafka is empty" = 10
b) Crossed boundary (consumerRecord.offset() >= currentOffset)
In this scenario, KO1 keeps on writing the data to Kafka. So, the first condition will not satisfy.
Operator is not checking the 2nd condition because of the below continue statement:
if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key()))
Solution: First check the cross boundary condition and then check the doesKeyBelongsToThisInstance(..).
Attachments
Issue Links
- links to