Details
Description
I'm experiencing problems with s3a when working with parquet files through the Dataset API.
The symptom of the problem is tasks failing with:
Caused by: org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:232)
at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
Checking the CoarseGrainedExecutorBackend process with lsof showed many sockets stuck in CLOSE_WAIT state.
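For completeness: enlarging the S3A connection pool presumably only delays the timeout, since the leaked connections never seem to be returned to the pool. A minimal sketch of that tuning (assuming the usual spark.hadoop.* passthrough into the Hadoop configuration; the pool size value is arbitrary):

// Hedged mitigation sketch, not a fix: a larger pool just takes longer to
// exhaust when connections leak. spark.hadoop.* keys are copied into the
// Hadoop Configuration, so this sets fs.s3a.connection.maximum for S3A.
SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("Test")
        .set("spark.hadoop.fs.s3a.connection.maximum", "100");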
Reproduction of the problem:
package com.test;

import java.text.ParseException;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ConnectionLeakTest {
    public static void main(String[] args) throws ParseException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[*]");
        sparkConf.setAppName("Test");
        sparkConf.set("spark.local.dir", "/tmp/spark");
        sparkConf.set("spark.sql.shuffle.partitions", "2");

        SparkSession session = SparkSession.builder().config(sparkConf).getOrCreate();
        // set your credentials to your bucket

        for (int i = 0; i < 100; i++) {
            Dataset<Row> df = session
                    .sqlContext()
                    .read()
                    .parquet("s3a://test/*"); // contains multiple snappy-compressed parquet files
            if (df.rdd().isEmpty()) { // same problem with takeAsList().isEmpty()
                System.out.println("Yes");
            } else {
                System.out.println("No");
            }
        }
        System.out.println("Done");
    }
}
While the program runs, you can find the pid with jps and run lsof -p <pid> | grep https;
you'll see a constant growth of sockets in CLOSE_WAIT.
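If it helps, the same check can be scripted from inside the JVM. A rough, purely illustrative sketch (assuming a Unix-like host with lsof on the PATH and Java 9+ for ProcessHandle; not part of the reproduction):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

// Illustrative only: counts this JVM's sockets stuck in CLOSE_WAIT by
// shelling out to lsof, mirroring the manual jps + lsof check above.
public final class CloseWaitCounter {
    public static long closeWaitCount() throws Exception {
        long pid = ProcessHandle.current().pid(); // Java 9+; on Java 8 parse RuntimeMXBean#getName()
        Process lsof = new ProcessBuilder("lsof", "-p", Long.toString(pid)).start();
        try (BufferedReader out = new BufferedReader(
                new InputStreamReader(lsof.getInputStream(), StandardCharsets.UTF_8))) {
            return out.lines().filter(line -> line.contains("CLOSE_WAIT")).count();
        }
    }
}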
Our way to bypass the problem is to use count() == 0 instead of isEmpty().
In addition, we've seen that df.dropDuplicates("a", "b").rdd().isEmpty() does not trigger the problem either.
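Concretely, the workaround just swaps the emptiness check inside the loop of the reproduction above (sketch only; "session" and the s3a path are the same as there, and the dropDuplicates column names are placeholders):

// Workaround applied to the loop in the reproduction above
Dataset<Row> df = session.sqlContext().read().parquet("s3a://test/*");

// count() == 0 instead of rdd().isEmpty(): no CLOSE_WAIT growth observed
if (df.count() == 0) {
    System.out.println("Yes");
} else {
    System.out.println("No");
}

// Also observed not to leak; "a" and "b" are placeholder column names
boolean empty = df.dropDuplicates(new String[] {"a", "b"}).rdd().isEmpty();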
Attachments
Issue Links
- relates to: SPARK-17740 Spark tests should mock / interpose HDFS to ensure that streams are closed (Resolved)