Kafka / KAFKA-7112

StreamThread does not check for state again after pollRequests()

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.0.0
    • Component/s: streams
    • Labels: None

    Description

      In StreamThread's main loop, we have:

              if (state == State.PARTITIONS_ASSIGNED) {
                  // try to fetch some records with zero poll millis
                  // to unblock the restoration as soon as possible
                  records = pollRequests(Duration.ZERO);
      
                  if (taskManager.updateNewAndRestoringTasks()) {
                      setState(State.RUNNING);
                  }
              }
      

      Here we first check the state and, if it is PARTITIONS_ASSIGNED, call `consumer.poll()` (via `pollRequests()`) and then call `taskManager.updateNewAndRestoringTasks()`. There is a race condition, though: if another rebalance gets triggered, `onPartitionsRevoked` will be called, in which we call `restoreConsumer.unsubscribe()`. If we then call `taskManager.updateNewAndRestoringTasks()` right away, without checking the state again, we will see this:

      Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
      java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
              at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
              at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
              at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
              at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
              at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
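
The race can be reproduced in miniature. The following is a self-contained sketch, not the actual Kafka Streams code or necessarily the committed fix: `RebalanceRaceSketch`, `FakeRestoreConsumer`, `runOnceUnchecked`, and `runOnceRechecked` are hypothetical names, and it assumes that the revocation callback moves the thread out of PARTITIONS_ASSIGNED before `pollRequests()` returns. Under those assumptions, re-checking the state after the poll avoids the call that throws.

```java
public class RebalanceRaceSketch {
    enum State { PARTITIONS_ASSIGNED, PARTITIONS_REVOKED, RUNNING }

    // Hypothetical stand-in for the restore consumer: polling after
    // unsubscribe() throws, mirroring the exception in the stack trace.
    static class FakeRestoreConsumer {
        private boolean subscribed = true;

        void unsubscribe() { subscribed = false; }

        void poll() {
            if (!subscribed) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
            }
        }
    }

    State state = State.PARTITIONS_ASSIGNED;
    final FakeRestoreConsumer restoreConsumer = new FakeRestoreConsumer();
    private final boolean rebalanceDuringPoll;

    RebalanceRaceSketch(boolean rebalanceDuringPoll) {
        this.rebalanceDuringPoll = rebalanceDuringPoll;
    }

    // Stand-in for pollRequests(): a rebalance, if any, fires its callback here.
    void pollRequests() {
        if (rebalanceDuringPoll) {
            onPartitionsRevoked();
        }
    }

    // Assumption: revocation changes the state and unsubscribes the restore consumer.
    void onPartitionsRevoked() {
        state = State.PARTITIONS_REVOKED;
        restoreConsumer.unsubscribe();
    }

    // Stand-in for taskManager.updateNewAndRestoringTasks(): it polls the restore consumer.
    boolean updateNewAndRestoringTasks() {
        restoreConsumer.poll();
        return true;
    }

    // Shape of the loop from the description: the state is checked only once.
    void runOnceUnchecked() {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
    }

    // Possible remedy: check the state again after the poll returns.
    void runOnceRechecked() {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
        }
        if (state == State.PARTITIONS_ASSIGNED) {
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
    }

    public static void main(String[] args) {
        RebalanceRaceSketch unchecked = new RebalanceRaceSketch(true);
        try {
            unchecked.runOnceUnchecked();
        } catch (IllegalStateException e) {
            System.out.println("unchecked loop failed: " + e.getMessage());
        }

        RebalanceRaceSketch rechecked = new RebalanceRaceSketch(true);
        rechecked.runOnceRechecked();
        System.out.println("rechecked loop ended in state " + rechecked.state);
    }
}
```

In this sketch the re-checked variant simply skips the restore step for that iteration and leaves the thread in PARTITIONS_REVOKED, so a later iteration can pick up restoration once partitions are assigned again.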
      

            People

              Assignee: Guozhang Wang
              Reporter: Guozhang Wang
              Votes: 0
              Watchers: 3
