SPARK-27228: Spark long delay on close, possible problem with killing executors

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Component/s: Block Manager, Spark Core

    Description

      When using dynamic allocation, after all jobs finish, Spark delays for several minutes before it finally finishes. The log suggests that executors are not cleaned up properly.

      See the attached log.

      Attachments

        1. log.html
          57 kB
          Lukas Waldmann

        Activity

          sandeep.katta2007 Sandeep Katta added a comment -

          can you upload the log file?

          luky Lukas Waldmann added a comment -

          log file added

          luky Lukas Waldmann added a comment -

          Startup parameters:

          spark-submit --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.enabled=true --conf spark.driver.maxResultSize=4g --executor-memory 4g --driver-memory 8g --master yarn --deploy-mode cluster
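
          For reference, a minimal sketch of the same settings expressed on the SparkSession builder (an illustration only; the master, deploy mode, and memory sizes normally have to be supplied by the spark-submit launcher rather than in application code):

          // import org.apache.spark.sql.SparkSession;

          // Hypothetical sketch mirroring the spark-submit flags above.
          SparkSession spark = SparkSession.builder()
              .appName("dynamic-allocation-repro")                 // illustrative name
              .config("spark.shuffle.service.enabled", "true")
              .config("spark.dynamicAllocation.enabled", "true")
              .config("spark.driver.maxResultSize", "4g")
              .config("spark.executor.memory", "4g")               // --executor-memory 4g
              .config("spark.driver.memory", "8g")                 // --driver-memory 8g (effective only if set before launch)
              .getOrCreate();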

          gurwls223 Hyukjin Kwon added a comment - - edited

          Please don't just copy and paste the log without any further analysis (see https://spark.apache.org/contributing.html) if you're not going to investigate further and submit a PR. I am leaving this resolved, but please reopen when the problem is narrowed down properly or a PR is ready.

          luky Lukas Waldmann added a comment -

          Executor management seems to behave strangely.

          After calling spark.stop(), see this:

          19/03/21 09:51:39 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 332.
          19/03/21 09:51:39 INFO DAGScheduler: Executor lost: 332 (epoch 446)
          19/03/21 09:51:39 INFO BlockManagerMasterEndpoint: Trying to remove executor 332 from BlockManagerMaster.
          19/03/21 09:51:39 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(332, data-10.bdp.gin.merck.com, 38713, None)
          19/03/21 09:51:39 INFO BlockManagerMaster: Removed 332 successfully in removeExecutor

          and a few minutes later:

          19/03/21 09:54:26 WARN HeartbeatReceiver: Removing executor 332 with no recent heartbeats: 173942 ms exceeds timeout 120000 ms
          19/03/21 09:54:26 ERROR YarnClusterScheduler: Lost an executor 332 (already removed): Executor heartbeat timed out after 173942 ms

          19/03/21 09:54:26 INFO YarnClusterSchedulerBackend: Requesting to kill executor(s) 332
          19/03/21 09:54:26 WARN YarnClusterSchedulerBackend: Executor to kill 332 does not exist!
          19/03/21 09:54:26 INFO YarnClusterSchedulerBackend: Actual list of executor(s) to be killed is
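
          For context, the 120000 ms in the warning is the driver-side executor heartbeat timeout, which by default falls back to spark.network.timeout (120s); executors report in every spark.executor.heartbeatInterval (10s by default). A small diagnostic sketch for confirming the effective values on the running session is shown below (these are standard Spark configuration keys, but whether raising them changes the shutdown delay reported here is untested):

          // Hypothetical diagnostic: print the settings that govern the
          // "no recent heartbeats" warning above (defaults given as fallbacks).
          System.out.println("spark.network.timeout = "
              + spark.conf().get("spark.network.timeout", "120s"));
          System.out.println("spark.executor.heartbeatInterval = "
              + spark.conf().get("spark.executor.heartbeatInterval", "10s"));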


          luky Lukas Waldmann added a comment -

          Added initial issue investigation

          gurwls223 Hyukjin Kwon added a comment - - edited

          How did you reproduce this problem? Please include the code needed to reproduce it; otherwise, other people can't start investigating.

          luky Lukas Waldmann added a comment -

          That's a good question.

          The code runs up to several hundred SQL queries with different parameters and unions the results before writing them to a Hive table.

          The inputs are Hive tables with up to several hundred million rows.

          The code looks something like this:

          // Imports assumed by this snippet:
          //   import java.io.IOException;
          //   import java.io.InputStream;
          //   import java.util.LinkedHashMap;
          //   import java.util.List;
          //   import java.util.Map;
          //   import java.util.logging.Level;
          //   import java.util.logging.Logger;
          //   import org.apache.commons.io.IOUtils;
          //   import org.apache.spark.sql.Dataset;
          //   import org.apache.spark.sql.SaveMode;
          // spark, table, PRODUCT and PROTOCOL are members of the enclosing class (not shown here).
          void process(String dbName, String environment) {
              // For every metadata item, run its SQL snippet and union the results per product
              List<Metadata> mds = ...;
              Map<String, Dataset> res = new LinkedHashMap<>();
              mds.stream()
                  .forEach(md -> {
                      try (InputStream is = getClass().getResourceAsStream("/" + md.query_id)) {
                          String snippet = IOUtils.toString(is);
                          Dataset ds = spark.sql(snippet);
                          String key = md.product;
                          res.put(key, res.get(key) == null ? ds : ds.union(res.get(key)));
                      } catch (IOException ex) {
                          Logger.getLogger(SparkMainApp.class.getName()).log(Level.SEVERE, null, ex);
                      }
                  });

              // Write each unioned result into the target Hive table
              String name = dbName + "." + table;
              res.values().stream()
                  .forEach(result ->
                      result.repartition(result.col(PRODUCT.toString()), result.col(PROTOCOL.toString()))
                          .write()
                          .mode(SaveMode.Overwrite)
                          .insertInto(name));
          }
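
          For completeness, a hypothetical driver outline (class and argument names are illustrative, not taken from the reporter's application) showing where the reported behaviour occurs: all writes finish inside process(), and the multi-minute delay then happens on spark.stop():

          public static void main(String[] args) {
              SparkSession spark = SparkSession.builder()
                  .enableHiveSupport()   // process() reads and writes Hive tables
                  .getOrCreate();

              // Illustrative arguments; the app is assumed to use the same active session internally.
              new SparkMainApp().process("my_db", "prod");

              spark.stop();   // with dynamic allocation enabled, shutdown takes several minutes here
          }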
          
          srowen Sean R. Owen added a comment -

          (Please don't reopen unless the info or discussion has materially changed.)
          I'm also not sure what's going on here, but 2.3.0 is fairly old now, and I can imagine a number of fixes that could be relevant since then. You'd need to reproduce against something newer, as 2.3.x will soon be EOL. I don't think anyone would investigate further as is.
          It could also be an env problem between Spark and YARN, etc.

          luky Lukas Waldmann added a comment -

          I understand.

          Unfortunately, things being as they are, I am afraid I will be stuck with 2.3 for quite some time. The upgrade cycle of our cluster is measured in years rather than months.


          People

            Assignee: Unassigned
            Reporter: Lukas Waldmann
            Votes: 1
            Watchers: 3
