Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
Description
The thread dumps of the 2 offending threads are below, but the basics are:
1. AppInfoParser in Kafka uses static synchronized methods.
2. Log4j synchronizes per Category.
So if the StreamAppender tries to create a new KafkaProducer while logging (and therefore while holding the Category lock), the producer constructor blocks on the static synchronized AppInfoParser method; meanwhile, the thread inside AppInfoParser is trying to log to the same Category, and the two threads deadlock.
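The lock-order inversion can be reduced to a minimal, self-contained sketch. The lock names below are hypothetical stand-ins for the AppInfoParser class monitor and the log4j Category monitor; latches make the interleaving deterministic so the hold-and-wait cycle always forms:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class DeadlockSketch {
    // Stand-ins (hypothetical names) for the two monitors involved:
    // the log4j Category instance lock and the AppInfoParser class lock.
    static final ReentrantLock categoryLock = new ReentrantLock();
    static final ReentrantLock appInfoLock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch bothHeld = new CountDownLatch(2);   // both first locks taken
        CountDownLatch bothTried = new CountDownLatch(2);  // both second attempts done
        final boolean[] acquired = new boolean[2];

        // Thread 1: the StreamAppender path -- holds the Category lock
        // (inside callAppenders), then needs the AppInfoParser lock to
        // construct a new KafkaProducer.
        Thread t1 = new Thread(() -> {
            categoryLock.lock();
            try {
                bothHeld.countDown(); await(bothHeld);
                acquired[0] = tryFor(appInfoLock);
                bothTried.countDown(); await(bothTried);
            } finally { categoryLock.unlock(); }
        });

        // Thread 2: the KafkaProducer path -- holds the AppInfoParser lock
        // (inside registerAppInfo), then logs, which needs the same Category lock.
        Thread t2 = new Thread(() -> {
            appInfoLock.lock();
            try {
                bothHeld.countDown(); await(bothHeld);
                acquired[1] = tryFor(categoryLock);
                bothTried.countDown(); await(bothTried);
            } finally { appInfoLock.unlock(); }
        });

        t1.start(); t2.start(); t1.join(); t2.join();
        // Neither thread can take its second lock while the other holds it.
        System.out.println("deadlock reproduced: " + (!acquired[0] && !acquired[1]));
    }

    // Bounded attempt at the second lock; in the real bug this is an
    // unbounded BLOCKED wait, which is why the job hangs forever.
    static boolean tryFor(ReentrantLock l) {
        try {
            boolean ok = l.tryLock(200, TimeUnit.MILLISECONDS);
            if (ok) l.unlock();
            return ok;
        } catch (InterruptedException e) { return false; }
    }

    static void await(CountDownLatch l) {
        try { l.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```

Swapping the `tryLock` timeouts for plain `lock()` calls reproduces the permanent hang seen in the dumps below.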
"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=23 BLOCKED
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57) Local Variable: java.lang.String#326563 Local Variable: java.lang.String#329864
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336) Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#12 Local Variable: java.util.ArrayList#265184 Local Variable: org.apache.kafka.common.metrics.MetricConfig#9 Local Variable: java.util.LinkedHashMap#991 Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#9 Local Variable: java.util.ArrayList#265353 Local Variable: org.apache.kafka.clients.NetworkClient#9 Local Variable: org.apache.kafka.common.network.SslChannelBuilder#9 Local Variable: java.util.ArrayList#265374 Local Variable: org.apache.kafka.clients.producer.ProducerConfig#3 Local Variable: java.lang.String#309971
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182) Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#11 Local Variable: org.apache.kafka.clients.producer.KafkaProducer#3 Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#7 Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#8
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159) Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#2
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137) Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#10 Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#9 Local Variable: java.util.Properties#38 Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#3 Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#9 Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#3
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.send(SamzaRawLiKafkaSystemProducer.java:167) Local Variable: java.lang.String#326561 Local Variable: java.lang.IllegalStateException#2 Local Variable: java.lang.String#330077 Local Variable: org.apache.samza.system.SystemProducerException#4 Local Variable: java.lang.Integer#15116
    at org.apache.samza.logging.log4j.StreamAppender.append(StreamAppender.java:115)
    at com.linkedin.atc.log4j.SafeStreamAppender.streamAppend(SafeStreamAppender.java:32)
    at com.linkedin.atc.log4j.SafeStreamAppender.append(SafeStreamAppender.java:23) Local Variable: com.linkedin.atc.log4j.SafeStreamAppender#1
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) Local Variable: org.apache.log4j.helpers.AppenderAttachableImpl#1
    at org.apache.log4j.Category.callAppenders(Category.java:206) Local Variable: org.apache.log4j.spi.LoggingEvent#24 Local Variable: org.apache.log4j.Logger#4
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:323)
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.close(LiKafkaProducerImpl.java:313) Local Variable: java.util.concurrent.TimeUnit$3#1
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:220) Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#4 Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#2 Local Variable: java.lang.Boolean#1 Local Variable: org.apache.samza.system.SystemProducerException#2 Local Variable: java.lang.Object#203455
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362) Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#58
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162) Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#58 Local Variable: org.apache.kafka.common.errors.TimeoutException#2 Local Variable: java.util.ArrayList$Itr#6
    at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277) Local Variable: java.util.ArrayList#263984 Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#4 Local Variable: java.util.ArrayList$Itr#5 Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#34
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210) Local Variable: org.apache.kafka.common.Cluster#2 Local Variable: java.util.Collections$EmptyMap#1 Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#2 Local Variable: java.util.HashMap$KeyIterator#2
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131) Local Variable: org.apache.kafka.clients.producer.internals.Sender#4
    at java.lang.Thread.run(Thread.java:745)

"kafka-producer-network-thread | kafka_producer-samza_xxx_yyy-i001" daemon prio=5 tid=35 BLOCKED
    at org.apache.log4j.Category.callAppenders(Category.java:204) Local Variable: org.apache.log4j.spi.LoggingEvent#26 Local Variable: org.apache.log4j.Logger#15
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:304)
    at org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:87)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:59) Local Variable: javax.management.ObjectName#162
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:336) Local Variable: org.apache.kafka.common.network.SslChannelBuilder#7 Local Variable: java.util.ArrayList#264895 Local Variable: org.apache.kafka.common.internals.ClusterResourceListeners#7 Local Variable: java.lang.String#308990 Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#8 Local Variable: java.util.LinkedHashMap#854 Local Variable: java.util.ArrayList#264889 Local Variable: org.apache.kafka.common.metrics.MetricConfig#7 Local Variable: java.util.ArrayList#264910 Local Variable: org.apache.kafka.clients.producer.ProducerConfig#2 Local Variable: org.apache.kafka.clients.NetworkClient#7
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:182) Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#5 Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#6 Local Variable: org.apache.kafka.common.config.AbstractConfig$RecordingMap#6 Local Variable: org.apache.kafka.clients.producer.KafkaProducer#2
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:159) Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerConfig#1
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl.<init>(LiKafkaProducerImpl.java:137) Local Variable: java.util.Properties#67 Local Variable: com.linkedin.kafka.liclients.auditing.NoOpAuditor#8 Local Variable: com.linkedin.samza.system.kafka.serializers.NoOpSegmentSerializer#2 Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#2 Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#14 Local Variable: org.apache.kafka.common.serialization.ByteArraySerializer#13
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.createLiKafkaProducer(SamzaRawLiKafkaSystemProducer.java:84)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.handleSendException(SamzaRawLiKafkaSystemProducer.java:224) Local Variable: org.apache.samza.system.SystemProducerException#1 Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl#5 Local Variable: com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer#3
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer.lambda$send$17(SamzaRawLiKafkaSystemProducer.java:157)
    at com.linkedin.samza.system.kafka.SamzaRawLiKafkaSystemProducer$$Lambda$12.onCompletion(<unknown string>)
    at com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback.onCompletion(LiKafkaProducerImpl.java:362) Local Variable: com.linkedin.kafka.liclients.producer.LiKafkaProducerImpl$ErrorLoggingCallback#27
    at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:162) Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch$Thunk#27 Local Variable: java.util.ArrayList$Itr#2 Local Variable: org.apache.kafka.common.errors.TimeoutException#1
    at org.apache.kafka.clients.producer.internals.RecordBatch.expirationDone(RecordBatch.java:282)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:277) Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator#10 Local Variable: java.util.ArrayList$Itr#1 Local Variable: java.util.ArrayList#263305 Local Variable: org.apache.kafka.clients.producer.internals.RecordBatch#21
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:210) Local Variable: org.apache.kafka.common.Cluster#1 Local Variable: org.apache.kafka.clients.producer.internals.RecordAccumulator$ReadyCheckResult#1 Local Variable: java.util.HashMap$KeyIterator#1
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131) Local Variable: org.apache.kafka.clients.producer.internals.Sender#7
    at java.lang.Thread.run(Thread.java:745)
After some discussion with pmaheshwari, we felt making the StreamAppender asynchronous was the only reliable solution. There are two approaches to this:
1. Use log4j2, which supports asynchronous logging out of the box. The downside is that we'd have to update the StreamAppender and JmxAppender to be log4j2 plugins instead of simply extending AppenderSkeleton.
2. Add a queue and a consumer thread to StreamAppender, such that new events are enqueued with some timeout and the thread drains the queue and sends to the configured SystemProducer.
Option 2 is preferable right now because it's quicker to implement and test. It can also easily be replaced by option 1 later, which is already a goal because we want to leverage the performance benefits of log4j2.
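Option 2 could look roughly like the following sketch (all names are hypothetical; the real change would live inside StreamAppender.append and hand events to the configured SystemProducer). The key properties are that append() only does a bounded-timeout offer, so the logging thread can never block indefinitely while holding the Category lock, and all producer interaction happens on a single dedicated thread:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncAppenderSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1024);
    private final Thread consumer;
    private volatile boolean running = true;

    // producerSend stands in for SystemProducer.send (hypothetical wiring).
    public AsyncAppenderSketch(Consumer<String> producerSend) {
        consumer = new Thread(() -> {
            // Keep draining until close() is called AND the queue is empty.
            while (running || !queue.isEmpty()) {
                try {
                    String event = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (event != null) producerSend.accept(event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "stream-appender-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    // Called from append(): enqueue with a bounded timeout; if the queue
    // stays full, drop the event rather than block the logging thread.
    public boolean append(String event) {
        try {
            return queue.offer(event, 10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public void close() throws InterruptedException {
        running = false;
        consumer.join(1000);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sent = Collections.synchronizedList(new ArrayList<>());
        AsyncAppenderSketch appender = new AsyncAppenderSketch(sent::add);
        appender.append("event-1");
        appender.append("event-2");
        appender.close();
        System.out.println("delivered=" + sent.size());
    }
}
```

The bounded offer trades possible event loss under sustained backpressure for the guarantee that logging never deadlocks; the queue size and timeout would presumably become StreamAppender config.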