Flink / FLINK-35324

Avro format cannot perform projection pushdown for specific fields


Details

    Description

      AvroFormatFactory.java#createDecodingFormat returns a ProjectableDecodingFormat, which implies that the Avro format deserializer can perform projection pushdown. In practice, however, the Avro format appears unable to push down projections for arbitrary fields.

      For example, consider the following table schema and sample data in Kafka:

      -- schema
      CREATE TABLE kafka (
        `user_id` BIGINT,
        `name` STRING,
        `timestamp` TIMESTAMP(3) METADATA,
        `event_id` BIGINT,
        `payload` STRING NOT NULL
      ) WITH (
        'connector' = 'kafka',
        ...
      )

      -- sample data
      (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 'payload 3')

      With the following projection, the data is deserialized successfully:

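      Projection physicalProjections = Projection.of(new int[] {0, 1, 2});

      // Projected physical type: ROW<user_id BIGINT, name STRING, event_id BIGINT>
      // (the METADATA column `timestamp` is not part of the physical row)
      DataType physicalFormatDataType = physicalProjections.project(this.physicalDataType);

      DeserializationSchema<RowData> deserializer =
          ((ProjectableDecodingFormat<DeserializationSchema<RowData>>) format)
              .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes());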

      The result is:

      +I(3,name 3,102) 
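      The physical row passed to the format excludes the METADATA column `timestamp`, so index 2 refers to `event_id`, which is why the third value above is 102. The sketch below is a hypothetical stand-in (`ProjectionSketch`, `toNestedIndexes` and `project` are illustrative names, not Flink's `Projection` class) showing how top-level indexes map onto that physical row, assuming a top-level projection wraps each index as a one-element path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for top-level projection over the physical row; not Flink's Projection class. */
public class ProjectionSketch {

    // Physical columns of the `kafka` table; the METADATA column `timestamp` is excluded.
    static final List<String> PHYSICAL_FIELDS =
            Arrays.asList("user_id", "name", "event_id", "payload");

    /** Wraps each top-level index as a one-element path, e.g. {0, 1} -> {{0}, {1}}. */
    static int[][] toNestedIndexes(int[] topLevel) {
        int[][] nested = new int[topLevel.length][];
        for (int i = 0; i < topLevel.length; i++) {
            nested[i] = new int[] {topLevel[i]};
        }
        return nested;
    }

    /** Returns the projected field names in projection order. */
    static List<String> project(int[] topLevel) {
        List<String> names = new ArrayList<>();
        for (int idx : topLevel) {
            names.add(PHYSICAL_FIELDS.get(idx));
        }
        return names;
    }

    public static void main(String[] args) {
        int[] projection = {0, 1, 2};
        System.out.println(Arrays.deepToString(toNestedIndexes(projection))); // [[0], [1], [2]]
        System.out.println(project(projection)); // [user_id, name, event_id]
    }
}
```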

      However, when the projected indexes do not start from 0, the data cannot be deserialized, for example:

      Projection physicalProjections = Projection.of(new int[] {1, 2});

      // Projected physical type: ROW<name STRING, event_id BIGINT>
      DataType physicalFormatDataType = physicalProjections.project(this.physicalDataType);

      DeserializationSchema<RowData> deserializer =
          ((ProjectableDecodingFormat<DeserializationSchema<RowData>>) format)
              .createRuntimeDecoder(context, this.physicalDataType, physicalProjections.toNestedIndexes());

      The exception is:

      Caused by: java.lang.ArrayIndexOutOfBoundsException: -49
              at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
              at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
              at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
              at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
              at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
              at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
              at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
              at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
              at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
              at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
              at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
              ... 19 more 
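      One plausible explanation, offered as an assumption rather than a confirmed diagnosis: Avro binary data is not self-describing, and the decoder built for the projected type appears to read the bytes as if the projected fields were the whole record, which can only line up when the projected fields are a leading prefix of the written fields. The self-contained sketch below hand-rolls Avro's zig-zag varint longs, length-prefixed strings and `["null", T]` unions (it does not use the Avro library). It assumes the record was written with the full physical schema, every nullable column encoded as a `["null", T]` union, and decodes it positionally with a projected field list. Under that assumed writer schema, projection {1, 2} reads the byte 'a' of "name 3" as a union index and gets -49, the same index as in the trace above; this is consistent with the explanation but does not prove it.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hand-rolled subset of Avro binary encoding (not the Avro library). Assumption: records are
 * written with the full physical schema, nullable columns being ["null", T] unions.
 */
public class AvroPositionalDecodeSketch {

    // Assumed writer schema, in field order: user_id, name, event_id, payload.
    static final String[] TYPES = {"long", "string", "long", "string"};
    static final boolean[] NULLABLE = {true, true, true, false}; // payload is NOT NULL

    static void writeLong(ByteArrayOutputStream out, long v) {
        long n = (v << 1) ^ (v >> 63); // zig-zag, then little-endian base-128 varint
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, b.length); // length prefix, then raw UTF-8 bytes
        out.write(b, 0, b.length);
    }

    /** Encodes a full row; non-null values of nullable columns use union branch 1. */
    static byte[] encode(Object[] row) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < TYPES.length; i++) {
            if (NULLABLE[i]) writeLong(out, 1);
            if (TYPES[i].equals("long")) writeLong(out, (Long) row[i]);
            else writeString(out, (String) row[i]);
        }
        return out.toByteArray();
    }

    static final class Reader {
        private final byte[] buf;
        private int pos;

        Reader(byte[] buf) { this.buf = buf; }

        long readLong() {
            long n = 0;
            int shift = 0;
            int b;
            do {
                b = buf[pos++] & 0xFF;
                n |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            return (n >>> 1) ^ -(n & 1); // undo zig-zag
        }

        String readString() {
            int len = (int) readLong();
            String s = new String(buf, pos, len, StandardCharsets.UTF_8);
            pos += len;
            return s;
        }
    }

    /** Decodes the bytes as if the projected fields were the entire writer schema. */
    static List<Object> decodeProjected(byte[] data, int[] projection) {
        Reader r = new Reader(data);
        List<Object> row = new ArrayList<>();
        for (int idx : projection) {
            if (NULLABLE[idx]) {
                long branch = r.readLong(); // a ["null", T] union has branches 0 and 1 only
                if (branch < 0 || branch > 1) {
                    throw new ArrayIndexOutOfBoundsException(String.valueOf(branch));
                }
                if (branch == 0) {
                    row.add(null);
                    continue;
                }
            }
            row.add(TYPES[idx].equals("long") ? (Object) r.readLong() : r.readString());
        }
        return row;
    }

    public static void main(String[] args) {
        byte[] data = encode(new Object[] {3L, "name 3", 102L, "payload 3"});
        System.out.println(decodeProjected(data, new int[] {0, 1, 2})); // [3, name 3, 102]
        try {
            decodeProjected(data, new int[] {1, 2});
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("bad union index: " + e.getMessage()); // bad union index: -49
        }
    }
}
```

      With projection {0, 1, 2} the fields read happen to be the first three written, so decoding succeeds and the trailing `payload` bytes are simply never read.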

      It seems that the Avro format does not support projection pushdown for arbitrary fields. Is my understanding correct?

      If so, I think the Avro format should not implement the ProjectableDecodingFormat interface, since it provides only very limited pushdown capability.

      This problem may prevent connectors from implementing projection pushdown, because a connector decides whether projection pushdown can be performed by checking whether the format implements the ProjectableDecodingFormat interface.
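      The connector-side decision described above can be sketched as a plain type check. The interfaces below are hypothetical stubs standing in for Flink's `DecodingFormat` and `ProjectableDecodingFormat` (their real signatures are not reproduced); the point is that the check sees only the type, so a format that handles only some projections is treated the same as one that handles all of them.

```java
/** Hypothetical stubs; not Flink's real DecodingFormat / ProjectableDecodingFormat interfaces. */
public class ConnectorPushdownSketch {

    interface DecodingFormatStub { }

    // Marker for formats that claim to honour projections.
    interface ProjectableDecodingFormatStub extends DecodingFormatStub { }

    static final class AvroLikeFormat implements ProjectableDecodingFormatStub { }

    static final class PlainFormat implements DecodingFormatStub { }

    /** Pushdown is enabled purely on the format's type; the projection itself is never inspected. */
    static boolean supportsProjectionPushDown(DecodingFormatStub format) {
        return format instanceof ProjectableDecodingFormatStub;
    }

    public static void main(String[] args) {
        System.out.println(supportsProjectionPushDown(new AvroLikeFormat())); // true
        System.out.println(supportsProjectionPushDown(new PlainFormat()));    // false
    }
}
```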

       


          People

            Assignee: Unassigned
            Reporter: SuDewei (sudewei.sdw)
            Votes: 0
            Watchers: 5
