FLINK-32592

(Stream)ExEnv#initializeContextEnvironment isn't thread-safe

Details

    Description

      Context

      We use the flink-k8s-operator to deploy multiple jobs (up to 32) to a single session cluster. The operator submits the jobs concurrently, at essentially the same time.

      Operator version: 1.5.0

      Flink version: 1.15.4, 1.17.1, 1.18 (master@f37d41cf)

      Problem

      Rarely (roughly once every 50 deployments), one of the jobs is not executed. In the following incident, 4 jobs were deployed at the same time:

      • gorner-task-staging-e5730831
      • gorner-facility-staging-e5730831
      • gorner-aepp-staging-e5730831
      • gorner-session-staging-e5730831

       
      The operator submits the jobs, and each one gets a reasonable jobID:

      2023-07-14 10:25:35,295 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-task-staging-e5730831] Submitting job: 4968b186061e44390000000000000002 to session cluster.
      2023-07-14 10:25:35,297 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-facility-staging-e5730831] Submitting job: 91a5260d916c4dff0000000000000002 to session cluster.
      2023-07-14 10:25:35,301 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-aepp-staging-e5730831] Submitting job: 103c0446e14749a10000000000000002 to session cluster.
      2023-07-14 10:25:35,302 o.a.f.k.o.s.AbstractFlinkService [INFO ][aelps-staging/gorner-session-staging-e5730831] Submitting job: de59304d370b4b8e0000000000000002 to session cluster.
      

      In the cluster, the JarRunHandler's handleRequest() method receives the requests. All 4 jobIDs are present (all args etc. are correct as well):

      2023-07-14 10:25:35,320 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest - requestBody.jobId: 4968b186061e44390000000000000002
      2023-07-14 10:25:35,321 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest - requestBody.jobId: de59304d370b4b8e0000000000000002
      2023-07-14 10:25:35,321 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest - requestBody.jobId: 91a5260d916c4dff0000000000000002
      2023-07-14 10:25:35,321 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - handleRequest - requestBody.jobId: 103c0446e14749a10000000000000002
      

      But when the EmbeddedExecutor's submitAndGetJobClientFuture() method is called, instead of 1 call per jobID we see 4 calls in which one jobID appears twice (de59304d370b4b8e0000000000000002) and one is missing (91a5260d916c4dff0000000000000002):

      2023-07-14 10:25:35,616 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[4968b186061e44390000000000000002]
      2023-07-14 10:25:35,616 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[103c0446e14749a10000000000000002]
      2023-07-14 10:25:35,616 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
      2023-07-14 10:25:35,721 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - optJobId: Optional[de59304d370b4b8e0000000000000002]
      

      Possibly relevant: the jobGraph obtained does not match the jobID. We get de59304d370b4b8e0000000000000002 twice, but the jobGraph for this jobID is never returned by getJobGraph() in EmbeddedExecutor.submitAndGetJobClientFuture().

      As a result, by the fourth call the jobID is already present in submittedJobIds:

      2023-07-14 10:25:35,616 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
      2023-07-14 10:25:35,616 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
      2023-07-14 10:25:35,616 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: []
      2023-07-14 10:25:35,721 WARN  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - execute - submittedJobIds: [de59304d370b4b8e0000000000000002]
      

      But since the jobs are completely different, the execution fails. Depending on the timing, it fails with one of the following exceptions:

      • RestHandlerException: No jobs included in application
      • ClassNotFoundException: io.dectris.aelps.pipelines.gorner.facility.FacilityEventProcessor
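
      To illustrate the kind of hazard the title refers to, here is a minimal, simplified model of a process-wide "context environment" slot that several submitting threads write and then read back. It is not Flink's actual code: the class ContextRaceSketch, the fields sharedFactory and localFactory, and the method submitAll are hypothetical names made up for this sketch. A latch forces the unlucky interleaving (every thread installs its factory before any thread reads one), so the sketch shows all threads resolving the same jobID, whereas in the incident above only one jobID was duplicated.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.function.Supplier;

/** Simplified model of a shared context-environment slot; NOT Flink's actual code. */
public class ContextRaceSketch {

    // Hypothetical stand-in for a statically held environment factory.
    private static volatile Supplier<String> sharedFactory;

    // Thread-confined alternative: each submitting thread only sees its own factory.
    private static final ThreadLocal<Supplier<String>> localFactory = new ThreadLocal<>();

    /** Runs one simulated submission per jobID and returns the jobID each one executed with. */
    static List<String> submitAll(List<String> jobIds, boolean useThreadLocal) {
        List<String> executed = new CopyOnWriteArrayList<>();
        // Force the unlucky interleaving: all installs happen before any read.
        CountDownLatch allInstalled = new CountDownLatch(jobIds.size());
        Thread[] threads = new Thread[jobIds.size()];
        for (int i = 0; i < threads.length; i++) {
            String jobId = jobIds.get(i);
            threads[i] = new Thread(() -> {
                Supplier<String> factory = () -> jobId;
                if (useThreadLocal) {
                    localFactory.set(factory);
                } else {
                    sharedFactory = factory; // overwrites whatever another thread installed
                }
                allInstalled.countDown();
                try {
                    allInstalled.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                Supplier<String> resolved = useThreadLocal ? localFactory.get() : sharedFactory;
                executed.add(resolved.get());
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for submissions", e);
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("job-a", "job-b", "job-c", "job-d");
        System.out.println("shared static: " + submitAll(ids, false));
        System.out.println("thread-local:  " + submitAll(ids, true));
    }
}
```

      With the shared static slot, every simulated submission resolves whichever factory was written last, so the wrong job is executed under a duplicated jobID. With the thread-confined slot, each submission sees only the factory it installed itself.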

       

            People

              chesnay Chesnay Schepler
              fabiowanner Fabio Wanner