Details
- Type: Bug
- Status: Open
- Priority: Not a Priority
- Resolution: Unresolved
- Affects Version/s: 1.8.0
- Fix Version/s: None
Description
The following code runs smoothly when executionEnvironment is an instance of StreamExecutionEnvironment:
val filePaths = // some links to s3 files
val job = Job.getInstance()
AvroReadSupport.setAvroDataSupplier(job.getConfiguration, classOf[AvroDataSupplierWithTimestampConversion])
val avroParquetInputFormat = new AvroParquetInputFormat[GpsPointDTO]()
val hadoopInputFormat = new HadoopInputFormat[Void, GpsPointDTO](avroParquetInputFormat, classOf[Void], classOf[GpsPointDTO], job)
FileInputFormat.addInputPaths(job, filePaths.head)
executionEnvironment.createInput(hadoopInputFormat).map(_._2).print()
But when ExecutionEnvironment is used instead of StreamExecutionEnvironment, the code throws the following:
Caused by: java.lang.ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class com.company.GpsPointDTO (org.apache.avro.generic.GenericData$Record and com.company.GpsPointDTO are in unnamed module of loader 'app')
I don't think this is the expected behavior.
The code simply reads some files from S3. AvroDataSupplierWithTimestampConversion is used to add a conversion from timestamps to DateTime; GpsPointDTO is a class generated from an Avro schema.
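The original AvroDataSupplierWithTimestampConversion is not shown in this report. For context, a minimal sketch of what such a supplier could look like, assuming Avro 1.8.x (joda-time logical-type conversions) and parquet-avro's AvroDataSupplier interface; the class body below is an assumption, not the reporter's actual implementation:

```scala
import org.apache.avro.data.TimeConversions
import org.apache.avro.generic.GenericData
import org.apache.avro.specific.SpecificData
import org.apache.parquet.avro.AvroDataSupplier

// Hypothetical sketch: supplies a SpecificData instance that knows how to
// convert timestamp-millis fields into joda-time DateTime values.
class AvroDataSupplierWithTimestampConversion extends AvroDataSupplier {
  override def get(): GenericData = {
    val specificData = new SpecificData()
    // Register the timestamp-millis conversion so generated classes
    // (like GpsPointDTO) can carry DateTime fields instead of raw longs.
    specificData.addLogicalTypeConversion(new TimeConversions.TimestampConversion())
    specificData
  }
}
```

A supplier like this is what AvroReadSupport.setAvroDataSupplier(...) in the snippet above would plug into the Parquet read path.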
EDIT: It seems to work fine with the DataSet API if I remove the AvroDataSupplier setting. Obviously, in that case I needed to change the Avro schema to use the timestamp as a long.
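To illustrate the workaround in the EDIT above: without the data supplier, the timestamp field would be declared as a plain long rather than a long with a timestamp-millis logical type. A hypothetical field definition (the field name is an assumption; the actual GpsPointDTO schema is not shown in this report) might change from

```
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
```

to

```
{"name": "timestamp", "type": "long"}
```

so that the generated class carries a raw long and no logical-type conversion is needed at read time.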