Details
- Type: Bug
- Status: Closed
- Priority: Critical
- Resolution: Fixed
- Affects Version/s: 0.8.0
- 0.25
Description
Additional background information:
- Spark version: 3.1.1
- Scala version: 2.12
- Hudi version: 0.8.0 (hudi-spark-bundle_2.12 artifact)
While testing a Spark job on EMR that inserts and then upserts data for a fairly complex nested case class structure, I ran into an issue that was hard to track down. It seems that when part of the case class structure in the DataFrame being written has only a single field, the Avro schema generation fails with the following stack trace, but only on the upsert:
21/08/19 15:08:34 ERROR BoundedInMemoryExecutor: error producing records
org.apache.avro.SchemaParseException: Can't redefine: array
    at org.apache.avro.Schema$Names.put(Schema.java:1128)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
    at org.apache.avro.Schema.toString(Schema.java:324)
    at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
    at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:475)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
I am able to replicate the problem in my local IntelliJ setup using the test attached to this issue. The problem can be observed with the DummyStepParent case class below: simply adding an additional field to it eliminates the problem altogether (an acceptable workaround for our purposes, but one that shouldn't ultimately be necessary). A rough sketch of the insert/upsert driver follows the case class definitions.
case class DummyObject(
  fieldOne: String,
  listTwo: Seq[String],
  listThree: Seq[DummyChild],
  listFour: Seq[DummyStepChild],
  fieldFive: Boolean,
  listSix: Seq[DummyParent],
  listSeven: Seq[DummyCousin],
  listEight: Seq[DummyStepParent]
)

case class DummyChild(childFieldOne: String, childFieldTwo: Int)
case class DummyStepChild(stepChildFieldOne: String, stepChildFieldTwo: Boolean)
case class DummyParent(children: Seq[DummyChild], stepChildren: Seq[DummyStepChild])
case class DummyStepParent(children: Seq[DummyChild]) // single-field case class that triggers the failure
case class DummyCousin(cousinFieldOne: String, cousinFieldTwo: Seq[DummyChild])
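For context, the write path that hits this looks roughly like the sketch below. This is not the attached test: the base path, table name, and the record key / precombine / partition path fields (all set to fieldOne here) are assumptions made purely for illustration. The essential pattern is an insert into a fresh table followed by an upsert of the same records.

import org.apache.spark.sql.{SaveMode, SparkSession}

object SingleFieldCaseClassRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-single-field-case-class-repro")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // required by Hudi
      .getOrCreate()
    import spark.implicits._

    val child = DummyChild("c", 1)
    val stepChild = DummyStepChild("s", stepChildFieldTwo = true)
    val records = Seq(
      DummyObject(
        fieldOne = "key-1",
        listTwo = Seq("a"),
        listThree = Seq(child),
        listFour = Seq(stepChild),
        fieldFive = true,
        listSix = Seq(DummyParent(Seq(child), Seq(stepChild))),
        listSeven = Seq(DummyCousin("k", Seq(child))),
        listEight = Seq(DummyStepParent(Seq(child))) // the single-field case class
      )
    ).toDF()

    val basePath = "file:///tmp/hudi_single_field_repro" // hypothetical local path
    val hudiOptions = Map(
      "hoodie.table.name" -> "dummy_objects",
      "hoodie.datasource.write.recordkey.field" -> "fieldOne",     // assumed key field
      "hoodie.datasource.write.precombine.field" -> "fieldOne",    // assumed precombine field
      "hoodie.datasource.write.partitionpath.field" -> "fieldOne"  // assumed partition field
    )

    // First write: a plain insert into a new table succeeds.
    records.write.format("hudi")
      .options(hudiOptions)
      .option("hoodie.datasource.write.operation", "insert")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Second write: upserting the same records fails with
    // "org.apache.avro.SchemaParseException: Can't redefine: array"
    // while the existing parquet files are read back for merging.
    records.write.format("hudi")
      .options(hudiOptions)
      .option("hoodie.datasource.write.operation", "upsert")
      .mode(SaveMode.Append)
      .save(basePath)

    spark.stop()
  }
}

Consistent with the workaround described above, widening DummyStepParent to two fields (for example, adding an unused placeholder column) allows both writes to complete.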