Kafka / KAFKA-16077

Streams fails to close task after restoration when input partitions are updated


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • 3.7.0
    • None
    • Component/s: streams

    Description

      There is a race condition in the state updater that can cause the following:

      1. We have an active task in the state updater
      2. We get fenced. We recreate the producer, so its transactions are now uninitialized. We ask the state updater to hand back the task and add a pending action to close the task cleanly once it is handed back.
      3. We get a new assignment with updated input partitions. The task is still owned by the state updater, so we ask the state updater again to hand it back and add a pending action to update its input partitions.
      4. The state updater hands the task back. We update its input partitions but never close it cleanly, because the pending close action was overwritten by the update action.
      5. The task is now initialized, but the underlying producer does not have transactions initialized.
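The overwrite in steps 2 to 4 can be illustrated with a small, self-contained Java sketch. It is not the Kafka Streams code: the class PendingActionsSketch, the Action enum, and both pending-action stores are hypothetical. The single-slot store mimics the bug (a second pending action replaces the first); the accumulating store shows one possible way to keep both actions, not necessarily the fix that was merged.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PendingActionsSketch {

    // Hypothetical action kinds, loosely modeled on the two pending actions in the description.
    enum Action { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }

    // Buggy variant (hypothetical): one pending action per task, so a second add() replaces the first.
    static final class SingleSlotPendingActions {
        private final Map<String, Action> pending = new HashMap<>();

        void add(String taskId, Action action) {
            pending.put(taskId, action); // silently overwrites any earlier action for taskId
        }

        Set<Action> drain(String taskId) {
            Action a = pending.remove(taskId);
            return a == null ? EnumSet.noneOf(Action.class) : EnumSet.of(a);
        }
    }

    // One possible fix (hypothetical): accumulate every action so none is lost.
    static final class AccumulatingPendingActions {
        private final Map<String, Set<Action>> pending = new HashMap<>();

        void add(String taskId, Action action) {
            pending.computeIfAbsent(taskId, k -> EnumSet.noneOf(Action.class)).add(action);
        }

        Set<Action> drain(String taskId) {
            Set<Action> actions = pending.remove(taskId);
            return actions == null ? EnumSet.noneOf(Action.class) : actions;
        }
    }

    public static void main(String[] args) {
        String task = "1_0";

        SingleSlotPendingActions buggy = new SingleSlotPendingActions();
        buggy.add(task, Action.CLOSE_CLEAN);              // step 2: fenced, close cleanly on hand-back
        buggy.add(task, Action.UPDATE_INPUT_PARTITIONS);  // step 3: new assignment arrives
        System.out.println("buggy: " + buggy.drain(task)); // step 4: CLOSE_CLEAN has been lost

        AccumulatingPendingActions fixed = new AccumulatingPendingActions();
        fixed.add(task, Action.CLOSE_CLEAN);
        fixed.add(task, Action.UPDATE_INPUT_PARTITIONS);
        System.out.println("fixed: " + fixed.drain(task)); // both actions survive
    }
}
```

Running main shows the single-slot store returning only UPDATE_INPUT_PARTITIONS, while the accumulating store keeps both actions. A real fix would also have to decide how the actions combine; closing the task cleanly would presumably take precedence, since updating the input partitions of a task that is about to be closed has no lasting effect.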

      This can lead to an exception like this:

      streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition, partition=0, offset=618798, stacktrace=java.lang.IllegalStateException: TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2: Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
        at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
        at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
        at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
        at org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
        at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
        at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
        at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
        at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
        at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:847)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
        at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1919)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:953)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
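The IllegalStateException comes from the producer's transaction state machine: a transactional KafkaProducer must call initTransactions() before beginTransaction(). The toy Java model below reproduces only that rule. The class ToyTransactionStateMachine and its three states are hypothetical simplifications; the real TransactionManager has additional states (for example INITIALIZING and COMMITTING_TRANSACTION) and transitions.

```java
public class ToyTransactionStateMachine {

    // Hypothetical, reduced subset of the producer's transaction states.
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State current = State.UNINITIALIZED;

    // Moves to target only if we are in the single source state this toy model allows.
    private void transition(State requiredSource, State target) {
        if (current != requiredSource) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + current + " to state " + target);
        }
        current = target;
    }

    void initTransactions()  { transition(State.UNINITIALIZED, State.READY); }
    void beginTransaction()  { transition(State.READY, State.IN_TRANSACTION); }
    void commitTransaction() { transition(State.IN_TRANSACTION, State.READY); }

    State state() { return current; }

    public static void main(String[] args) {
        // A producer whose transactions were initialized behaves normally.
        ToyTransactionStateMachine healthy = new ToyTransactionStateMachine();
        healthy.initTransactions();
        healthy.beginTransaction();
        System.out.println("healthy: " + healthy.state()); // prints "healthy: IN_TRANSACTION"

        // A recreated producer that skipped initTransactions() fails on the first transaction.
        ToyTransactionStateMachine recreated = new ToyTransactionStateMachine();
        try {
            recreated.beginTransaction();
        } catch (IllegalStateException e) {
            System.out.println("recreated: " + e.getMessage());
        }
    }
}
```

The second instance plays the role of the producer recreated in step 2: since initTransactions() was never called for it, the first send attempts the UNINITIALIZED to IN_TRANSACTION transition and fails with a message shaped like the one in the log.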

      This affects only EOS v2 (processing.guarantee=exactly_once_v2).

            People

              Assignee: Lucas Brutschy (lucasbru)
              Reporter: Lucas Brutschy (lucasbru)