Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-20088

[Kinesis][Polling] Issue using Polling consumer at timestamp with empty shard

    XMLWordPrintableJSON

Details

    Description

      Background

      The consumer fails when a Polling record publisher uses a timestamp sentinel starting position and the first record batch is empty. This is because the consumer tries to recalculate the start position from the timestamp sentinel, this operation is not supported.

      Reproduction Steps

      Setup an application consuming from Kinesis with following properties and consume from an empty shard:

      String format = "yyyy-MM-dd'T'HH:mm:ss";
      String date = new SimpleDateFormat(format).format(new Date());
      
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, date);
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, format);
      consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP"); 

      Error

      Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
      	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
      	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
      	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
      	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
      	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
      	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
      	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
      	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
      	at akka.dispatch.OnComplete.internal(Future.scala:264)
      	at akka.dispatch.OnComplete.internal(Future.scala:261)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
      	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
      	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
      	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
      	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
      	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
      	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
      	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
      	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
      	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
      	at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
      	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
      	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
      	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
      	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
      	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
      	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
      	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
      	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
      	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
      	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:534)
      	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
      	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
      	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
      	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
      	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
      	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
      	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
      	at akka.actor.Actor$class.aroundReceive$$$capture(Actor.scala:517)
      	at akka.actor.Actor$class.aroundReceive(Actor.scala)
      	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
      	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
      	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
      	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
      	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
      	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
      	... 4 more
      Caused by: java.lang.IllegalArgumentException: Unexpected sentinel type: AT_TIMESTAMP_SEQUENCE_NUM
      	at software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSentinelSequenceNumber(StartingPosition.java:107)
      	at software.amazon.kinesis.connectors.flink.model.StartingPosition.fromSequenceNumber(StartingPosition.java:90)
      	at software.amazon.kinesis.connectors.flink.model.StartingPosition.continueFromSequenceNumber(StartingPosition.java:73)
      	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:113)
      	at software.amazon.kinesis.connectors.flink.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:98)
      	at software.amazon.kinesis.connectors.flink.internals.ShardConsumer.run(ShardConsumer.java:108)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      

       

      Solution

      This is fixed by reusing the existing timestamp starting position in this condition.

      Attachments

        Issue Links

          Activity

            People

              danny.cranmer Danny Cranmer
              danny.cranmer Danny Cranmer
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: