Flink / FLINK-35506

Disable Kafka auto-commit and rely on Flink's checkpointing if both are enabled


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 1.16.1
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Labels: None

    Description

      When I use KafkaSource to consume topics with the Kafka property enable.auto.commit=true while checkpointing is also enabled for the job, I see that both mechanisms commit offsets. Should Kafka's auto-commit be disabled when Flink checkpointing is enabled, as is done for FlinkKafkaConsumer?

       

      How to reproduce

       

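      import java.util.Arrays;
      import java.util.Properties;

      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class KafkaAutoCommitRepro {
          public static void main(String[] args) throws Exception {
              // Hypothetical placeholder values; the report does not show the real ones.
              String bootStrap = "localhost:9092";
              String expoGroupId = "expo-group";
              String expoTopic = "expo-topic";
              String username = "user";
              String password = "secret";

              Properties kafkaParams = new Properties();
              // Kafka client auto-commit is switched on alongside Flink checkpointing (configured below).
              kafkaParams.put("enable.auto.commit", "true");
              kafkaParams.put("auto.offset.reset", "latest");
              kafkaParams.put("fetch.min.bytes", "4096");
              kafkaParams.put("sasl.mechanism", "PLAIN");
              kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
              kafkaParams.put("bootstrap.servers", bootStrap);
              kafkaParams.put("group.id", expoGroupId);
              kafkaParams.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + username + "\" password=\"" + password + "\";");

              KafkaSource<String> source = KafkaSource
                      .<String>builder()
                      .setBootstrapServers(bootStrap)
                      .setProperties(kafkaParams)
                      .setGroupId(expoGroupId)
                      .setTopics(Arrays.asList(expoTopic))
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .setStartingOffsets(OffsetsInitializer.latest())
                      .build();

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                      .filter(r -> true); // no-op filter; records are not otherwise processed

              env.enableCheckpointing(3000 * 1000);                                  // interval: 3,000,000 ms (50 min)
              env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000 * 1000); // 1,000,000 ms (about 16.7 min)
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
              env.getCheckpointConfig().setCheckpointTimeout(1000 * 300);           // 300,000 ms (5 min)
              env.execute("kafka-consumer");
          }
      }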

       

       

      With this setup, the Kafka client's org.apache.kafka.clients.consumer.internals.ConsumerCoordinator keeps committing offsets on its own, independently of Flink's checkpoint-based commits.
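      The reporter's question, whether auto-commit should be switched off once checkpointing is enabled, can be sketched as a small property-resolution step. This is a minimal, hypothetical sketch: the AutoCommitGuard class and its resolve method are illustrative names, not Flink API, and the logic is only modelled loosely on the FlinkKafkaConsumer behavior the reporter refers to. When checkpointing is on, enable.auto.commit is forced to "false" so that the Kafka client's own periodic commit is off and only checkpoint-driven commits remain; a message is printed if the user had explicitly asked for auto-commit. The user's Properties object is copied, not mutated.

```java
import java.util.Properties;

/**
 * Hypothetical helper (not part of Flink or Kafka): decides the effective
 * enable.auto.commit value given whether Flink checkpointing is enabled.
 */
final class AutoCommitGuard {

    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    /** Returns a resolved copy of userProps; the input is left unchanged. */
    static Properties resolve(Properties userProps, boolean checkpointingEnabled) {
        Properties resolved = new Properties();
        resolved.putAll(userProps);
        if (checkpointingEnabled) {
            // Boolean.parseBoolean(null) is false, so an unset key does not trigger the message.
            if (Boolean.parseBoolean(resolved.getProperty(ENABLE_AUTO_COMMIT))) {
                System.err.println("Overriding enable.auto.commit=true because checkpointing is enabled");
            }
            // Kafka's own default for enable.auto.commit is true, so set it explicitly.
            resolved.setProperty(ENABLE_AUTO_COMMIT, "false");
        }
        return resolved;
    }

    public static void main(String[] args) {
        Properties kafkaParams = new Properties();
        kafkaParams.put(ENABLE_AUTO_COMMIT, "true");
        kafkaParams.put("auto.offset.reset", "latest");

        Properties withCheckpointing = resolve(kafkaParams, true);
        Properties withoutCheckpointing = resolve(kafkaParams, false);

        System.out.println(withCheckpointing.getProperty(ENABLE_AUTO_COMMIT));    // prints "false"
        System.out.println(withoutCheckpointing.getProperty(ENABLE_AUTO_COMMIT)); // prints "true"
    }
}
```

      In the reproduction above, the same effect should come from changing the enable.auto.commit entry in kafkaParams to "false" directly, without any helper.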


          People

            Assignee: Unassigned
            Reporter: elon (elon_X)
            Votes: 0
            Watchers: 1
