Apache NiFi / NIFI-10885

ConsumeMQTT should stop client threads

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.18.0
    • Fix Version/s: None
    • Component/s: None
    • Labels: None

    Description

      When ConsumeMQTT is stopped, some client threads keep running, which prevents NiFi from stopping cleanly.

      2022-11-28 13:14:01,126 INFO [main] org.apache.nifi.bootstrap.Command NiFi PID [13987] shutdown in progress...
      2022-11-28 13:14:03,140 INFO [main] org.apache.nifi.bootstrap.Command NiFi PID [13987] shutdown in progress...
      2022-11-28 13:14:05,152 INFO [main] org.apache.nifi.bootstrap.Command NiFi PID [13987] shutdown in progress...
      2022-11-28 13:14:05,162 WARN [main] org.apache.nifi.bootstrap.Command NiFi PID [13987] shutdown not completed after 20 seconds: Killing process

      This affects the MQTT v5 client and may be caused by an issue in the underlying client library.

      Suspicious threads:

      "com.hivemq.client.mqtt-1-1@20668" prio=10 tid=0x69 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
            at sun.nio.ch.KQueue.poll(KQueue.java:-1)
            at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:122)
            at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:129)
            - locked <0x522a> (a sun.nio.ch.KQueueSelectorImpl)
            - locked <0x522b> (a io.netty.channel.nio.SelectedSelectionKeySet)
            at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:141)
            at io.netty.channel.nio.SelectedSelectionKeySetSelector.select(SelectedSelectionKeySetSelector.java:62)
            at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:883)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:526)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.lang.Thread.run(Thread.java:833)

      "RxComputationThreadPool-1@20859" daemon prio=5 tid=0x6a nid=NA waiting
        java.lang.Thread.State: WAITING
            at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:341)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block(AbstractQueuedSynchronizer.java:506)
            at java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3463)
            at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3434)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1623)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1170)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.lang.Thread.run(Thread.java:833)

      "RxSchedulerPurge-1@20122" daemon prio=5 tid=0x67 nid=NA waiting
        java.lang.Thread.State: WAITING
            at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1672)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.lang.Thread.run(Thread.java:833)

      "RxCachedWorkerPoolEvictor-1@20139" daemon prio=5 tid=0x68 nid=NA waiting
        java.lang.Thread.State: WAITING
            at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1672)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1182)
            at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:899)
            at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1062)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1122)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
            at java.lang.Thread.run(Thread.java:833) 
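
In the dump above, `com.hivemq.client.mqtt-1-1` is the only thread not marked `daemon`. A JVM does not exit on its own while a non-daemon thread is still alive, so this thread may be the one that blocks shutdown. The following is a minimal diagnostic sketch, not part of NiFi or the client library: the class name `NonDaemonThreads` and the method `lingering()` are hypothetical, and it uses only standard JDK calls to list live non-daemon threads.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of NiFi or the HiveMQ client.
public class NonDaemonThreads {

    /** Returns the sorted names of live, non-daemon threads other than the caller. */
    public static List<String> lingering() {
        Thread current = Thread.currentThread();
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && !t.isDaemon() && t != current)
                .map(Thread::getName)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Print each candidate thread that could keep the JVM from exiting.
        lingering().forEach(System.out::println);
    }
}
```

Calling `lingering()` after the processor has been stopped and checking for names that start with `com.hivemq.client.mqtt` would be one way to confirm whether the client's event loop thread is still alive.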

            People

              sabonyi Nandor Soma Abonyi
              turcsanyip Peter Turcsanyi