Kafka / KAFKA-10602

DLQ Reporter throws NPE when reporting from different thread


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.6.0
    • Fix Version/s: 2.7.0, 2.6.1
    • Component/s: None
    • Labels: None

    Description

      If a connector uses separate threads to report errant records through the ErrantRecordReporter from KIP-610, it can hit a race condition and throw an NPE. This happens when serialization errors from a converter and errors from the connector are reported on separate threads: both reporters use the same RetryWithToleranceOperator, so the error can be reset to `null` while a report is still in progress.

      `WorkerErrantRecordReporter::report` needs to be made thread-safe; otherwise it can conflict with the DLQ reporter.

      java.lang.NullPointerException
       at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.stacktrace(DeadLetterQueueReporter.java:187)
       at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:177)
       at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:149)
       at org.apache.kafka.connect.runtime.errors.ProcessingContext.lambda$report$0(ProcessingContext.java:151)
       at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
       at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1624)
       at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
       at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
       at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
       at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
       at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
       at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:153)
       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:126)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
       at java.base/java.lang.Thread.run(Thread.java:832) 
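
      The race described above can be illustrated with a minimal sketch. It is not the Kafka Connect code and not necessarily how the issue was fixed: `ReporterRaceSketch`, `SharedErrorState`, and `reportFromThreads` are hypothetical names, and the shared `error` field only stands in for the state that both reporters reach through the same RetryWithToleranceOperator. The sketch shows one possible way to make the set, report, and reset steps atomic so that no thread can observe the error as `null` in the middle of a report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ReporterRaceSketch {

    // Hypothetical stand-in for the error state shared by the reporters;
    // this is NOT the real Kafka Connect class.
    static class SharedErrorState {
        private Throwable error; // reset to null after each report
        private final List<String> reported = new ArrayList<>();

        // Without `synchronized`, another thread could null out `error`
        // between the assignment and error.getMessage(), causing an NPE.
        synchronized void report(Throwable t) {
            error = t;
            reported.add(error.getMessage());
            error = null;
        }

        synchronized int reportedCount() {
            return reported.size();
        }
    }

    // Reports `perThread` errors from each of `threads` threads and
    // returns how many reports were recorded.
    static int reportFromThreads(int threads, int perThread) {
        SharedErrorState state = new SharedErrorState();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                for (int j = 0; j < perThread; j++) {
                    state.report(new RuntimeException("bad record"));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("reporting threads did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return state.reportedCount();
    }

    public static void main(String[] args) {
        // Every report should be recorded, with no NullPointerException.
        System.out.println(reportFromThreads(4, 1000));
    }
}
```

      Holding a single lock around the whole report keeps the sketch simple at the cost of serializing reporters; giving each report its own error context would be another option.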

       


            People

              Assignee: Tom Bentley (tombentley)
              Reporter: Lev Zemlyanov (levzemlyanov)