KAFKA-13544

Deadlock while shutting down Kafka broker because of connectivity problem with ZooKeeper

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.8.1
    • Fix Version/s: 3.1.0, 3.0.1, 2.8.2
    • Component/s: core
    • Labels: None

    Description

      Hi team,

      Kafka version: 2.8.1
      Configuration: 3 Kafka brokers and 3 ZooKeeper servers, each set spread across different availability zones.

      I ran into a deadlock in Kafka. I've attached a stack dump of the broker to this ticket. The deadlocked threads are "feature-zk-node-event-process-thread" and "kafka-shutdown-hook".

      Context:
      My Kafka cluster had connectivity problems with ZooKeeper, and I saw the following exception in the logs.

      The stack trace:

      [2021-12-06 18:31:14,629] WARN Unable to reconnect to ZooKeeper service, session 0x10000039563000f has expired (org.apache.zookeeper.ClientCnxn)
      [2021-12-06 18:31:14,629] INFO Unable to reconnect to ZooKeeper service, session 0x10000039563000f has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
      [2021-12-06 18:31:14,629] INFO EventThread shut down for session: 0x10000039563000f (org.apache.zookeeper.ClientCnxn)
      [2021-12-06 18:31:14,631] INFO [ZooKeeperClient Kafka server] Session expired. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-06 18:31:14,632] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
      kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
          at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:279)
          at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:261)
          at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:261)
          at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1797)
          at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1767)
          at kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1762)
          at kafka.zk.KafkaZkClient.getDataAndStat(KafkaZkClient.scala:771)
          at kafka.zk.KafkaZkClient.getDataAndVersion(KafkaZkClient.scala:755)
          at kafka.server.FinalizedFeatureChangeListener$FeatureCacheUpdater.updateLatestOrThrow(FinalizedFeatureChangeListener.scala:74)
          at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
          at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 

      The exception is thrown in the "feature-zk-node-event-process-thread" thread and caught in FinalizedFeatureChangeListener.ChangeNotificationProcessorThread.doWork, which then throws FatalExitError(1).
      The FatalExitError is caught in ShutdownableThread.run, which calls Exit.exit(e.statusCode()); that call ends in System.exit under the hood.
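
      The control flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: FatalExitSketch, Worker, runAndRecordExit and the injected exitProcedure are hypothetical stand-ins, not the Kafka classes, and the exit procedure only records the request instead of calling System.exit. The point it shows is that the exit request is issued on the worker thread itself.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntConsumer;

// Hypothetical stand-ins for the Kafka classes named above; not the real implementations.
class FatalExitSketch {

    /** Stand-in for FatalExitError: carries the status code to exit with. */
    static class FatalExitError extends Error {
        final int statusCode;
        FatalExitError(int statusCode) { this.statusCode = statusCode; }
    }

    /** Stand-in for the worker thread built on ShutdownableThread. */
    static class Worker extends Thread {
        private final IntConsumer exitProcedure;

        Worker(IntConsumer exitProcedure) {
            super("feature-zk-node-event-process-thread");
            this.exitProcedure = exitProcedure;
        }

        void doWork() {
            try {
                // Simulates the ZooKeeper read failing after session expiry.
                throw new IllegalStateException("Session expired");
            } catch (RuntimeException e) {
                // Any failure is escalated to a fatal error.
                throw new FatalExitError(1);
            }
        }

        @Override
        public void run() {
            try {
                doWork();
            } catch (FatalExitError e) {
                // In the broker this step ends in System.exit, called on this thread.
                exitProcedure.accept(e.statusCode);
            }
        }
    }

    /** Runs the worker and reports which thread requested exit, and with what code. */
    static String runAndRecordExit() {
        AtomicReference<String> seen = new AtomicReference<>("no exit requested");
        Worker worker = new Worker(code ->
            seen.set(Thread.currentThread().getName() + " -> exit(" + code + ")"));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // prints "feature-zk-node-event-process-thread -> exit(1)"
        System.out.println(runAndRecordExit());
    }
}
```

      Because the exit is requested from inside the worker, that thread cannot terminate until the exit call returns, which matters for the shutdown sequence described next.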

      The stack dump of the "feature-zk-node-event-process-thread" thread:

      "feature-zk-node-event-process-thread" #23 prio=5 os_prio=0 cpu=163.19ms elapsed=1563046.32s tid=0x00007fd0dcdec800 nid=0x2088 in Object.wait()  [0x00007fd07e2c1000]
         java.lang.Thread.State: WAITING (on object monitor)
          at java.lang.Object.wait(java.base@11.0.11/Native Method)
          - waiting on <no object reference available>
          at java.lang.Thread.join(java.base@11.0.11/Thread.java:1300)
          - waiting to re-lock in wait() <0x0000000088b9d3c8> (a org.apache.kafka.common.utils.KafkaThread)
          at java.lang.Thread.join(java.base@11.0.11/Thread.java:1375)
          at java.lang.ApplicationShutdownHooks.runHooks(java.base@11.0.11/ApplicationShutdownHooks.java:107)
          at java.lang.ApplicationShutdownHooks$1.run(java.base@11.0.11/ApplicationShutdownHooks.java:46)
          at java.lang.Shutdown.runHooks(java.base@11.0.11/Shutdown.java:130)
          at java.lang.Shutdown.exit(java.base@11.0.11/Shutdown.java:174)
          - locked <0x00000000806872f8> (a java.lang.Class for java.lang.Shutdown)
          at java.lang.Runtime.exit(java.base@11.0.11/Runtime.java:116)
          at java.lang.System.exit(java.base@11.0.11/System.java:1752)
          at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:43)
          at org.apache.kafka.common.utils.Exit.exit(Exit.java:66)
          at kafka.utils.Exit$.exit(Exit.scala:28)
          at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:102) 

      Before shutting down the virtual machine, System.exit runs the registered shutdown hooks, which starts the thread "kafka-shutdown-hook", and then waits for that thread to finish.
      The "kafka-shutdown-hook" thread calls kafka.server.KafkaServer.shutdown, which in turn calls:

      if (featureChangeListener != null)
        CoreUtils.swallow(featureChangeListener.close(), this)

      Where featureChangeListener.close is

      def close(): Unit = {
        zkClient.unregisterStateChangeHandler(ZkStateChangeHandler.name)
        zkClient.unregisterZNodeChangeHandler(FeatureZNodeChangeHandler.path)
        queue.clear()
        thread.shutdown()
        thread.join()
      }

      The deadlock occurs on the line "thread.join()": the shutdown hook waits for the thread "feature-zk-node-event-process-thread" to die, but that thread is the one that started "kafka-shutdown-hook" (through System.exit) and cannot die until "kafka-shutdown-hook" has finished.
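
      The circular wait can be modelled without calling System.exit. The following is a minimal sketch, not broker code: ShutdownJoinDeadlockSketch and hookJoinTimesOut are hypothetical names, the "exiting" thread stands in for the thread that called System.exit (it starts the hook and joins it, as the runHooks frames in the stack dump do), and the hook uses a bounded join only so that the example terminates. With the unbounded join() used in close(), both threads would wait forever.

```java
// Hypothetical model of the circular wait; does not touch the real JVM shutdown sequence.
class ShutdownJoinDeadlockSketch {

    /**
     * Returns true if the hook's bounded join expired while the exiting
     * thread was still alive, i.e. the hook could never have seen it die.
     */
    static boolean hookJoinTimesOut(long timeoutMs) {
        final boolean[] timedOut = {false};
        final Thread[] exiting = new Thread[1];

        // Plays the role of "kafka-shutdown-hook": waits for the worker to die.
        Thread hook = new Thread(() -> {
            try {
                exiting[0].join(timeoutMs); // close() uses join() with no timeout
                timedOut[0] = exiting[0].isAlive();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "hook");

        // Plays the role of the worker inside System.exit: starts the hook
        // and blocks until the hook has finished.
        exiting[0] = new Thread(() -> {
            hook.start();
            try {
                hook.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "exiting");

        exiting[0].start();
        try {
            exiting[0].join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return timedOut[0];
    }

    public static void main(String[] args) {
        // prints "true": the exiting thread was still alive when the hook gave up waiting
        System.out.println(hookJoinTimesOut(200));
    }
}
```

      The bounded join is only a device to make the cycle observable; it is not presented here as the fix that was applied for this ticket.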

      The stack dump of the thread "kafka-shutdown-hook":

      "kafka-shutdown-hook" #18 prio=5 os_prio=0 cpu=134.10ms elapsed=50906.28s tid=0x00007fd0ac15c800 nid=0x6d3bb in Object.wait()  [0x00007fd02dcba000]
         java.lang.Thread.State: WAITING (on object monitor)
          at java.lang.Object.wait(java.base@11.0.11/Native Method)
          - waiting on <no object reference available>
          at java.lang.Thread.join(java.base@11.0.11/Thread.java:1300)
          - waiting to re-lock in wait() <0x0000000088973e98> (a kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
          at java.lang.Thread.join(java.base@11.0.11/Thread.java:1375)
          at kafka.server.FinalizedFeatureChangeListener.close(FinalizedFeatureChangeListener.scala:245)
          at kafka.server.KafkaServer.$anonfun$shutdown$23(KafkaServer.scala:717)
          at kafka.server.KafkaServer$$Lambda$1681/0x000000084082b040.apply$mcV$sp(Unknown Source)
          at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
          at kafka.server.KafkaServer.shutdown(KafkaServer.scala:717)
          at kafka.Kafka$.$anonfun$main$3(Kafka.scala:100)
          at kafka.Kafka$$$Lambda$205/0x0000000840233840.apply$mcV$sp(Unknown Source)
          at kafka.utils.Exit$.$anonfun$addShutdownHook$1(Exit.scala:38)
          at kafka.Kafka$$$Lambda$206/0x0000000840233c40.run(Unknown Source)
          at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)

      Attachments

        1. kafka_broker_logs.log
          52 kB
          Andrei Lakhmanets
        2. kafka_broker_stackdump.txt
          27 kB
          Andrei Lakhmanets

            People

              Assignee: joecqupt
              Reporter: Andrei Lakhmanets (andrei-vlg)
              Votes: 1
              Watchers: 5
