Flink / FLINK-25902

NullPointerException in RescalingITCase.testSavepointRescalingOutKeyedState

Details

    Description

      I experienced a build failure in RescalingITCase.testSavepointRescalingOutKeyedState; the logs show a NullPointerException:

      12:17:36,702 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map -> Sink: Unnamed (4/4)#0 - asynchronous part of checkpoint 5 could not be completed.
      java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint was canceled because a barrier from newer checkpoint was received.
              at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_292]
              at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) ~[?:1.8.0_292]
              at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:69) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.finalizeNonFinishedSnapshots(AsyncCheckpointRunnable.java:191) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:124) [flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
      Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint was canceled because a barrier from newer checkpoint was received.
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.abortInternal(SingleCheckpointBarrierHandler.java:376) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.cancelSubsumedCheckpoint(SingleCheckpointBarrierHandler.java:463) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.checkNewCheckpoint(SingleCheckpointBarrierHandler.java:347) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:228) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.processPriorityEvents(CheckpointedInputGate.java:112) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              ... 1 more
      12:17:36,707 [Flat Map -> Sink: Unnamed (1/4)#0] INFO  org.apache.flink.state.changelog.ChangelogKeyedStateBackend  [] - snapshot of Flat Map -> Sink: Unnamed (1/4)#0 for checkpoint 6, change range: 0..13306
      12:17:36,710 [Flat Map -> Sink: Unnamed (4/4)#0] INFO  org.apache.flink.state.changelog.ChangelogKeyedStateBackend  [] - snapshot of Flat Map -> Sink: Unnamed (4/4)#0 for checkpoint 6, change range: 0..9002
      12:17:36,720 [Flat Map -> Sink: Unnamed (4/4)#0] INFO  org.apache.flink.state.changelog.PeriodicMaterializationManager [] - Shutting down PeriodicMaterializationManager.
      12:17:36,720 [AsyncOperations-thread-1] INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map -> Sink: Unnamed (4/4)#0 - asynchronous part of checkpoint 6 could not be completed.
      java.util.concurrent.CancellationException: null
              at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276) ~[?:1.8.0_292]
              at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:78) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.OperatorSnapshotFutures.lambda$cancel$0(OperatorSnapshotFutures.java:173) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.shaded.guava30.com.google.common.io.Closer.close(Closer.java:213) ~[flink-shaded-guava-30.1.1-jre-14.0.jar:30.1.1-jre-14.0]
              at org.apache.flink.streaming.api.operators.OperatorSnapshotFutures.cancel(OperatorSnapshotFutures.java:185) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.cleanup(AsyncCheckpointRunnable.java:391) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.close(AsyncCheckpointRunnable.java:356) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:294) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:281) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.close(SubtaskCheckpointCoordinatorImpl.java:480) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:294) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:281) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.core.fs.CloseableRegistry.doClose(CloseableRegistry.java:74) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-core-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:914) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:930) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
      12:17:36,722 [Channel state writer Flat Map -> Sink: Unnamed (4/4)#0] INFO  org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Flat Map -> Sink: Unnamed (4/4)#0 discarding 0 drained requests
      12:17:36,722 [Flat Map -> Sink: Unnamed (4/4)#0] INFO  org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Flat Map -> Sink: Unnamed (4/4)#0 discarding 1 drained requests
      12:17:36,723 [Flat Map -> Sink: Unnamed (4/4)#0] WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Flat Map -> Sink: Unnamed (4/4)#0 (185244134ae888669016a5f2b4282d26) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
              at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536)
              at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298)
              at org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383)
              at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132)
              at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158)
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406)
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350)
              at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
              at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
              at java.lang.Thread.run(Thread.java:748)
      12:17:36,723 [Flat Map -> Sink: Unnamed (4/4)#0] INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Flat Map -> Sink: Unnamed (4/4)#0 (185244134ae888669016a5f2b4282d26).
      12:17:36,727 [flink-akka.actor.default-dispatcher-8] INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Flat Map -> Sink: Unnamed (4/4)#0 185244134ae888669016a5f2b4282d26.
      12:17:36,728 [flink-akka.actor.default-dispatcher-8] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Flat Map -> Sink: Unnamed (4/4) (185244134ae888669016a5f2b4282d26) switched from RUNNING to FAILED on 04d463f4-6c87-415e-b8ad-dd420195d7cf @ localhost (dataPort=40473).
      java.lang.NullPointerException: null
              at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.notifyCheckpointAborted(ChangelogKeyedStateBackend.java:536) ~[flink-statebackend-changelog-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.notifyCheckpointAborted(StreamOperatorStateHandler.java:298) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.AbstractStreamOperator.notifyCheckpointAborted(AbstractStreamOperator.java:383) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointAborted(AbstractUdfStreamOperator.java:132) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.notifyCheckpointAborted(RegularOperatorChain.java:158) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpoint(SubtaskCheckpointCoordinatorImpl.java:406) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointAborted(SubtaskCheckpointCoordinatorImpl.java:352) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointAbortAsync$15(StreamTask.java:1327) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$17(StreamTask.java:1350) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) ~[flink-streaming-java-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
              at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]

      The branch on which this test fails does touch checkpoint-related code, in the sense that it recovers checkpoints (FLIP-194; JobResultStore efforts). However, it does not re-instantiate the CheckpointCoordinator, which makes me suspect that something else is going wrong.
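
To check the order of events for the failing subtask, a small helper along the following lines can pull out the log entries that mention it. This is only a sketch for triage, not part of Flink: the names LogTriage, Entry, parse and forSubtask are made up for illustration, and the pattern assumes the log layout shown in this report (timestamp, thread in brackets, level, logger, "[] - ", message). Stack-trace frames do not match the pattern and are skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical triage helper; not a Flink class.
final class LogTriage {

    // Assumed layout: "HH:mm:ss,SSS [thread] LEVEL  logger  [] - message"
    private static final Pattern ENTRY = Pattern.compile(
            "^\\s*(\\d{2}:\\d{2}:\\d{2},\\d{3}) \\[(.+?)\\] (\\w+)\\s+(\\S+)\\s+\\[\\] - (.*)$");

    static final class Entry {
        final String time;
        final String thread;
        final String level;
        final String logger;
        final String message;

        Entry(String time, String thread, String level, String logger, String message) {
            this.time = time;
            this.thread = thread;
            this.level = level;
            this.logger = logger;
            this.message = message;
        }
    }

    // Parses log lines into entries; lines that do not match (e.g. stack frames) are ignored.
    static List<Entry> parse(List<String> lines) {
        List<Entry> entries = new ArrayList<>();
        for (String line : lines) {
            Matcher m = ENTRY.matcher(line);
            if (m.matches()) {
                entries.add(new Entry(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5)));
            }
        }
        return entries;
    }

    // Keeps entries whose thread name or message mentions the subtask, e.g. "(4/4)#0".
    static List<Entry> forSubtask(List<Entry> entries, String subtask) {
        List<Entry> hits = new ArrayList<>();
        for (Entry e : entries) {
            if (e.thread.contains(subtask) || e.message.contains(subtask)) {
                hits.add(e);
            }
        }
        return hits;
    }
}
```

Calling LogTriage.forSubtask(LogTriage.parse(lines), "(4/4)#0") on the lines of the log above would list, in file order, the entries for the subtask that later fails with the NullPointerException.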

            People

              Assignee: Unassigned
              Reporter: Matthias Pohl (mapohl)