Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Versions: 1.8.0, 1.16.0, 1.17.0, 1.19.0
- Fix Versions: None
- Labels: None
Description
When running the Kafka connector in bounded mode, the stop condition can be defined as the latest offset at the time the job starts. Unfortunately, Kafka's latest-offset calculation also counts special marker records, such as transaction markers.
When Flink waits for a bounded job to finish, it compares the number of records read up to that point with the latest offset captured at startup [1]. Since the consumer never sees the special marker records, the latest offset is never reached and the job gets stuck.
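The mismatch can be sketched with a minimal, hypothetical simulation (this is not Flink or Kafka code; the class and method names are invented for illustration). Marker records occupy offsets in the log but are never handed to the consumer, so a stop condition of the form "records emitted == latest offset" can never become true when the log ends with a marker:

```java
// Hypothetical model of the stuck bounded read (not Flink code).
public class StuckBoundedReadDemo {

    /**
     * Simulates a consumer draining offsets 0..isMarker.length-1.
     * Offsets flagged as markers occupy a position in the log but
     * are never emitted to the application, mirroring transaction
     * control records in Kafka.
     */
    static long emittedRecords(boolean[] isMarker) {
        long emitted = 0;
        for (boolean marker : isMarker) {
            if (!marker) {
                emitted++;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Three data records followed by a transaction commit marker.
        boolean[] offsets = {false, false, false, true};
        long latestOffset = offsets.length;       // Kafka reports 4
        long emitted = emittedRecords(offsets);   // the consumer only ever sees 3
        // The stop condition "emitted == latestOffset" never holds:
        System.out.println(emitted + " < " + latestOffset + " forever -> job hangs");
    }
}
```

Since the shortfall equals the number of marker records before the end offset, the job hangs whenever at least one marker falls inside the bounded range and the comparison is based on emitted record counts.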
To reproduce the issue, write into a Kafka topic so that the last record in the log is a transaction marker (e.g. the commit marker written by a transactional producer). Afterwards, start a Flink job reading that topic with the bounded scan stopping at the latest offset (`scan.bounded.mode` set to `latest-offset`).
[1] https://github.com/confluentinc/flink/blob/59c5446c4aac0d332a21b456f4a3f82576104b80/flink-connectors/confluent-connector-kafka/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java#L128
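As a sketch of the Flink side of the reproduction, a bounded Kafka table in Flink SQL might look like the following. The topic name, schema, and broker address are placeholders; the option names follow the Flink Kafka table connector documentation, where the bounded stop condition is expressed as `'scan.bounded.mode' = 'latest-offset'`:

```sql
-- Hypothetical reproduction table; topic/schema/broker are placeholders.
CREATE TABLE txn_topic_bounded (
  k STRING,
  v STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'txn-topic',                            -- topic whose last record is a commit marker
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset'             -- stop at the offset captured at job start
);

-- A simple bounded query over this table should finish, but hangs
-- when the captured latest offset counts the transaction marker:
SELECT COUNT(*) FROM txn_topic_bounded;
```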
Issue Links
- duplicates: FLINK-34470 "Transactional message + Table api kafka source with 'latest-offset' scan bound mode causes indefinitely hanging" (Resolved)