Description
A race condition in the state updater can cause the following sequence:
- An active task is owned by the state updater.
- We get fenced. We recreate the producer, so its transactions are now uninitialized. We ask the state updater to hand back the task and add a pending action to close the task clean once it is handed back.
- We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partitions.
- The state updater hands back the task. We update its input partitions but never close it clean, because the second pending action overwrote the first.
- The task is now in an initialized state, but the underlying producer does not have transactions initialized.
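The overwrite in the steps above can be illustrated with a minimal sketch. It assumes pending actions are kept in a map with one entry per task id; the class, enum, and method names are hypothetical and are not taken from the Kafka Streams code base.

```java
import java.util.HashMap;
import java.util.Map;

public class PendingActionRace {
    // Hypothetical action kinds, for illustration only.
    enum Action { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }

    // Assumption: at most one pending action per task id,
    // so a second put() silently replaces the first.
    private final Map<String, Action> pending = new HashMap<>();

    // Record what to do with the task once the state updater hands it back.
    void requestHandBack(String taskId, Action action) {
        pending.put(taskId, action);
    }

    // Called when the state updater hands the task back.
    Action onHandedBack(String taskId) {
        return pending.remove(taskId);
    }

    // Replays the sequence from the description for task 1_0.
    static Action simulate() {
        PendingActionRace manager = new PendingActionRace();
        manager.requestHandBack("1_0", Action.CLOSE_CLEAN);             // after being fenced
        manager.requestHandBack("1_0", Action.UPDATE_INPUT_PARTITIONS); // after the new assignment
        return manager.onHandedBack("1_0");
    }

    public static void main(String[] args) {
        // Prints "Action applied on hand-back: UPDATE_INPUT_PARTITIONS"
        System.out.println("Action applied on hand-back: " + simulate());
    }
}
```

In this model the clean close is lost because only the last action registered for the task survives. Any fix would need to keep the close-clean action when a later action is registered for the same task.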
This can lead to an exception like this:
streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
    at org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
    at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
    at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:847)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1919)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:953)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
This affects EOSv2 only.
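The exception message can be reproduced with a toy model of the producer's transaction state. This is a rough sketch under stated assumptions, not the real `TransactionManager`: it models only three states (`READY` is included here as an assumed intermediate state), and the class and method names are hypothetical. It shows why a recreated producer that never had its transactions initialized rejects `beginTransaction`.

```java
public class ToyTransactionState {
    // Simplified state set; the real client tracks more states than this.
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    // A freshly created producer starts out uninitialized.
    private State state = State.UNINITIALIZED;

    // Stand-in for initializing transactions on the producer.
    void initTransactions() {
        state = State.READY;
    }

    // Only allowed once transactions have been initialized.
    void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + state
                    + " to state " + State.IN_TRANSACTION);
        }
        state = State.IN_TRANSACTION;
    }

    State state() {
        return state;
    }

    // Models the bug: the producer was recreated, but the task was never
    // closed clean and re-initialized, so initTransactions() is skipped.
    static String beginOnRecreatedProducer() {
        ToyTransactionState recreated = new ToyTransactionState();
        try {
            recreated.beginTransaction();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(beginOnRecreatedProducer());
    }
}
```

In this model, calling `initTransactions()` before `beginTransaction()` succeeds, which corresponds to the path the task would take if it had been closed clean and initialized again.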