SPARK-17666

take() or isEmpty() on dataset leaks s3a connections


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.1, 2.1.0
    • Component/s: SQL
    • Labels: None
    • Environment: ubuntu/centos, java 7, java 8, spark 2.0, java api

    Description

      I'm experiencing problems with s3a when working with Parquet files through the Dataset API.
      The symptom of the problem is tasks failing with:

      Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
      	at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
      	at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
      	at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
      

      Checking the CoarseGrainedExecutorBackend process with lsof showed many sockets stuck in the CLOSE_WAIT state.
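
      Not a fix, but a hedged mitigation sketch: since the error is connection pool exhaustion, enlarging the s3a pool (the hadoop-aws option fs.s3a.connection.maximum, passed via Spark's spark.hadoop.* prefix) only postpones the timeout; the value below is illustrative.

      import org.apache.spark.SparkConf;

      SparkConf sparkConf = new SparkConf();
      // hadoop-aws option controlling the size of the S3A HTTP connection pool;
      // raising it only delays the ConnectionPoolTimeoutException, because the
      // leaked sockets still accumulate in CLOSE_WAIT
      sparkConf.set("spark.hadoop.fs.s3a.connection.maximum", "200");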

      Reproduction of the problem:

      package com.test;
      
      import org.apache.spark.SparkConf;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      
      public class ConnectionLeakTest {
      	public static void main(String[] args) {
      		SparkConf sparkConf = new SparkConf();
      
      		sparkConf.setMaster("local[*]");
      		sparkConf.setAppName("Test");
      		sparkConf.set("spark.local.dir", "/tmp/spark");
      		sparkConf.set("spark.sql.shuffle.partitions", "2");
      
      		SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
      		// set your s3a credentials for the bucket here
      
      		for (int i = 0; i < 100; i++) {
      			// the path contains multiple snappy-compressed parquet files
      			Dataset<Row> df = session
      					.sqlContext()
      					.read()
      					.parquet("s3a://test/*");
      			// same problem with takeAsList().isEmpty()
      			if (df.rdd().isEmpty()) {
      				System.out.println("Yes");
      			} else {
      				System.out.println("No");
      			}
      		}
      		System.out.println("Done");
      	}
      }
      

      So while the program runs, you can find the PID with jps and run lsof -p <pid> | grep https,
      and you'll see a constant growth of sockets in CLOSE_WAIT.

      Our way to bypass the problem is to use count() == 0 instead of isEmpty() (see the sketch below).
      In addition, we've seen that df.dropDuplicates("a","b").rdd().isEmpty() doesn't trigger the problem either.
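
      A minimal sketch of the count()-based workaround (the helper name isEmptyWithoutLeak is just illustrative), reusing the Dataset and Row imports from the reproduction class above:

      // Workaround sketch: count() scans the whole dataset but does not leave
      // s3a sockets behind in CLOSE_WAIT, unlike rdd().isEmpty() or
      // takeAsList().isEmpty()
      private static boolean isEmptyWithoutLeak(Dataset<Row> df) {
      	return df.count() == 0;
      }
      
      // usage inside the loop of ConnectionLeakTest:
      // if (isEmptyWithoutLeak(df)) { System.out.println("Yes"); }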

    People

      Assignee: Josh Rosen (joshrosen)
      Reporter: Igor Berman (igor.berman)
      Votes: 0
      Watchers: 5
