Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Version: 0.11.0.1
Description
We decided to start experimenting with standby replicas of our state stores by setting the following configuration:
num.standby.replicas=1
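For reference, a minimal sketch of how this setting might be supplied programmatically. It uses only java.util.Properties and the literal key shown above; the class name, application id, and bootstrap server are placeholders for illustration and are not taken from our deployment.

```java
import java.util.Properties;

public class StandbyConfigSketch {
    // Builds the properties that would be handed to the Streams application.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "example-app");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        // Ask for one standby replica of each state store.
        props.put("num.standby.replicas", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("num.standby.replicas"));
    }
}
```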
Most applications did okay with this except for one that used an in-memory state store instead of a persistent one. With the new configuration, the first instance of this application booted fine. When the second instance came up, both instances crashed with the following exception:
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1037)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:752)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:524)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
Monit attempted to restart both instances but they would just continue to crash over and over again. The state store in our problematic application is declared like so:
Stores
    .create("TheStateStore")
    .withStringKeys()
    .withStringValues()
    .inMemory()
    .build()
Luckily we had a config switch in place that could turn on an alternate, persistent state store. As soon as we flipped to the persistent state store, things started working as we expected.
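Looks like the issue is in ProcessorStateManager. We only restore, and hence assign partitions, if the store is persistent. For an in-memory store the standby task would then have no partitions assigned, which matches the IllegalStateException thrown from poll() in the stack trace.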
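To illustrate the suspected cause, here is a toy model in plain Java. It is not the Kafka Streams code: ToyConsumer, changelogPartitionsFor, and standbyUpdateSucceeds are hypothetical names, and the changelog partition name is made up. It only shows how skipping partition registration for non-persistent stores could leave a consumer with nothing assigned, so that polling fails.

```java
import java.util.ArrayList;
import java.util.List;

public class StandbyRestoreSketch {
    // Toy stand-in for a consumer that refuses to poll when nothing is assigned.
    static class ToyConsumer {
        final List<String> assigned = new ArrayList<>();

        void poll() {
            if (assigned.isEmpty()) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
            }
        }
    }

    // Hypothetical registration step: changelog partitions are recorded
    // only for persistent stores (the suspected behavior).
    static List<String> changelogPartitionsFor(String storeName, boolean persistent) {
        List<String> partitions = new ArrayList<>();
        if (persistent) {
            partitions.add(storeName + "-changelog-0"); // made-up partition name
        }
        return partitions;
    }

    // Returns true if the standby update can poll, false if poll() throws.
    static boolean standbyUpdateSucceeds(boolean persistent) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.assigned.addAll(changelogPartitionsFor("TheStateStore", persistent));
        try {
            consumer.poll();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("persistent: " + standbyUpdateSucceeds(true));
        System.out.println("in-memory:  " + standbyUpdateSucceeds(false));
    }
}
```

In this model the persistent case polls normally while the in-memory case hits the same exception message we saw, which is consistent with flipping to the persistent store making the crash go away.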