FLINK-8770

CompletedCheckpoints stored in ZooKeeper are not up-to-date; when the JobManager is restarted it fails to recover the job due to a checkpoint FileNotFoundException

Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Cannot Reproduce
    • Affects Version/s: 1.4.0
    • Fix Version/s: None
    • Component/s: Runtime / Task
    • Labels: None

    Description

      Hi, I am running a Flink cluster (1 JobManager + 6 TaskManagers) in HA mode on OpenShift. I have enabled Chaos Monkey, which kills either the JobManager or one of the TaskManagers every 5 minutes; the ZooKeeper quorum is stable, with no chaos monkey acting on it. Flink reads data from one Kafka topic and writes data into another Kafka topic. Checkpointing is enabled with a 1000 ms interval, and state.checkpoints.num-retained is set to 10. I am using a PVC for the state backend (checkpoints, recovery, etc.), so the checkpoints and state are persistent.
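
      For reference, here is a minimal sketch of the checkpointing setup described above (Flink 1.4 streaming API). This is not my actual job; the checkpoint URI and job name are placeholders, and state.checkpoints.num-retained as well as the ZooKeeper HA options are cluster-level settings in flink-conf.yaml, not API calls:

      // Minimal sketch of the checkpointing setup described above. The checkpoint URI and
      // job name are hypothetical placeholders, not the actual values from my cluster.
      import org.apache.flink.runtime.state.filesystem.FsStateBackend;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class CheckpointSetupSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(1000); // checkpoint every 1000 ms, as described above
              env.setStateBackend(new FsStateBackend("file:///mnt/flink-test/checkpoints")); // PVC-backed path (placeholder)
              // Placeholder pipeline; in my setup this is a Kafka source feeding a Kafka sink.
              env.fromElements("event-1", "event-2").print();
              env.execute("kafka-to-kafka-sketch");
          }
      }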

      The restart strategy for the Flink JobManager DeploymentConfig is Recreate, which means the old JobManager container is killed before the new one is started.

      I first ran the chaos test for one day; however, I saw the following exception:

      org.apache.flink.util.FlinkException: Could not retrieve checkpoint *** from state handle under /***. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.

      The root cause is a FileNotFoundException for the checkpoint file.

      The Flink job then kept restarting for a few hours and, due to the above error, could not be recovered successfully.
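
      For context on the error: as far as I understand (and as the znode contents further below also show), the ZooKeeper-based HA store keeps only a small serialized handle in ZooKeeper that points to a file on the shared storage, and recovery re-reads that file. The following is a hedged, simplified sketch of that indirection, not Flink's actual code; the class name and the file path are made up for illustration:

      import java.io.FileInputStream;
      import java.io.FileNotFoundException;
      import java.io.IOException;
      import java.io.ObjectInputStream;

      // Simplified stand-in for the "retrievable state handle" idea: ZooKeeper stores the pointer,
      // while the completed-checkpoint metadata itself lives in a file on shared storage.
      public final class CheckpointHandleSketch {

          /** Reads the serialized completed-checkpoint metadata that a stored handle points to. */
          static Object retrieve(String filePath) throws IOException, ClassNotFoundException {
              try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(filePath))) {
                  return in.readObject();
              }
          }

          public static void main(String[] args) {
              // Hypothetical file name in the style of the handles under the recovery directory.
              String path = "/mnt/flink-test/recovery/completedCheckpoint0000000000";
              try {
                  System.out.println(retrieve(path));
              } catch (FileNotFoundException e) {
                  // This is the failure mode from this report: the pointer in ZooKeeper is valid,
                  // but the file it references no longer exists on the PVC.
                  System.err.println("Checkpoint file referenced from ZooKeeper is gone: " + path);
              } catch (IOException | ClassNotFoundException e) {
                  e.printStackTrace();
              }
          }
      }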

      After further investigation, I found the following in my PVC:

      -rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint0ee95157de00
      -rw-r--r--. 1 flink root 11379 Feb 23 01:51 completedCheckpoint498d0952cf00
      -rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint650fe5b021fe
      -rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint66634149683e
      -rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpoint67f24c3b018e
      -rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint6f64ebf0ae64
      -rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpoint906ebe1fb337
      -rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpoint98b79ea14b09
      -rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpointa0d1070e0b6c
      -rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpointbd3a9ba50322
      -rw-r--r--. 1 flink root 11355 Feb 22 17:31 completedCheckpointd433b5e108be
      -rw-r--r--. 1 flink root 11379 Feb 22 22:56 completedCheckpointdd0183ed092b
      -rw-r--r--. 1 flink root 11379 Feb 22 00:00 completedCheckpointe0a5146c3d81
      -rw-r--r--. 1 flink root 11331 Feb 22 17:06 completedCheckpointec82f3ebc2ad
      -rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpointf86e460f6720

      The latest 10 checkpoints were created between about 02:10 and 02:11, ignoring the older checkpoints that were not deleted successfully (which I do not care much about).

      However, when checking ZooKeeper, I see the following under the flink/checkpoints path (zkCli get output; I list only one znode, but the other 9 are similar):

      cZxid = 0x160001ff5d
      ��sr;org.apache.flink.runtime.state.RetrievableStreamStateHandle�U�+LwrappedStreamStateHandlet2Lorg/apache/flink/runtime/state/StreamStateHandle;xpsr9org.apache.flink.runtime.state.filesystem.FileStateHandle�u�b�▒▒J stateSizefilePathtLorg/apache/flink/core/fs/Path;xp,ssrorg.apache.flink.core.fs.PathLuritLjava/net/URI;xpsr
      java.net.URI�x.C�I�LstringtLjava/lang/String;xpt=file:/mnt/flink-test/recovery/completedCheckpointd004a3753870x
      ctime = Fri Feb 23 02:08:18 UTC 2018
      mZxid = 0x160001ff5d
      mtime = Fri Feb 23 02:08:18 UTC 2018
      pZxid = 0x1d00000c6d
      cversion = 31
      dataVersion = 0
      aclVersion = 0
      ephemeralOwner = 0x0
      dataLength = 492

      So the latest completedCheckpoint entries stored in ZooKeeper date from about 02:08, which implies that the checkpoints completed at 02:10 were somehow not successfully registered in ZooKeeper. When the JobManager then tries to restart the Flink job, it cannot find the checkpoint files referenced from ZooKeeper (they no longer exist on the PVC), and the recovery fails.
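
      For cross-checking, here is a rough helper sketch (not part of Flink) that prints the file path embedded in each handle stored under flink/checkpoints, so it can be compared against the completedCheckpoint* files that actually exist on the PVC. It assumes the plain ZooKeeper Java client on the classpath; the connection string and znode path are assumptions and depend on the configured HA root:

      import java.nio.charset.StandardCharsets;
      import java.util.regex.Matcher;
      import java.util.regex.Pattern;
      import org.apache.zookeeper.ZooKeeper;

      public class ZkCheckpointPointerDump {
          public static void main(String[] args) throws Exception {
              // Connection string and znode path are assumptions based on the setup described above.
              ZooKeeper zk = new ZooKeeper("localhost:2181", 10_000, event -> { });
              Pattern filePath = Pattern.compile("file:/[^\\x00]*completedCheckpoint[0-9a-f]+");
              for (String child : zk.getChildren("/flink/checkpoints", false)) {
                  byte[] data = zk.getData("/flink/checkpoints/" + child, false, null);
                  // The znode payload is a Java-serialized handle; the referenced file path is
                  // embedded in it as plain text, so a regex is enough for a quick comparison.
                  Matcher m = filePath.matcher(new String(data, StandardCharsets.ISO_8859_1));
                  System.out.println(child + " -> " + (m.find() ? m.group() : "<no file path found>"));
              }
              zk.close();
          }
      }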

      I am very surprised by this, since writing the checkpoint handle to ZooKeeper appears to be synchronous, so I am not sure why this happens. Can anyone help look into this?

          People

            Assignee: Unassigned
            Reporter: Xinyang Gao (gaoxinyang)
            Votes: 0
            Watchers: 6
