FLINK-32394

Init.pos is partially stored in connector state

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.17.1
    • Fix Version/s: None
    • Component/s: Connectors / Kinesis
    • Labels: None

    Description

      The init.pos is only partially stored in connector state, which can lead to inconsistencies later on for idle streams. In particular, an issue arises when the init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.

      The issue is that AT_TIMESTAMP is stored in the connector state, but the timestamp itself is not. If a stream is idle and the init.pos is changed to TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because that is what is stored in state) and tries to get the timestamp from the properties. The timestamp is no longer there, because the init.pos property is now TRIM_HORIZON.
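
      To make the sequence concrete, here is a minimal sketch of the mismatch between restored state and changed properties. It does not use the connector's classes. The class `InitPosStateSketch`, its methods, and the example timestamp value are hypothetical, and the property keys `flink.stream.initpos` and `flink.stream.initpos.timestamp` are assumed to be the ones the consumer reads.

```java
import java.util.Properties;

final class InitPosStateSketch {
    // Assumed property keys for the initial position and its timestamp.
    static final String INIT_POS_KEY = "flink.stream.initpos";
    static final String INIT_TIMESTAMP_KEY = "flink.stream.initpos.timestamp";

    /** Configuration of the first run: AT_TIMESTAMP plus an illustrative timestamp. */
    static Properties originalConfig() {
        Properties p = new Properties();
        p.setProperty(INIT_POS_KEY, "AT_TIMESTAMP");
        p.setProperty(INIT_TIMESTAMP_KEY, "2023-06-01T00:00:00.000+00:00");
        return p;
    }

    /** Configuration after the change: TRIM_HORIZON, so the timestamp key is dropped. */
    static Properties changedConfig() {
        Properties p = new Properties();
        p.setProperty(INIT_POS_KEY, "TRIM_HORIZON");
        return p;
    }

    /**
     * Hypothetical stand-in for the lookup done for a restored idle shard.
     * The state only says "AT_TIMESTAMP"; the timestamp must come from the
     * current properties, so the result is null when the key is absent.
     */
    static String timestampForRestoredShard(String restoredMarker, Properties currentConfig) {
        if ("AT_TIMESTAMP".equals(restoredMarker)) {
            return currentConfig.getProperty(INIT_TIMESTAMP_KEY);
        }
        return null; // other positions do not need a timestamp
    }

    public static void main(String[] args) {
        String restoredMarker = "AT_TIMESTAMP"; // what the idle shard kept in state
        System.out.println("first run:    " + timestampForRestoredShard(restoredMarker, originalConfig()));
        System.out.println("after change: " + timestampForRestoredShard(restoredMarker, changedConfig()));
    }
}
```

      With the original configuration the lookup finds a timestamp. With the changed configuration it yields null, and that null is what later reaches the date parser.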

      Sample error

       

      java.lang.IllegalArgumentException: java.lang.NullPointerException
          at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:579)
          at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getStartingPosition(AWSUtil.java:325)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:495)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createShardConsumer(KinesisDataFetcher.java:465)
          at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:592)
          at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
          at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
          at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
      Caused by: java.lang.NullPointerException
          at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1470)
          at java.base/java.text.DateFormat.parse(DateFormat.java:393)
          at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:577)
          ... 8 more
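
      The shape of this trace can be reproduced outside the connector. The sketch below assumes the parsing step hands the (missing) timestamp string to `SimpleDateFormat.parse` and wraps any failure in an `IllegalArgumentException`. `NullTimestampParseSketch` and `parseTimestamp` are hypothetical names, not the connector's code, and the date pattern is an illustrative assumption.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

final class NullTimestampParseSketch {
    // Illustrative date pattern; assumed, not taken from the issue.
    static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    /**
     * Parses a timestamp string, wrapping failures in IllegalArgumentException.
     * A null input makes SimpleDateFormat.parse throw NullPointerException,
     * which then becomes the cause of the wrapper exception.
     */
    static Date parseTimestamp(String timestamp) {
        try {
            return new SimpleDateFormat(DATE_FORMAT).parse(timestamp);
        } catch (ParseException | NullPointerException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        try {
            parseTimestamp(null); // timestamp property is absent
        } catch (IllegalArgumentException e) {
            System.out.println("wrapper: " + e.getClass().getName());
            System.out.println("cause:   " + e.getCause().getClass().getName());
        }
    }
}
```

      The wrapper and cause types match the two levels of the trace above: an `IllegalArgumentException` caused by a `NullPointerException` raised inside `SimpleDateFormat.parse`.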
      
      

          People

            Assignee: Unassigned
            Reporter: Usamah Jassat (usamj)
            Votes: 0
            Watchers: 2
