FLINK-33162

Separate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.13.1
    • Fix Version/s: 2.0.0
    • Component/s: Runtime / REST
    • Labels: None

    Description

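      When starting a job with a large number of TaskManagers, the JobManager failed to respond to any REST request. Looking at the jstack output, we found that all 4 threads of the REST endpoint executor were busy serving the metrics fetcher: one thread holds the MetricStore lock inside addAll, and the other three are blocked waiting for it.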

      "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 tid=0x00007f17e7823000 nid=0x246 waiting for monitor entry [0x00007f178e9fe000]
         java.lang.Thread.State: BLOCKED (on object monitor)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
              - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
              at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
              at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
         Locked ownable synchronizers:
              - <0x00000003ce80d8f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

      "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 tid=0x00007f17e88af000 nid=0x243 waiting for monitor entry [0x00007f1790dfe000]
         java.lang.Thread.State: BLOCKED (on object monitor)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
              - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
              at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
              at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
         Locked ownable synchronizers:
              - <0x00000003ce80df88> (a java.util.concurrent.ThreadPoolExecutor$Worker)

      "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 tid=0x00007f1793473800 nid=0x23a runnable [0x00007f17922fd000]
         java.lang.Thread.State: RUNNABLE
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
              - locked <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
              at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
              at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
         Locked ownable synchronizers:
              - <0x00000003ce811120> (a java.util.concurrent.ThreadPoolExecutor$Worker)

      "Flink-DispatcherRestEndpoint-thread-1" #76 daemon prio=5 os_prio=0 tid=0x00007f17a56f5000 nid=0x237 waiting for monitor entry [0x00007f1792cfd000]
         java.lang.Thread.State: BLOCKED (on object monitor)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
              - waiting to lock <0x00000003d5f62638> (a org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
              at org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown Source)
              at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
              at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
              at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
              at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
              at java.util.concurrent.FutureTask.run(FutureTask.java:266)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
              at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
              at java.lang.Thread.run(Thread.java:748)
         Locked ownable synchronizers:
              - <0x00000003ce8115f0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
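
The pattern in the dump (one thread inside a synchronized method, the rest of the pool BLOCKED on the same monitor) can be reproduced with plain `java.util.concurrent`. The following is only a minimal sketch: `SharedStore` and `countBlockedThreads` are hypothetical names invented for illustration and are not Flink classes. The calling thread plays the role of the single RUNNABLE thread by holding the monitor while the pool threads try to enter `addAll`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MonitorContentionSketch {

    /** Hypothetical stand-in for a store whose bulk update is synchronized. */
    static final class SharedStore {
        private final List<String> values = new ArrayList<>();

        synchronized void addAll(List<String> batch) {
            values.addAll(batch);
        }
    }

    /** Holds the store's monitor and counts how many pool threads become BLOCKED on it. */
    public static int countBlockedThreads(int poolSize) {
        SharedStore store = new SharedStore();
        List<Thread> workers = new CopyOnWriteArrayList<>();
        // Custom thread factory so the sketch can inspect the state of each pool thread.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            workers.add(thread);
            return thread;
        });
        try {
            synchronized (store) { // the caller owns the monitor, like the RUNNABLE thread
                for (int i = 0; i < poolSize; i++) {
                    pool.execute(() -> store.addAll(Collections.singletonList("metric")));
                }
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                int blocked = 0;
                // Poll until every pool thread is parked on the monitor, or give up.
                while (blocked < poolSize && System.nanoTime() < deadline) {
                    Thread.sleep(10);
                    blocked = (int) workers.stream()
                            .filter(t -> t.getState() == Thread.State.BLOCKED)
                            .count();
                }
                return blocked;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown(); // queued tasks finish once the monitor is released
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked threads: " + countBlockedThreads(4));
    }
}
```

While the monitor is held, no thread of the pool is free for other work, which is the situation a shared REST executor ends up in.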

       

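      I suggest using separate executors for the MetricFetcher and the webMonitorEndpoint, and enabling a rejection policy so that the executor rejects requests it cannot handle. Currently both components share the executor created as follows: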

      Executors.newScheduledThreadPool(
              numThreads,
              new ExecutorThreadFactory.Builder()
                      .setThreadPriority(threadPriority)
                      .setPoolName("Flink-" + componentName)
                      .build());
      
      
      final MetricFetcher metricFetcher =
              updateInterval == 0
                      ? VoidMetricFetcher.INSTANCE
                      : MetricFetcherImpl.fromConfiguration(
                              configuration,
                              metricQueryServiceRetriever,
                              dispatcherGatewayRetriever,
                              executor);
      
      webMonitorEndpoint =
              restEndpointFactory.createRestEndpoint(
                      configuration,
                      dispatcherGatewayRetriever,
                      resourceManagerGatewayRetriever,
                      blobServer,
                      executor,
                      metricFetcher,
                      highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                      fatalErrorHandler);
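
To illustrate the two ideas in this proposal, here is a minimal sketch using only `java.util.concurrent`, not the Flink code itself. The class and method names (`SeparateExecutorsSketch`, `restStaysResponsive`, `boundedPoolRejects`) and the pool and queue sizes are assumptions made for the example. One caveat: a `ScheduledThreadPoolExecutor` uses an unbounded queue, so its rejection handler only fires after shutdown; the sketch therefore assumes a plain `ThreadPoolExecutor` with a bounded queue for the rejection part.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SeparateExecutorsSketch {

    /** Blocks until the latch opens, standing in for a stuck metrics update. */
    private static Runnable blocker(CountDownLatch release) {
        return () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    /** Returns true if a REST-style task completes while every metrics thread is stuck. */
    public static boolean restStaysResponsive() {
        ExecutorService metricsExecutor = Executors.newFixedThreadPool(4);
        ExecutorService restExecutor = Executors.newFixedThreadPool(4);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy all metrics threads, mimicking the stalled metric updates.
            for (int i = 0; i < 4; i++) {
                metricsExecutor.execute(blocker(release));
            }
            // The REST pool is independent, so this task is not queued behind them.
            Future<String> response = restExecutor.submit(() -> "ok");
            return "ok".equals(response.get(5, TimeUnit.SECONDS));
        } catch (Exception e) {
            return false;
        } finally {
            release.countDown();
            metricsExecutor.shutdownNow();
            restExecutor.shutdownNow();
        }
    }

    /** Returns true if a bounded pool rejects work once its thread and queue are full. */
    public static boolean boundedPoolRejects() {
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            bounded.execute(blocker(release)); // taken by the single worker thread
            bounded.execute(blocker(release)); // waits in the queue of capacity 1
            bounded.execute(blocker(release)); // no free thread or queue slot: rejected
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        } finally {
            release.countDown();
            bounded.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("REST responsive: " + restStaysResponsive());
        System.out.println("bounded pool rejects: " + boundedPoolRejects());
    }
}
```

Whether rejected metric updates should be dropped, retried, or surfaced as errors is a design decision this sketch leaves open.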
       

          People

            Assignee: Unassigned
            Reporter: xiaogang zhou (zhoujira86)