Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Not A Problem
Affects Version/s: 1.16.1
Fix Version/s: None
Component/s: None
Description
When I consume topics with KafkaSource and set the Kafka property enable.auto.commit=true while checkpointing is also enabled for the job, both mechanisms commit offsets: the Kafka client's periodic auto-commit and Flink's commit on checkpoint. Should Kafka's auto-commit be disabled when Flink checkpointing is enabled, as is done with FlinkKafkaConsumer?
How to reproduce
Properties kafkaParams = new Properties();
kafkaParams.put("enable.auto.commit", "true");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("fetch.min.bytes", "4096");
kafkaParams.put("sasl.mechanism", "PLAIN");
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("bootstrap.servers", bootStrap);
kafkaParams.put("group.id", expoGroupId);
kafkaParams.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

KafkaSource<String> source = KafkaSource
    .<String>builder()
    .setBootstrapServers(bootStrap)
    .setProperties(kafkaParams)
    .setGroupId(expoGroupId)
    .setTopics(Arrays.asList(expoTopic))
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .setStartingOffsets(OffsetsInitializer.latest())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
    .filter(r -> true);

env.enableCheckpointing(3000 * 1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);

env.execute("kafka-consumer");
Observed result: the Kafka client's org.apache.kafka.clients.consumer.internals.ConsumerCoordinator continuously commits offsets.
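For anyone who wants only checkpoint-driven commits in a setup like the one above, here is a minimal sketch of the property changes, not a statement of what the connector does internally. It assumes the KafkaSource option commit.offsets.on.checkpoint as described in the Flink Kafka connector documentation. The class and method names (KafkaOffsetCommitProps, checkpointOnlyCommit) are hypothetical helpers introduced for illustration. The sketch uses only java.util.Properties, so it runs without Flink on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka.
class KafkaOffsetCommitProps {

    /**
     * Returns a copy of the given consumer properties in which the Kafka
     * client's periodic auto-commit is turned off and offset commits on
     * Flink checkpoints are requested instead. The input is not modified.
     */
    static Properties checkpointOnlyCommit(Properties base) {
        Properties copy = new Properties();
        copy.putAll(base);
        // Standard Kafka consumer setting: stop the client's own periodic commits.
        copy.setProperty("enable.auto.commit", "false");
        // Assumed KafkaSource option (per the Flink Kafka connector docs):
        // commit offsets back to Kafka when a checkpoint is taken.
        copy.setProperty("commit.offsets.on.checkpoint", "true");
        return copy;
    }

    public static void main(String[] args) {
        Properties kafkaParams = new Properties();
        kafkaParams.put("enable.auto.commit", "true");
        kafkaParams.put("auto.offset.reset", "latest");

        Properties adjusted = checkpointOnlyCommit(kafkaParams);
        System.out.println("enable.auto.commit = "
                + adjusted.getProperty("enable.auto.commit"));
        System.out.println("commit.offsets.on.checkpoint = "
                + adjusted.getProperty("commit.offsets.on.checkpoint"));
    }
}
```

The adjusted properties would then be passed to the builder in place of kafkaParams, for example .setProperties(KafkaOffsetCommitProps.checkpointOnlyCommit(kafkaParams)). Note that with the 3000 * 1000 ms checkpoint interval from the reproduction, offsets would then reach Kafka only about every 50 minutes.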