FLINK-14617: Dataset Parquet ClassCastException for SpecificRecord

Details

    Description

      The following code runs smoothly when the executionEnvironment is an instance of StreamExecutionEnvironment:

      // Imports for the snippet (plus org.apache.flink.api.scala._ or
      // org.apache.flink.streaming.api.scala._, depending on which environment is used):
      import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
      import org.apache.hadoop.mapreduce.Job
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
      import org.apache.parquet.avro.{AvroParquetInputFormat, AvroReadSupport}

      val filePaths = //some links to s3 files
      val job = Job.getInstance()
      AvroReadSupport.setAvroDataSupplier(job.getConfiguration, classOf[AvroDataSupplierWithTimestampConversion])
      val avroParquetInputFormat = new AvroParquetInputFormat[GpsPointDTO]()
      val hadoopInputFormat = new HadoopInputFormat[Void, GpsPointDTO](avroParquetInputFormat, classOf[Void], classOf[GpsPointDTO], job)
      FileInputFormat.addInputPaths(job, filePaths.head)
      executionEnvironment.createInput(hadoopInputFormat).map(_._2).print()

      But when the ExecutionEnvironment is used instead of the StreamExecutionEnvironment, the code throws the following:

      Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.company.GpsPointDTO (org.apache.avro.generic.GenericData$Record and com.company.GpsPointDTO are in unnamed module of loader 'app')

      I don't think this is the expected behavior. 

      The code simply reads some files from S3. It uses the AvroDataSupplierWithTimestampConversion, which adds a timestamp conversion to DateTime. GpsPointDTO is a class generated from the Avro schema.
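
      For reference, parquet-avro builds records with the data model returned by the configured AvroDataSupplier, so a supplier that hands back a plain GenericData model yields GenericData$Record instead of the generated SpecificRecord class. The supplier itself is not included above; the following is only a minimal sketch (assuming Avro 1.9+ and parquet-avro's AvroDataSupplier interface) of a supplier along these lines that keeps SpecificData while registering a timestamp-millis conversion:

      import org.apache.avro.data.TimeConversions
      import org.apache.avro.generic.GenericData
      import org.apache.avro.specific.SpecificData
      import org.apache.parquet.avro.AvroDataSupplier

      // Sketch only: returning SpecificData (rather than GenericData) lets parquet-avro
      // materialize GpsPointDTO instances; the registered logical-type conversion handles
      // timestamp-millis fields (TimeConversions here is the Avro 1.9+ java.time variant).
      class AvroDataSupplierWithTimestampConversion extends AvroDataSupplier {
        override def get(): GenericData = {
          val model = new SpecificData()
          model.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion())
          model
        }
      }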

      EDIT: It seems to work fine with the DataSet API if I remove the setting of the AvroDataSupplier. Obviously, in that case I needed to change the Avro schema to store the timestamp as a long.
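
      For completeness, the workaround from the EDIT looks roughly like the sketch below. It reuses hadoopInputFormat from the snippet above (now built without the setAvroDataSupplier call) with the batch ExecutionEnvironment; getTimestamp is a hypothetical accessor for the schema's now-plain long epoch-millis field:

      import java.time.Instant

      // Same pipeline as before, but without the custom AvroDataSupplier: the timestamp
      // arrives as a raw long (epoch millis) and is converted manually after reading.
      executionEnvironment
        .createInput(hadoopInputFormat)
        .map(_._2)
        .map(dto => Instant.ofEpochMilli(dto.getTimestamp)) // getTimestamp: hypothetical accessor
        .print()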


          People

            Assignee: Unassigned
            Reporter: Dominik Wosiński
            Votes: 0
            Watchers: 2
