Description
In StreamThread's main loop, we have:
if (state == State.PARTITIONS_ASSIGNED) {
    // try to fetch some records with zero poll millis
    // to unblock the restoration as soon as possible
    records = pollRequests(Duration.ZERO);

    if (taskManager.updateNewAndRestoringTasks()) {
        setState(State.RUNNING);
    }
}
Here we first check the state and, if it is PARTITIONS_ASSIGNED, call `consumer.poll()` and then `taskManager.updateNewAndRestoringTasks()`. There is a race condition, though: if another rebalance is triggered during that poll, `onPartitionsRevoked` is called, which in turn calls `restoreConsumer.unsubscribe()`. If we then call `taskManager.updateNewAndRestoringTasks()` right away, we see this:
Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
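
To make the interleaving concrete, here is a minimal, self-contained sketch of the race and of one possible guard (re-checking the state after the poll). It is not the actual Kafka Streams code: `RebalanceRaceSketch`, `FakeRestoreConsumer`, and the simplified `pollRequests()` are hypothetical stand-ins, and the sketch assumes the second rebalance fires inside the poll call. Whether this guard is the right fix for StreamThread is for the patch to decide.

```java
import java.util.HashSet;
import java.util.Set;

public class RebalanceRaceSketch {
    enum State { PARTITIONS_ASSIGNED, PARTITIONS_REVOKED, RUNNING }

    // Hypothetical stand-in for the restore consumer. Like KafkaConsumer,
    // poll() throws IllegalStateException when nothing is assigned.
    static class FakeRestoreConsumer {
        private final Set<String> assignment = new HashSet<>();

        void assign(String partition) { assignment.add(partition); }

        void unsubscribe() { assignment.clear(); }

        void poll() {
            if (assignment.isEmpty()) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
            }
        }
    }

    State state = State.PARTITIONS_ASSIGNED;
    final FakeRestoreConsumer restoreConsumer = new FakeRestoreConsumer();

    RebalanceRaceSketch() {
        restoreConsumer.assign("changelog-0"); // made-up partition name
    }

    // Stand-in for pollRequests(Duration.ZERO). Assumption: a second
    // rebalance is triggered inside the poll and runs the revocation callback.
    void pollRequests() { onPartitionsRevoked(); }

    void onPartitionsRevoked() {
        state = State.PARTITIONS_REVOKED;
        restoreConsumer.unsubscribe();
    }

    // Stand-in for taskManager.updateNewAndRestoringTasks(): restoring
    // polls the restore consumer.
    boolean updateNewAndRestoringTasks() {
        restoreConsumer.poll();
        return true;
    }

    // Mirrors the loop body quoted in this issue: state is checked only once.
    void runOnceUnguarded() {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
    }

    // One possible guard: re-check the state after the poll returns.
    void runOnceGuarded() {
        if (state == State.PARTITIONS_ASSIGNED) {
            pollRequests();
        }
        if (state == State.PARTITIONS_ASSIGNED) {
            if (updateNewAndRestoringTasks()) {
                state = State.RUNNING;
            }
        }
    }

    static boolean throwsIllegalState(Runnable r) {
        try {
            r.run();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        RebalanceRaceSketch unguarded = new RebalanceRaceSketch();
        RebalanceRaceSketch guarded = new RebalanceRaceSketch();
        System.out.println("unguarded throws: " + throwsIllegalState(unguarded::runOnceUnguarded));
        System.out.println("guarded throws: " + throwsIllegalState(guarded::runOnceGuarded));
    }
}
```

In this sketch the unguarded path hits the same IllegalStateException as the trace above, while the guarded path skips the restore step and leaves the thread in PARTITIONS_REVOKED until the next assignment.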