  Spark / SPARK-27798

ConvertToLocalRelation should tolerate expression reusing output object


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.4.3
    • Fix Version/s: 2.3.4, 2.4.4, 3.0.0
    • Component/s: SQL

    Description

      Steps to reproduce:

      Create a local Dataset (with at least two distinct rows) containing a binary Avro field. Use the from_avro function to deserialize the binary into another column. Observe that all of the rows incorrectly end up with the same deserialized value.

      Here's a concrete example (using Spark 2.4.3). All it does is convert a list of TestPayload objects into Avro binary using the defined schema, then try to deserialize them with from_avro using that same schema:

      import org.apache.avro.Schema
      import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
      import org.apache.avro.io.EncoderFactory
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.avro.from_avro
      import org.apache.spark.sql.functions.col
      
      import java.io.ByteArrayOutputStream
      
      object TestApp extends App {
        // Payload container
        case class TestEvent(payload: Array[Byte])
        // Deserialized Payload
        case class TestPayload(message: String)
        // Schema for Payload
        val simpleSchema =
          """
            |{
            |"type": "record",
            |"name" : "Payload",
            |"fields" : [ {"name" : "message", "type" : [ "string", "null" ] } ]
            |}
          """.stripMargin
        // Convert TestPayload into avro binary
        def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
          val schema = new Schema.Parser().parse(avsc)
          val out = new ByteArrayOutputStream()
          val writer = new GenericDatumWriter[GenericRecord](schema)
          val encoder = EncoderFactory.get().binaryEncoder(out, null)
          val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
          writer.write(rootRecord, encoder)
          encoder.flush()
          out.toByteArray
        }
      
        val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        List(
          TestPayload("one"),
          TestPayload("two"),
          TestPayload("three"),
          TestPayload("four")
        ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
          .toDS()
          .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
          .show(truncate = false)
      }
      

      And here is what this program outputs:

      +----------------------+-------------------+
      |payload               |deserializedPayload|
      +----------------------+-------------------+
      |[00 06 6F 6E 65]      |[four]             |
      |[00 06 74 77 6F]      |[four]             |
      |[00 0A 74 68 72 65 65]|[four]             |
      |[00 08 66 6F 75 72]   |[four]             |
      +----------------------+-------------------+

      Here we can see that the Avro binary is correctly generated, but the deserialized value of every row is a copy of the last row's. I have not yet verified whether this is an issue in cluster mode as well.
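
      Until this is fixed, one possible workaround (an assumption on my part; I have only tried it locally) is to exclude the ConvertToLocalRelation optimizer rule via spark.sql.optimizer.excludedRules, so the local data is not pre-evaluated on the driver:

      val spark: SparkSession = SparkSession.builder()
        .master("local[*]")
        // Excluding the rule keeps the local Dataset from being collapsed into a
        // pre-computed LocalRelation, so from_avro is evaluated per row at runtime.
        .config("spark.sql.optimizer.excludedRules",
          "org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation")
        .getOrCreate()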

       

      I dug a bit further into the code, and it seems the reuse of result in AvroDataToCatalyst is overwriting the decoded values of previous rows. I set a breakpoint in LocalRelation, and the entries of the data sequence all point to the same object in memory, so a mutation through one reference mutates every row.
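
      For reference, here is a rough, self-contained sketch of the aliasing pattern I believe is happening (the names ReusingDeserializer and AliasingDemo are made up for illustration; this is not Spark's actual code): an expression reuses one mutable output buffer, and the caller stores the returned object for every row without copying it.

      // Hypothetical stand-in for an expression that, like AvroDataToCatalyst,
      // reuses a single mutable `result` object across evaluations.
      class ReusingDeserializer {
        private val result = new Array[Any](1)   // one shared, mutable buffer
        def eval(input: String): Array[Any] = {
          result(0) = input                      // mutate in place
          result                                 // always return the SAME object
        }
      }

      object AliasingDemo extends App {
        val expr   = new ReusingDeserializer
        val inputs = Seq("one", "two", "three", "four")

        // Storing the returned object directly: every element aliases the same
        // buffer, so all "rows" end up showing the last value.
        val broken = inputs.map(expr.eval)
        println(broken.map(_.mkString))          // List(four, four, four, four)

        // Copying each result before storing it keeps the rows independent; this
        // is essentially the defensive copy the optimizer rule needs to make.
        val fixed = inputs.map(s => expr.eval(s).clone())
        println(fixed.map(_.mkString))           // List(one, two, three, four)
      }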


People

    Assignee: L. C. Hsieh (viirya)
    Reporter: Yosuke Mori (ykmori15)
    Votes: 1
    Watchers: 4

Dates

    Created:
    Updated:
    Resolved: