Details
Type: Bug
Status: Open
Priority: Blocker
Resolution: Unresolved
Affects Version/s: 1.17.0
Fix Version/s: None
Component/s: None
Description
AvroFormatFactory.java#createDecodingFormat returns a ProjectableDecodingFormat, which means the Avro format deserializer is expected to support projection pushdown. In practice, however, the Avro format appears unable to push down projections of arbitrary fields.
For example, consider the following schema and sample data in Kafka:
-- schema
CREATE TABLE kafka (
  `user_id` BIGINT,
  `name` STRING,
  `timestamp` TIMESTAMP(3) METADATA,
  `event_id` BIGINT,
  `payload` STRING NOT NULL
) WITH (
  'connector' = 'kafka',
  ...
)

-- sample data like
(3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3')
With a projection of the first three physical fields, the data is deserialized successfully:
// physical fields 0, 1, 2 = user_id, name, event_id (timestamp is METADATA)
Projection physicalProjections = Projection.of(new int[] {0, 1, 2});
DataType physicalFormatDataType = physicalProjections.project(this.physicalDataType);
DeserializationSchema<RowData> deserializer =
        (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
                .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes());
The resulting row is:
+I(3,name 3,102)
However, when the projection indices do not start from 0, the data cannot be deserialized. For example:
// physical fields 1, 2 = name, event_id
Projection physicalProjections = Projection.of(new int[] {1, 2});
DataType physicalFormatDataType = physicalProjections.project(this.physicalDataType);
DeserializationSchema<RowData> deserializer =
        (DeserializationSchema<RowData>) ((ProjectableDecodingFormat) format)
                .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes());
The resulting exception is:
Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
    ... 19 more
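A plausible explanation is that Avro's binary encoding is purely positional: records carry no field names or tags, so a decoder has to consume fields in the order they were written. The following JDK-only sketch hand-encodes the sample row following the Avro specification and then decodes it positionally, as if the projected schema were also the writer schema. It assumes that the producer wrote all four physical fields (`user_id`, `name`, `event_id`, `payload`) and that nullable columns are encoded as `["null", type]` unions; neither is confirmed above, and the names `AvroPositionalDecoding`, `Kind`, `encodeSampleRow` and `decode` are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class AvroPositionalDecoding {

    // Assumed writer field types: nullable columns as ["null", T] unions.
    enum Kind { NULLABLE_LONG, NULLABLE_STRING, STRING }

    // Avro binary encoding: longs are zig-zag, variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Strings are a long byte count followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Physical row (3, 'name 3', 102, 'payload 3'); `timestamp` is METADATA, so not encoded.
    static byte[] encodeSampleRow() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, 1); writeLong(out, 3);          // user_id: branch 1, then long
        writeLong(out, 1); writeString(out, "name 3"); // name: branch 1, then string
        writeLong(out, 1); writeLong(out, 102);        // event_id: branch 1, then long
        writeString(out, "payload 3");                 // payload: STRING NOT NULL
        return out.toByteArray();
    }

    // Decodes fields strictly in the order given. The bytes carry no field
    // names or tags, so the decoder cannot skip ahead to "the name field".
    static List<Object> decode(byte[] data, Kind... fields) {
        int[] pos = {0};
        List<Object> row = new ArrayList<>();
        for (Kind kind : fields) {
            if (kind != Kind.STRING) {
                long branch = readLong(data, pos);
                if (branch == 0) {
                    row.add(null);
                    continue;
                }
                if (branch != 1) {
                    // A two-branch union only has indexes 0 and 1.
                    throw new ArrayIndexOutOfBoundsException(String.valueOf(branch));
                }
            }
            if (kind == Kind.NULLABLE_LONG) {
                row.add(readLong(data, pos));
            } else {
                int len = (int) readLong(data, pos);
                row.add(new String(data, pos[0], len, StandardCharsets.UTF_8));
                pos[0] += len;
            }
        }
        return row;
    }

    // Reads one zig-zag varint starting at pos[0] and advances the cursor.
    static long readLong(byte[] data, int[] pos) {
        long z = 0;
        int shift = 0;
        int b;
        do {
            b = data[pos[0]++] & 0xFF;
            z |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (z >>> 1) ^ -(z & 1);
    }

    public static void main(String[] args) {
        byte[] data = encodeSampleRow();
        // Projection {0, 1, 2}: reader fields are a prefix of the written fields.
        System.out.println(decode(data, Kind.NULLABLE_LONG, Kind.NULLABLE_STRING, Kind.NULLABLE_LONG));
        // Projection {1, 2}: reader expects `name` first, but the bytes start with `user_id`.
        try {
            decode(data, Kind.NULLABLE_STRING, Kind.NULLABLE_LONG);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("invalid union index " + e.getMessage());
        }
    }
}
```

With projection {0, 1, 2} the reader's fields line up with the first three encoded fields and the trailing `payload` bytes are simply never read, so it prints `[3, name 3, 102]`. With projection {1, 2} the reader interprets the `user_id` value 3 as the byte length of `name`, and the next union index it reads is the byte for `'a'`, which zig-zag decodes to -49, the same value as in the stack trace above.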
It seems that the Avro format does not support projection pushdown for arbitrary fields. Is my understanding correct?
If so, I think the Avro format should not implement the ProjectableDecodingFormat interface, since it can only provide very limited pushdown capabilities.
This problem may block connectors from implementing projection pushdown, because a connector decides whether projection pushdown can be performed by checking whether the format implements the ProjectableDecodingFormat interface.
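The type-based decision described in the last paragraph can be modelled roughly as follows. This is a simplified, JDK-only sketch, not Flink's connector code: `DecodingFormatStub` and `ProjectableDecodingFormatStub` are hypothetical stand-ins for the real interfaces, and `project` is a hypothetical helper showing one way a connector could handle a format that does not implement the interface.

```java
import java.util.Arrays;

final class ProjectionPushdownDecision {

    // Hypothetical stand-ins, NOT Flink's DecodingFormat / ProjectableDecodingFormat.
    interface DecodingFormatStub {}
    interface ProjectableDecodingFormatStub extends DecodingFormatStub {}

    // A format that advertises projection support (as the Avro format does today).
    static final class AvroLikeFormat implements ProjectableDecodingFormatStub {}
    // A format that does not advertise it.
    static final class PlainFormat implements DecodingFormatStub {}

    // The decision is purely type-based: nothing checks WHICH projections
    // the format can actually decode.
    static boolean pushIntoFormat(DecodingFormatStub format) {
        return format instanceof ProjectableDecodingFormatStub;
    }

    // One possible fallback for non-projectable formats: decode the full
    // physical row, then keep only the projected fields in the connector.
    static Object[] project(Object[] physicalRow, int[] projection) {
        Object[] projected = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            projected[i] = physicalRow[projection[i]];
        }
        return projected;
    }

    public static void main(String[] args) {
        Object[] physicalRow = {3L, "name 3", 102L, "payload 3"};
        System.out.println(pushIntoFormat(new AvroLikeFormat())); // true: {1, 2} goes to the format
        System.out.println(pushIntoFormat(new PlainFormat()));    // false: connector projects itself
        System.out.println(Arrays.toString(project(physicalRow, new int[] {1, 2}))); // [name 3, 102]
    }
}
```

Because the check only asks whether the interface is implemented, a format that handles some projections (such as {0, 1, 2}) but not others (such as {1, 2}) still receives all of them; if it did not implement the interface, the connector would take the fallback path instead.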