Details
- Bug
- Status: Resolved
- Major
- Resolution: Duplicate
- 1.6.0
- None
- None
Description
SPARK-9241 updated the optimizer to rewrite count distinct aggregates. The rewritten plan uses a plain count that is no longer marked distinct, because duplicates are eliminated further down in the plan. As a side effect, the generated column name changed from COUNT(DISTINCT a) to count(a):
Spark 1.5.2
scala> Seq((1, "s")).toDF("a", "b").agg(countDistinct("a"))
res0: org.apache.spark.sql.DataFrame = [COUNT(DISTINCT a): bigint]

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(a#7),mode=Complete,isDistinct=true)], output=[COUNT(DISTINCT a)#9L])
 TungstenAggregate(key=[a#7], functions=[], output=[a#7])
  TungstenExchange SinglePartition
   TungstenAggregate(key=[a#7], functions=[], output=[a#7])
    LocalTableScan [a#7], [[1]]
Spark 1.6.0
scala> Seq((1, "s")).toDF("a", "b").agg(countDistinct("a"))
res0: org.apache.spark.sql.DataFrame = [count(a): bigint]

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(if ((gid#35 = 1)) a#36 else null),mode=Final,isDistinct=false)], output=[count(a)#31L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(if ((gid#35 = 1)) a#36 else null),mode=Partial,isDistinct=false)], output=[count#39L])
      +- TungstenAggregate(key=[a#36,gid#35], functions=[], output=[a#36,gid#35])
         +- TungstenExchange hashpartitioning(a#36,gid#35,500), None
            +- TungstenAggregate(key=[a#36,gid#35], functions=[], output=[a#36,gid#35])
               +- Expand [List(a#29, 1)], [a#36,gid#35]
                  +- LocalTableScan [a#29], [[1]]
This has broken jobs that relied on the generated name, for example jobs that call withColumnRenamed("COUNT(DISTINCT a)", "c").
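One possible way for affected jobs to avoid depending on the generated name is to alias the aggregate explicitly. The snippet below is a minimal sketch, not something proposed in the original report; it assumes a spark-shell session where the SQL implicits providing toDF are already in scope, and the names df and counted are chosen here for illustration only.

```scala
// Assumption: run inside spark-shell (Spark 1.5.x or 1.6.x), where the
// implicits that provide toDF on a Seq are already imported.
import org.apache.spark.sql.functions.countDistinct

val df = Seq((1, "s")).toDF("a", "b")

// Alias the aggregate directly instead of renaming the generated column,
// so the output name does not depend on how the optimizer plans the query.
val counted = df.agg(countDistinct("a").as("c"))

// Print the resulting column names for inspection.
println(counted.columns.mkString(", "))
```

With an explicit alias, the column name is set by the job itself rather than by the planner, so the same code should behave consistently on both versions.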
I think that the previous generated name is correct, even though the plan has changed.
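To illustrate why the old name still describes the result, here is a rough sketch in plain Scala collections (not Spark code) of the shape of the 1.6.0 plan: tag each row with a group id, de-duplicate on (a, gid), then run an ordinary count that skips nulls. The sample values and variable names are assumptions made up for this illustration.

```scala
// Plain Scala collections, not Spark. None stands in for a SQL NULL.
val rows: Seq[Option[Int]] = Seq(Some(1), Some(1), Some(2), None)

// Direct form: count the distinct non-null values.
val direct = rows.flatten.distinct.size

// Rewritten form, mirroring the plan above:
// 1. Expand: tag every row with gid = 1.
val expanded = rows.map(a => (a, 1))
// 2. Aggregate on (a, gid) with no functions: this only removes duplicates.
val deduped = expanded.distinct
// 3. Non-distinct count of "if (gid = 1) a else null", which skips nulls.
val rewritten = deduped.count { case (a, gid) => gid == 1 && a.isDefined }

// Both forms produce the same number.
assert(direct == rewritten)
```

The final count is not marked distinct in the plan, but the value it computes is still the number of distinct values of a.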
marmbrus, you may want to take a look. It looks like you reviewed SPARK-9241 and have some context here.
Issue Links
- duplicates SPARK-12593 Convert basic resolved logical plans back to SQL query strings (Resolved)