Flink / FLINK-15152

Job keeps running without periodic checkpoints after "stop with savepoint" failed at the beginning


Details

    Description

      I have a streaming job configured with periodic checkpointing, but after a week of running I found that no checkpoint files had been written at all.

      To reproduce the problem:

      1. The job was submitted to YARN:

      bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar

      2. Then immediately, before all tasks had switched to RUNNING (a matter of seconds), I (actually a job control script) sent a "stop with savepoint" command via the Flink CLI:

      bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
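
      (The YARN application id and the job id in this command come from the output of the run command above; -p sets the target directory for the savepoint.)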
      

      The following appeared in jobmanager.log:

      2019-12-09 17:56:56,512 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Socket Stream -> Map (1/1) of job f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
      

      After that, the job's tasks (on the TaskManager) continue to run normally, but without any checkpoints.

      The cause of the problem:

      1. "stop with savepoint" command call the code stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then triggerSynchronousSavepoint:

      // we stop the checkpoint coordinator so that we are guaranteed
      // to have only the data of the synchronous savepoint committed.
      // in case of failure, and if the job restarts, the coordinator
      // will be restarted by the CheckpointCoordinatorDeActivator.
      checkpointCoordinator.stopCheckpointScheduler();

      2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509

      LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
        job,
        ExecutionState.RUNNING,
        ee.getState());
      throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);

      3. As a result, "stop with savepoint" failed after "checkpointCoordinator.stopCheckpointScheduler()" had already taken effect, but without terminating the job. The job keeps running, and periodic checkpointing is never re-enabled.
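
      For illustration, here is a minimal, self-contained sketch of the missing failure handling (the Coordinator class below is hypothetical, not Flink's real CheckpointCoordinator): if triggerSynchronousSavepoint fails while the job keeps running, the periodic checkpoint scheduler should be started again:

      // Hypothetical model of the race, for illustration only.
      public class StopWithSavepointSketch {

        /** Stand-in for the scheduler on/off switch inside the checkpoint coordinator. */
        static class Coordinator {
          private boolean periodicScheduling = true;
          private final boolean allTasksRunning; // simulates the startup race window

          Coordinator(boolean allTasksRunning) {
            this.allTasksRunning = allTasksRunning;
          }

          void stopCheckpointScheduler() { periodicScheduling = false; }
          void startCheckpointScheduler() { periodicScheduling = true; }
          boolean isPeriodicScheduling() { return periodicScheduling; }

          void triggerSynchronousSavepoint() throws Exception {
            if (!allTasksRunning) {
              // corresponds to CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING
              throw new Exception("not all required tasks are in RUNNING state");
            }
          }
        }

        /** Current (buggy) flow: the scheduler stays stopped when the savepoint fails. */
        static void stopWithSavepointBuggy(Coordinator c) {
          c.stopCheckpointScheduler();
          try {
            c.triggerSynchronousSavepoint();
          } catch (Exception e) {
            // failure is reported to the client, but the scheduler is never restarted
          }
        }

        /** One possible shape of a fix: restore periodic checkpointing on failure. */
        static void stopWithSavepointFixed(Coordinator c) {
          c.stopCheckpointScheduler();
          try {
            c.triggerSynchronousSavepoint();
          } catch (Exception e) {
            c.startCheckpointScheduler(); // the job keeps running, so keep checkpointing
          }
        }

        public static void main(String[] args) {
          Coordinator buggy = new Coordinator(false);
          stopWithSavepointBuggy(buggy);
          System.out.println("buggy flow, periodic scheduling: " + buggy.isPeriodicScheduling()); // false

          Coordinator fixed = new Coordinator(false);
          stopWithSavepointFixed(fixed);
          System.out.println("fixed flow, periodic scheduling: " + fixed.isPeriodicScheduling()); // true
        }
      }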

       

      Sample code to reproduce:

      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.common.functions.RichFlatMapFunction;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.state.ValueState;
      import org.apache.flink.api.common.state.ValueStateDescriptor;
      import org.apache.flink.api.common.typeinfo.TypeHint;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.contrib.streaming.state.PredefinedOptions;
      import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
      import org.apache.flink.runtime.state.StateBackend;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.util.Collector;

      import java.io.IOException;

      public class StreamingJob {
      
        private static StateBackend makeRocksdbBackend() throws IOException {
          RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
          rocksdbBackend.enableTtlCompactionFilter();
          rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
          return rocksdbBackend;
        }
      
        public static void main(String[] args) throws Exception {
          // set up the streaming execution environment
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
          // 10 sec
          env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
          env.setStateBackend(makeRocksdbBackend());
          env.setRestartStrategy(RestartStrategies.noRestart());
      
          CheckpointConfig checkpointConfig = env.getCheckpointConfig();
          checkpointConfig.enableExternalizedCheckpoints(
              CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
          checkpointConfig.setFailOnCheckpointingErrors(true);
      
          DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
          text.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String s) {
              String[] s1 = s.split(" ");
              return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
            }
          }).keyBy(0).flatMap(new CountWindowAverage()).print();
      
          env.execute("Flink Streaming Java API Skeleton");
        }
      
        public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
      
          private transient ValueState<Tuple2<Long, Long>> sum;
      
          @Override
          public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
            Tuple2<Long, Long> currentSum = sum.value();
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
            sum.update(currentSum);
            out.collect(new Tuple2<>(input.f0, currentSum.f1));
          }
      
          @Override
          public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                    "average", // the state name
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                    }), // type information
                    Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
          }
        }
      }
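
      Note: to run the reproduction, a socket server must already be listening on 127.0.0.1:8912 before the job is submitted (e.g. nc -lk 8912 with BSD netcat), and the stop command has to be issued within the first few seconds, while the source task is still in SCHEDULED state.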
      

            People

              Assignee: Congxian Qiu (klion26)
              Reporter: Feng Jiajie (fengjiajie)