Apache Hudi / HUDI-1602

Corrupted Avro schema extracted from parquet file

    Description

      We are running a Hudi DeltaStreamer on a very complex stream. The schema is deeply nested, with several levels of hierarchy (the Avro schema is around 6,600 lines).

      The version of Hudi that writes the dataset is 0.5-SNAPSHOT, and we recently started attempting to upgrade to the latest version. However, the latest Hudi can't read the existing dataset. The exception I get:

       

       

      Got exception while parsing the arguments:
      Found recursive reference in Avro schema, which can not be processed by Spark:
      {
        "type" : "record",
        "name" : "array",
        "fields" : [ {
          "name" : "id",
          "type" : [ "null", "string" ],
          "default" : null
        }, {
          "name" : "type",
          "type" : [ "null", "string" ],
          "default" : null
        }, {
          "name" : "exist",
          "type" : [ "null", "boolean" ],
          "default" : null
        } ]
      }

      Stack trace:
      org.apache.spark.sql.avro.IncompatibleSchemaException: Found recursive reference in Avro schema, which can not be processed by Spark: (schema as above)
       at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75)
       at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
       at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
       at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
       at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
       at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
       ... (the same SchemaConverters.toSqlTypeHelper / Scala collection frames repeat several more times as the converter walks the nested schema) ...
       at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
       at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
       at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67)
       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
       at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
       at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
       at com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118)
       at com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.lang.reflect.Method.invoke(Method.java:498)
       at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
       at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
       at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
       at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
       at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
       at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      

       

      I wrote a simple test that opens a parquet file, loads its schema, and attempts to convert it to Avro; it fails with the same error (a sketch of the test follows the two schemas below). It appears that an Avro schema that looked like this:

       

      {
                "name": "entity_path",
                "type": [
                  "null",
                  {
                    "type": "record",
                    "name": "MenuEntityPath",
                    "fields": [
                      {
                        "name": "path_nodes",
                        "type": [
                          "null",
                          {
                            "type": "array",
                            "items": {
                              "type": "record",
                              "name": "PathNode",
                              "namespace": "Menue_pathPath$",
                              "fields": [
                                {
                                  "name": "id",
                                  "type": [
                                    "null",
                                    {
                                      "type": "string",
                                      "avro.java.string": "String"
                                    }
                                  ],
                                  "default": null
                                },
                                {
                                  "name": "type",
                                  "type": [
                                    "null",
                                    {
                                      "type": "enum",
                                      "name": "MenuEntityType",
                                      "namespace": "shared",
                                      "symbols": [
                                        "UNKNOWN"
                                      ]
                                    }
                                  ],
                                  "default": null
                                }
                              ]
                            }
                          }
                        ],
                        "default": null
                      }
                    ]
                  }
                ],
                "default": null
              }

      is converted into:

      [
        "null",
        {
          "type": "record",
          "name": "entity_path",
          "fields": [
            {
              "name": "path_nodes",
              "type": [
                "null",
                {
                  "type": "array",
                  "items": {
                    "type": "record",
                    "name": "array",
                    "fields": [
                      {
                        "name": "id",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "type",
                        "type": [
                          "null",
                          "string"
                        ],
                        "default": null
                      },
                      {
                        "name": "exist",
                        "type": [
                          "null",
                          "boolean"
                        ],
                        "default": null
                      }
                    ]
                  }
                }
              ],
              "default": null
            },
            {
              "name": "exist",
              "type": [
                "null",
                "boolean"
              ],
              "default": null
            }
          ]
        }
      ]
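      For reference, a minimal sketch of that test (assumptions: parquet-avro and spark-avro are on the classpath; the class name and argument handling are illustrative):

      import org.apache.avro.Schema;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.parquet.avro.AvroSchemaConverter;
      import org.apache.parquet.format.converter.ParquetMetadataConverter;
      import org.apache.parquet.hadoop.ParquetFileReader;
      import org.apache.parquet.hadoop.metadata.ParquetMetadata;
      import org.apache.parquet.schema.MessageType;
      import org.apache.spark.sql.avro.SchemaConverters;

      public class ParquetToAvroSchemaTest {
        public static void main(String[] args) throws Exception {
          // args[0]: path to any parquet file written for the table
          Configuration conf = new Configuration();
          ParquetMetadata footer = ParquetFileReader.readFooter(
              conf, new Path(args[0]), ParquetMetadataConverter.NO_FILTER);
          MessageType parquetSchema = footer.getFileMetaData().getSchema();

          // parquet-avro reconstructs an Avro schema from the parquet one; for
          // nested lists it synthesizes element records and reuses the same
          // name, so the round-tripped schema differs from the original.
          Schema avroSchema = new AvroSchemaConverter(conf).convert(parquetSchema);

          // spark-avro walks the schema, sees the repeated record name, and
          // throws IncompatibleSchemaException: "Found recursive reference ..."
          SchemaConverters.toSqlType(avroSchema);
        }
      }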

      A couple of questions: has anyone had similar issues, and what is the best way forward?

       

      Edit: 

      I converted the dataset into pure Parquet by using Presto as an intermediary (CREATE TABLE AS SELECT; sketched after the error below). The result fails with a similar error, but in a different place:

       

      Found recursive reference in Avro schema, which can not be processed by Spark:
      {
        "type" : "record",
        "name" : "bag",
        "fields" : [ {
          "name" : "array_element",
          "type" : [ "null", {
            "type" : "record",
            "name" : "array_element",
            "fields" : [ {
              "name" : "id",

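      For reference, the Presto conversion was a plain CTAS of this shape (catalog, schema, and table names are illustrative):

      CREATE TABLE hive.tmp.dataset_parquet_copy
      WITH (format = 'PARQUET')
      AS SELECT * FROM hive.dw.hudi_dataset;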
      It looks like the Parquet writer replaces arrays with synthetic wrapper records and gives them all the same name; see the sketch below.
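      For context, this is roughly the standard three-level Parquet list layout that produces those names (illustrative, not dumped from the actual file):

      optional group path_nodes (LIST) {
        repeated group bag {
          optional group array_element {
            optional binary id (UTF8);
            ...
          }
        }
      }

      Both "bag" and "array_element" are synthetic groups introduced by the writer; when parquet-avro maps them back to Avro records, every list's element record ends up with the same name, which spark-avro then mistakes for a recursive reference.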

       

      Also, the plain Spark Parquet reader works. I can open the parquet files directly with:

      Dataset<Row> dataset = spark.read().parquet(inputPath); // inputPath: location of the table's parquet files
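      For contrast, a minimal sketch of the failing path (illustrative; basePath stands for the table's base path). Reading through the Hudi datasource is what converts the table's Avro schema to a Spark StructType, which is where the exception above is thrown:

      // Fails: the Hudi relation calls SchemaConverters.toSqlType on the
      // table's Avro schema and trips the recursive-reference check
      Dataset<Row> viaHudi = spark.read().format("org.apache.hudi").load(basePath + "/*/*");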

            People

              Assignee: Sagar Sumit (codope)
              Reporter: Alexander Filipchik (afilipchik)
              Votes: 0
              Watchers: 7
