Details
Type: Bug
Status: Closed
Priority: Blocker
Resolution: Duplicate
Affects Version/s: 1.5.1
Fix Version/s: None
Environment:
Stand-alone cluster of five servers (happens as well in local mode). sqlContext is an instance of HiveContext (happens as well with SQLContext).
No special options other than driver memory and executor memory.
Parquet partitions are 512 where there are 160 cores. Happens as well with other partitioning.
Data is nearly 2 billion rows.
Description
ONLY HAPPENS WHEN PERSIST() IS CALLED
val data = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt")
data.groupBy("vintages").count.select("vintages").filter("vintages = '2007-01-01'").first
>>> res9: org.apache.spark.sql.Row = [2007-01-01]
data.groupBy("vintages").count.persist.select("vintages").filter("vintages = '2007-01-01'").first
>>> throws an exception from an empty iterator (the filter no longer matches any rows)
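For anyone without the original data, here is a minimal self-contained sketch of the same call pattern. The toy data and values are hypothetical (the report uses nearly 2 billion rows, so this small input is not guaranteed to trigger the bug), but it shows the exact sequence:
// Self-contained sketch (hypothetical toy data): group on a string column,
// persist the aggregate, then filter on the grouping key, as above.
import sqlContext.implicits._
val toy = Seq("2007-01-01", "2007-01-01", "2008-05-01").toDF("vintages")
val grouped = toy.groupBy("vintages").count
grouped.select("vintages").filter("vintages = '2007-01-01'").first          // works
grouped.persist.select("vintages").filter("vintages = '2007-01-01'").first  // fails as described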
This does not happen when using a field of another type, e.g. an integer field (IntegerType):
data.groupBy("yyyymm").count.persist.select("yyyymm").filter("yyyymm = 200805").first
>>> res13: org.apache.spark.sql.Row = [200805]
NOTE: I have no idea whether the KRYO serializer was used when this parquet was stored.
NOTE2: If persist is called after the filtering, it works fine, but this is not a good enough workaround since any filter operation applied afterwards will break the results.
NOTE3: I have reproduced the issue with several different datasets.
NOTE4: The only real workaround is to store the grouped DataFrame in a database (or other external store) and reload it as a new DataFrame; see the sketch below.
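For reference, a minimal sketch of that workaround, assuming parquet as the external store; the output path is hypothetical and any durable store would do:
// Workaround sketch (hypothetical output path): materialize the aggregate
// outside the in-memory cache, then reload it so later filters see
// consistent data instead of reading from the broken cached scan.
val counts = sqlContext.read.parquet("/var/data/Saif/ppnr_pqt").groupBy("vintages").count
counts.write.parquet("/tmp/vintages_counts_pqt")  // hypothetical location
val reloaded = sqlContext.read.parquet("/tmp/vintages_counts_pqt")
reloaded.select("vintages").filter("vintages = '2007-01-01'").first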
Querying the raw data works fine:
data.select("vintages").filter("vintages = '2007-01-01'").first
>>> res4: org.apache.spark.sql.Row = [2007-01-01]
Originally, the issue appeared in a larger aggregation operation: the data was inconsistent, returning different results on every call. Reducing the operation step by step, I narrowed it down to this issue. In any case, the original operation was:
val data = sqlContext.read.parquet("/var/Saif/data_pqt")
val res = data.groupBy("product", "band", "age", "vint", "mb", "yyyymm").agg(
  count($"account_id").as("N"),
  sum($"balance").as("balance_eom"),
  sum($"balancec").as("balance_eoc"),
  sum($"spend").as("spend"),
  sum($"payment").as("payment"),
  sum($"feoc").as("feoc"),
  sum($"cfintbal").as("cfintbal"),
  count($"newacct" === 1).as("newacct")
).persist()
val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
z.length
>>> res0: Int = 102
res.unpersist()
val z = res.select("vint", "yyyymm").filter("vint = '2007-01-01'").select("yyyymm").distinct.collect
z.length
>>> res1: Int = 103
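The discrepancy above (102 with the cache, 103 without) can be checked mechanically. A sketch under the same session and assumptions, running the same distinct count with and without the cache and comparing:
// Sketch: the same query should return the same count whether or not the
// aggregate is cached; here the cached and uncached runs disagree.
def distinctMonths(df: org.apache.spark.sql.DataFrame): Long =
  df.filter("vint = '2007-01-01'").select("yyyymm").distinct.count
val cached = distinctMonths(res.persist())
res.unpersist()
val uncached = distinctMonths(res)
assert(cached == uncached, s"cache changed the result: $cached vs $uncached")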
Issue Links
is duplicated by:
SPARK-10859 Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly (Resolved)