Uploaded image for project: 'Apache Hudi'
  1. Apache Hudi
  2. HUDI-2374

AvroDFSSource does not use the overridden schema to deserialize Avro binaries.

    XMLWordPrintableJSON

Details

    Description

      Hi,

      I am not sure if the AvroDFSSource is intended to ignore the source schema from designated schema provider class, but the current logic always uses the Avro writer schema as reader schema.

       Logic as of release-0.9.0, Class: org.apache.hudi.utilities.sources.AvroDFSSource

      public class AvroDFSSource extends AvroSource {
      
        private final DFSPathSelector pathSelector;
      
        public AvroDFSSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
            SchemaProvider schemaProvider) throws IOException {
          super(props, sparkContext, sparkSession, schemaProvider);
          this.pathSelector = DFSPathSelector
              .createSourceSelector(props, sparkContext.hadoopConfiguration());
        }
      
        @Override
        protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCkptStr, long sourceLimit) {
          Pair<Option<String>, String> selectPathsWithMaxModificationTime =
              pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, lastCkptStr, sourceLimit);
          return selectPathsWithMaxModificationTime.getLeft()
              .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), selectPathsWithMaxModificationTime.getRight()))
              .orElseGet(() -> new InputBatch<>(Option.empty(), selectPathsWithMaxModificationTime.getRight()));
        }
      
        private JavaRDD<GenericRecord> fromFiles(String pathStr) {
          sparkContext.setJobGroup(this.getClass().getSimpleName(), "Fetch Avro data from files");
          JavaPairRDD<AvroKey, NullWritable> avroRDD = sparkContext.newAPIHadoopFile(pathStr, AvroKeyInputFormat.class,
              AvroKey.class, NullWritable.class, sparkContext.hadoopConfiguration());
          return avroRDD.keys().map(r -> ((GenericRecord) r.datum()));
        }
      }
      

      The schemaProvider parameter is completely ignored in the constructor, making AvroKeyInputFormat always use writer schema to read.

      As a result, we often see this from DeltaStream logs:

      21/08/30 10:17:24 WARN AvroKeyInputFormat: Reader schema was not set. Use AvroJob.setInputKeySchema() if desired.
      21/08/30 10:17:24 INFO AvroKeyInputFormat: Using a reader schema equal to the writer schema.
      

      This https://hudi.apache.org/blog/2021/08/16/kafka-custom-deserializer is a nice blog writing for AvroKafkaSource that supports BACKWARD_TRANSITIVE schema evolution. For DFS data, I see this is the main blocker. If we pass the source schema from schemaProvider, we should be able to have the same  BACKWARD_TRANSITIVE schema evolution feature for DFS avro data.

       

      Suggested Fix: Pass the source schema from schemaProvider to hadoop configuration key avro.schema.input.key

       

       

      Attachments

        Activity

          People

            harsh1231 Harshal Patil
            misurin Xuan Huy Pham
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated:
                Original Estimate - 48h
                48h
                Remaining:
                Remaining Estimate - 48h
                48h
                Logged:
                Time Spent - Not Specified
                Not Specified