Spark / SPARK-37855

IllegalStateException when transforming an array inside a nested struct


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.2.0
    • Fix Version/s: 3.2.1, 3.3.0
    • Component/s: Spark Core
    • Labels: None
    • Environment:
      OS: Ubuntu 20.04.3 LTS
      Scala version: 2.12.12

    Description

      NOTE: this bug is only present in version 3.2.0. Downgrading to 3.1.2 solves the problem.

      Prerequisites to reproduce the bug

      1. use Spark version 3.2.0
      2. create a DataFrame with an array column whose struct elements contain a nested struct holding another array
      3. apply a limit to the DataFrame
      4. transform the outer array, renaming one of its fields
      5. transform the inner array too, which requires two getField calls in sequence

      Example that reproduces the bug

      This is a minimal example (as minimal as I could make it) to reproduce the bug:

      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{DataFrame, Row}
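      // assumes an interactive session (e.g. spark-shell), where the `spark` SparkSession is already in scope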
      
      def makeInput(): DataFrame = {
          val innerElement1 = Row(3, 3.12)
          val innerElement2 = Row(4, 2.1)
          val innerElement3 = Row(1, 985.2)
          val innerElement4 = Row(10, 757548.0)
          val innerElement5 = Row(1223, 0.665)
      
          val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
          val outerElement2 = Row(2, Row(List(innerElement3)))
          val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))
      
          val data = Seq(
              Row("row1", List(outerElement1)),
              Row("row2", List(outerElement2, outerElement3)),
          )
      
          val schema = new StructType()
              .add("name", StringType)
              .add("outer_array", ArrayType(new StructType()
                  .add("id", IntegerType)
                  .add("inner_array_struct", new StructType()
                      .add("inner_array", ArrayType(new StructType()
                          .add("id", IntegerType)
                          .add("value", DoubleType)
                      ))
                  )
              ))
      
          spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
      }
      
      // val df = makeInput()
      val df = makeInput().limit(2)
      // val df = makeInput().limit(2).cache()
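      // Only the active .limit(2) line above triggers the failure; the two commented-out
      // variants (no limit, or limit followed by cache) run without errors.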
      
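      // The outer transform renames "id" to "outer_id"; the nested transform needs two
      // chained getField calls (inner_array_struct, then inner_array), which is the
      // combination that triggers the exception.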
      val res = df.withColumn("extracted", transform(
          col("outer_array"),
          c1 => {
              struct(
                  c1.getField("id").alias("outer_id"),
                  transform(
                      c1.getField("inner_array_struct").getField("inner_array"),
                      c2 => {
                          struct(
                              c2.getField("value").alias("inner_value")
                          )
                      }
                  )
              )
          }
      ))
      
      res.printSchema()
      res.show(false)
      

      Executing the example code

      When executed as-is, the code fails on the show statement with:

      java.lang.IllegalStateException: Couldn't find _extract_inner_array#23 in [name#2,outer_array#3]
      

      However, if the limit is not applied, or if the DataFrame is cached after the limit, everything works (you can swap the active line for one of the commented-out variants in the example to try it).
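
      For reference, a minimal sketch of the two working variants described above (the variable names are illustrative; both reuse the makeInput() helper and the same withColumn/transform expression from the example):

      // Variant 1: no limit applied (works)
      val dfNoLimit = makeInput()

      // Variant 2: limit followed by cache (also works)
      val dfCachedAfterLimit = makeInput().limit(2).cache()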

    People

      Assignee: XiDuo You (ulysses)
      Reporter: G Muciaccia (gmuciaccia)
      Votes: 0
      Watchers: 5
