Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Affects Version/s: 1.17.1
Description
The init.pos (initial position) is only partially stored in connector state, which can lead to inconsistencies later on for idle streams. In particular, an issue arises when the init.pos is AT_TIMESTAMP and is later changed to TRIM_HORIZON.
The problem is that AT_TIMESTAMP is stored in the connector state, but the timestamp itself is not. If a stream is idle and the init.pos is changed to TRIM_HORIZON, the connector still attempts to read from AT_TIMESTAMP (because that is what is stored in state) and tries to get the timestamp from the properties. The timestamp is no longer there, because the init.pos property is now TRIM_HORIZON.
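The failure mode described above can be sketched without Flink on the classpath. This is a simplified stand-in, not the connector's code: the class InitPosStateSketch, the method parseTimestamp and the restoredMarker variable are hypothetical, and the property keys and default date format are assumptions about the connector's configuration constants. It only shows that a restored AT_TIMESTAMP marker combined with properties that no longer carry a timestamp ends in a NullPointerException wrapped in an IllegalArgumentException, as in the sample error.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

class InitPosStateSketch {
    // Assumed property keys and default format; check them against the connector's constants.
    static final String INIT_POS = "flink.stream.initpos";
    static final String INIT_TIMESTAMP = "flink.stream.initpos.timestamp";
    static final String DEFAULT_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Hypothetical stand-in for the timestamp parsing step named in the stack trace.
    static Date parseTimestamp(Properties config) {
        // Returns null once the timestamp property has been removed from the job config.
        String timestamp = config.getProperty(INIT_TIMESTAMP);
        try {
            // SimpleDateFormat.parse throws NullPointerException for a null input.
            return new SimpleDateFormat(DEFAULT_FORMAT).parse(timestamp);
        } catch (NullPointerException | ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // What the restored state carries for the idle shard: the marker only, no timestamp.
        String restoredMarker = "AT_TIMESTAMP";

        // Current job properties after the change to TRIM_HORIZON.
        Properties config = new Properties();
        config.setProperty(INIT_POS, "TRIM_HORIZON");

        // The restored marker wins, so the timestamp is looked up in properties that lack it.
        if ("AT_TIMESTAMP".equals(restoredMarker)) {
            try {
                parseTimestamp(config);
            } catch (IllegalArgumentException e) {
                System.out.println("Failed as in the report: " + e);
            }
        }
    }
}
```

If the timestamp were persisted alongside the marker, the lookup in the properties would not be needed for restored shards; the sketch does not attempt to show such a fix.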
Sample error
java.lang.IllegalArgumentException: java.lang.NullPointerException
    at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:579)
    at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getStartingPosition(AWSUtil.java:325)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createRecordPublisher(KinesisDataFetcher.java:495)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.createShardConsumer(KinesisDataFetcher.java:465)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:592)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.lang.NullPointerException
    at java.base/java.text.SimpleDateFormat.parse(SimpleDateFormat.java:1470)
    at java.base/java.text.DateFormat.parse(DateFormat.java:393)
    at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.parseStreamTimestampStartingPosition(KinesisConfigUtil.java:577)
    ... 8 more