Details
- Type: Bug
- Priority: Blocker
- Status: Resolved
- Resolution: Fixed
- Affects Version/s: 1.6.3, 2.0.2, 2.1.3, 2.2.3, 2.4.3
Description
Steps to reproduce:
1. Create a local Dataset (at least two distinct rows) with a binary Avro field.
2. Use the from_avro function to deserialize the binary into another column.
3. Observe that all of the rows incorrectly show the same deserialized value.
Here's a concrete example (using Spark 2.4.3). All it does is convert a list of TestPayload objects into binary using the defined Avro schema, then try to deserialize them with from_avro using that same schema:
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord, GenericRecordBuilder}
import org.apache.avro.io.EncoderFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

object TestApp extends App {
  // Payload container
  case class TestEvent(payload: Array[Byte])

  // Deserialized payload
  case class TestPayload(message: String)

  // Avro schema for the payload
  val simpleSchema =
    """
      |{
      |  "type": "record",
      |  "name": "Payload",
      |  "fields": [ { "name": "message", "type": [ "string", "null" ] } ]
      |}
    """.stripMargin

  // Convert a TestPayload into Avro binary
  def generateSimpleSchemaBinary(record: TestPayload, avsc: String): Array[Byte] = {
    val schema  = new Schema.Parser().parse(avsc)
    val out     = new ByteArrayOutputStream()
    val writer  = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    val rootRecord = new GenericRecordBuilder(schema).set("message", record.message).build()
    writer.write(rootRecord, encoder)
    encoder.flush()
    out.toByteArray
  }

  val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  List(
    TestPayload("one"),
    TestPayload("two"),
    TestPayload("three"),
    TestPayload("four")
  ).map(payload => TestEvent(generateSimpleSchemaBinary(payload, simpleSchema)))
    .toDS()
    .withColumn("deserializedPayload", from_avro(col("payload"), simpleSchema))
    .show(truncate = false)
}
And here is what this program outputs:
+----------------------+-------------------+
|payload               |deserializedPayload|
+----------------------+-------------------+
|[00 06 6F 6E 65]      |[four]             |
|[00 06 74 77 6F]      |[four]             |
|[00 0A 74 68 72 65 65]|[four]             |
|[00 08 66 6F 75 72]   |[four]             |
+----------------------+-------------------+
Here, we can see that the Avro binary is generated correctly, but every deserialized value is a copy of the last row. I have not yet verified whether this is also an issue in cluster mode.
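To confirm that the serialized column really holds distinct values, the payload bytes can be decoded by hand. Per the Avro binary encoding, the first byte is the zigzag-encoded union branch (0 selects "string" in the ["string", "null"] union), the next byte is the zigzag-encoded string length, and the rest are the UTF-8 bytes. A minimal sketch (the helper name decodePayload is mine, not from Spark or Avro):

```scala
// Hand-decode the Avro binary for the simple ["string", "null"] union field.
// Assumes a short string (length fits in one varint byte) and branch 0.
def decodePayload(bytes: Array[Byte]): String = {
  require(bytes(0) == 0x00)          // union branch 0 -> "string"
  val len = (bytes(1) & 0xFF) >>> 1  // zigzag decode of a small non-negative length
  new String(bytes.slice(2, 2 + len), "UTF-8")
}

// First row from the output table: [00 06 6F 6E 65]
val first = decodePayload(Array(0x00, 0x06, 0x6F, 0x6E, 0x65).map(_.toByte))  // "one"
```

So the payload column holds "one", "two", "three", and "four" as expected; only the deserialized column is wrong.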
I dug into the code a bit more, and it appears that the reuse of result in AvroDataToCatalyst is overwriting the decoded values of previous rows. I set a breakpoint in LocalRelation, and the elements of the data sequence all seem to point to the same address in memory, so a mutation through one reference mutates all of them.
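The suspected failure mode can be reproduced in plain Scala, without Spark: if a single mutable buffer is reused for every row and the collected sequence stores references to that one buffer instead of copies, every element ends up reporting the last value written. A minimal sketch (MutableRow stands in for the reused result object; it is illustrative, not Spark's actual class):

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for a row object that is mutated in place between rows.
final class MutableRow { var value: String = "" }

val result = new MutableRow            // one buffer, reused for every "row"
val rows   = ArrayBuffer.empty[MutableRow]

for (v <- Seq("one", "two", "three", "four")) {
  result.value = v                     // overwrite the shared buffer in place
  rows += result                       // stores the same reference every time
}

// All four elements alias the same object, so all show the last value:
val values = rows.map(_.value)         // "four", "four", "four", "four"
```

Copying the buffer before appending (e.g. storing a fresh object per row) makes the aliasing, and the symptom, disappear.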
Attachments
Issue Links
- is duplicated by: SPARK-27027 "from_avro function does not deserialize the Avro record of a struct column type correctly" (Resolved)
- links to