Details
Type: Bug
Status: Resolved
Priority: Blocker
Resolution: Fixed
Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
Environment
The issue happens in a Kinesis -> Flink -> Kafka exactly-once setup with the following pieces (a sketch of how they are wired together follows this list):
- Flink versions tested: 1.16.3 and 1.18.1
- Kinesis connector versions tested: 1.16.3 and 4.2.0-1.18
- checkpointing configured at a 1-minute interval in EXACTLY_ONCE mode:
// 1-minute exactly-once checkpoints with a 90 s checkpoint timeout.
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
- Kafka sink configured with EXACTLY_ONCE delivery guarantee:
// Transactional sink; transaction timeout (8 min) is set well above the 1-minute checkpoint interval.
Properties sinkConfig = new Properties();
sinkConfig.put("transaction.timeout.ms", 480000);
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTransactionalIdPrefix("test-prefix")
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setKafkaProducerConfig(sinkConfig)
        .setRecordSerializer(
                (KafkaRecordSerializationSchema<String>)
                        (element, context, timestamp) ->
                                new ProducerRecord<>("test-output-topic", null, element.getBytes()))
        .build();
- Kinesis consumer defined as:
FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
        "test-stream",
        new AbstractDeserializationSchema<>() {
            @Override
            public ByteBuffer deserialize(byte[] bytes) {
                // Wrap the raw Kinesis record bytes without copying.
                return ByteBuffer.wrap(bytes);
            }
        },
        props);
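For completeness, a minimal sketch of how the pieces above are presumably wired together; the ByteBuffer-to-String conversion and the job name are illustrative assumptions, not taken from the original job:

// Assumed wiring of the consumer and sink defined above.
// (needs: java.nio.charset.StandardCharsets, org.apache.flink.streaming.api.datastream.DataStream)
DataStream<ByteBuffer> kinesisStream = execEnv.addSource(flinkKinesisConsumer);
kinesisStream
        // Assumption: records are UTF-8 text, so they can feed the KafkaSink<String> above.
        .map(buffer -> StandardCharsets.UTF_8.decode(buffer).toString())
        .sinkTo(sink);
execEnv.execute("kinesis-to-kafka-exactly-once");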
Description
With an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint causes Flink to duplicate in Kafka, on resume, all the records between the last checkpoint and the savepoint (a sketch for observing the duplicate on the Kafka side follows this timeline):
- Event1 is written to Kinesis
- Event1 is processed by Flink
- Event1 is committed to Kafka at the checkpoint
- ............................................................................
- Event2 is written to Kinesis
- Event2 is processed by Flink
- Stop with savepoint is triggered manually
- Event2 is committed to Kafka
- ............................................................................
- Job is resumed from the savepoint
- Event2 is written again to Kafka at the first checkpoint
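One way to confirm the duplicate really is committed data, and not an artefact of reading open or aborted transactions, is a plain Kafka consumer with isolation.level=read_committed on the output topic. This is a minimal sketch assuming the localhost broker and test-output-topic from the setup above; the group id is a placeholder:

Properties consumerConfig = new Properties();
consumerConfig.put("bootstrap.servers", "localhost:9092");
consumerConfig.put("group.id", "duplicate-check");               // hypothetical group id
consumerConfig.put("isolation.level", "read_committed");         // only return records from committed transactions
consumerConfig.put("auto.offset.reset", "earliest");
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(consumerConfig)) {
    consumer.subscribe(Collections.singletonList("test-output-topic"));
    // After the restore, Event2 appears twice even though only committed records are returned.
    consumer.poll(Duration.ofSeconds(10))
            .forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
}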
I believe this is a Kinesis connector issue, for two reasons:
- I've checked the Kinesis sequence number in the _metadata file generated by the stop-with-savepoint, and it is the sequence number from the checkpoint before the savepoint rather than that of the last record committed to Kafka.
- I've tested exactly the same job with Kafka as source instead of Kinesis, and the behaviour does not reproduce (see the sketch below).
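A sketch of what that comparison source looks like; the input topic and group id here are illustrative placeholders, not the real names:

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTopics("test-input-topic")                     // hypothetical input topic
        .setGroupId("test-consumer-group")                 // hypothetical group id
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Same pipeline as before, with the Kinesis source swapped out.
DataStream<String> input = execEnv.fromSource(
        kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");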
Attachments
Issue Links
- links to