Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.2.0
- Labels: None
- Environment: OS: Ubuntu 20.04.3 LTS; Scala version: 2.12.12
Description
NOTE: this bug is only present in version 3.2.0. Downgrading to 3.1.2 solves the problem.
Prerequisites to reproduce the bug
- use Spark version 3.2.0
- create a DataFrame with an array column whose struct elements contain a nested array field
- apply a limit to the DataFrame
- transform the outer array, renaming one of its fields
- transform the inner array as well, which requires two getField calls in sequence
Example that reproduces the bug
This is a minimal example (as minimal as I could make it) to reproduce the bug:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}

def makeInput(): DataFrame = {
  val innerElement1 = Row(3, 3.12)
  val innerElement2 = Row(4, 2.1)
  val innerElement3 = Row(1, 985.2)
  val innerElement4 = Row(10, 757548.0)
  val innerElement5 = Row(1223, 0.665)

  val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
  val outerElement2 = Row(2, Row(List(innerElement3)))
  val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))

  val data = Seq(
    Row("row1", List(outerElement1)),
    Row("row2", List(outerElement2, outerElement3)),
  )

  val schema = new StructType()
    .add("name", StringType)
    .add("outer_array", ArrayType(new StructType()
      .add("id", IntegerType)
      .add("inner_array_struct", new StructType()
        .add("inner_array", ArrayType(new StructType()
          .add("id", IntegerType)
          .add("value", DoubleType)
        ))
      )
    ))

  spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
}

// val df = makeInput()
val df = makeInput().limit(2)
// val df = makeInput().limit(2).cache()

val res = df.withColumn("extracted", transform(
  col("outer_array"),
  c1 => {
    struct(
      c1.getField("id").alias("outer_id"),
      transform(
        c1.getField("inner_array_struct").getField("inner_array"),
        c2 => {
          struct(
            c2.getField("value").alias("inner_value")
          )
        }
      )
    )
  }
))

res.printSchema()
res.show(false)
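The example assumes an existing spark session, as provided automatically by spark-shell. If you run it as a standalone application instead, a minimal session setup could look like the sketch below (the app name and local master are arbitrary illustrative choices, not part of the report):

import org.apache.spark.sql.SparkSession

// Minimal SparkSession for running the example outside spark-shell.
// The app name and master are placeholder values.
val spark: SparkSession = SparkSession.builder()
  .appName("nested-array-transform-repro")
  .master("local[*]")
  .getOrCreate()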
Executing the example code
When executed as-is, the code fails at the show statement with:
java.lang.IllegalStateException: Couldn't find _extract_inner_array#23 in [name#2,outer_array#3]
However, if the limit is not applied, or if the DataFrame is cached after the limit, everything works (you can uncomment the corresponding lines in the example to try it).
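For reference, the two working variants correspond to the commented-out lines in the example; a minimal sketch of both, reusing the same makeInput() (the variable names here are only illustrative):

// Variant 1: no limit applied; the transform and show succeed.
val dfWithoutLimit = makeInput()

// Variant 2: cache the DataFrame after the limit; as described above,
// this also avoids the IllegalStateException.
val dfCachedAfterLimit = makeInput().limit(2).cache()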