Apache Hudi / HUDI-2947

HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode



    Description

      Problem:

      When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 0`, in continuous mode, the deltastreamer job may pick up the wrong checkpoint later on.  Here the wrong checkpoint shows up in the 20211206203551080 commit, the first commit after the replacecommit and clean: the checkpoint is reset to "0" (the CLI value) instead of the expected "5" that follows 20211206202728233.commit.  More details below.
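
      For context, deltastreamer carries the source checkpoint forward in the commit metadata: each batch records the checkpoint it ended on under "deltastreamer.checkpoint.key" (see the grep output below).  A rough sketch of reading that value back, using the same HoodieCommitMetadata accessor that the check in DeltaSync uses (illustration only, not code from a patch):

      import org.apache.hudi.common.model.HoodieCommitMetadata;

      public class CheckpointMetadataSketch {
        // 20211206202728233.commit recorded checkpoint "4", so the next commit should have
        // recorded "5"; instead 20211206203551080.commit recorded "1" because its batch was
        // restarted from the CLI checkpoint "0".
        static String checkpointOf(HoodieCommitMetadata commitMetadata) {
          // Returns null/empty for instants without checkpoint keys, e.g. a clustering replacecommit.
          return commitMetadata.getMetadata("deltastreamer.checkpoint.key");
        }
      }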

       

      The bug is due to the check here: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335

      if (cfg.checkpoint != null
          && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
              || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
        resumeCheckpointStr = Option.of(cfg.checkpoint);
      }

      In this case of resuming after a clustering commit, "cfg.checkpoint != null" and "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))" are both true, since "--checkpoint 0" is configured and the latest commit is a replacecommit that carries no checkpoint keys.  As a result, the resume checkpoint string is reset to the configured checkpoint and the timeline walk-back logic below is skipped, which is wrong.
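
      For reference, the walk-back behavior that gets skipped can be sketched roughly as follows.  This is only an illustration of the intended logic, not the actual fix; readCheckpointKey stands in for the commit-metadata parsing DeltaSync already does, and the exact timeline API usage is an assumption:

      import java.util.Optional;

      import org.apache.hudi.common.table.timeline.HoodieInstant;
      import org.apache.hudi.common.table.timeline.HoodieTimeline;
      import org.apache.hudi.common.util.StringUtils;

      public class CheckpointWalkBackSketch {

        // Walk the completed commit timeline from newest to oldest and return the first
        // checkpoint found, skipping instants that carry no checkpoint metadata
        // (e.g. the 20211206203449899 replacecommit produced by clustering).
        static Optional<String> latestCheckpoint(HoodieTimeline completedCommits) {
          return completedCommits.getReverseOrderedInstants()
              .map(CheckpointWalkBackSketch::readCheckpointKey)
              .filter(ckpt -> !StringUtils.isNullOrEmpty(ckpt))
              .findFirst();
        }

        // Hypothetical helper: parse the instant's commit metadata and return the value of
        // "deltastreamer.checkpoint.key", or null when the instant has no checkpoint keys.
        static String readCheckpointKey(HoodieInstant instant) {
          return null; // placeholder for the HoodieCommitMetadata parsing done in DeltaSync
        }
      }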

       

      Timeline:

       

       189069 Dec  6 12:19 20211206201238649.commit
            0 Dec  6 12:12 20211206201238649.commit.requested
            0 Dec  6 12:12 20211206201238649.inflight
       189069 Dec  6 12:27 20211206201959151.commit
            0 Dec  6 12:20 20211206201959151.commit.requested
            0 Dec  6 12:20 20211206201959151.inflight
       189069 Dec  6 12:34 20211206202728233.commit
            0 Dec  6 12:27 20211206202728233.commit.requested
            0 Dec  6 12:27 20211206202728233.inflight
        36662 Dec  6 12:35 20211206203449899.replacecommit
            0 Dec  6 12:35 20211206203449899.replacecommit.inflight
        34656 Dec  6 12:35 20211206203449899.replacecommit.requested
        28013 Dec  6 12:35 20211206203503574.clean
        19024 Dec  6 12:35 20211206203503574.clean.inflight
        19024 Dec  6 12:35 20211206203503574.clean.requested
       189069 Dec  6 12:43 20211206203551080.commit
            0 Dec  6 12:35 20211206203551080.commit.requested
            0 Dec  6 12:35 20211206203551080.inflight
       189069 Dec  6 12:50 20211206204311612.commit
            0 Dec  6 12:43 20211206204311612.commit.requested
            0 Dec  6 12:43 20211206204311612.inflight
            0 Dec  6 12:50 20211206205044595.commit.requested
            0 Dec  6 12:50 20211206205044595.inflight
          128 Dec  6 12:56 archived
          483 Dec  6 11:52 hoodie.properties
       

       

      Checkpoints in commits (note the checkpoint falls from "4" back to "1" on the first commit after the replacecommit):

       

      grep "deltastreamer.checkpoint.key" *
      20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
      20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
      20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
      20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
      20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" 

       

      Steps to reproduce:

      Run HoodieDeltaStreamer in continuous mode by providing both "--checkpoint 0" and "--continuous", with inline clustering and async clean enabled (some configs are masked).

       

      spark-submit \
        --master yarn \
        --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
        --conf spark.speculation=true \
        --conf spark.speculation.multiplier=1.0 \
        --conf spark.speculation.quantile=0.5 \
        --packages org.apache.spark:spark-avro_2.12:3.2.0 \
        --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
        file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
        --props file:/home/hadoop/ethan/test.properties \
        --source-class ... \
        --source-ordering-field ts \
        --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
        --target-table test_table \
        --table-type COPY_ON_WRITE \
        --op BULK_INSERT \
        --checkpoint 0 \
        --continuous 

      test.properties:

       

       

      hoodie.cleaner.commits.retained=4
      hoodie.keep.min.commits=5
      hoodie.keep.max.commits=7
      
      
      hoodie.clean.async=true
      hoodie.clustering.inline=true
      hoodie.clustering.async.max.commits=3
      hoodie.compact.inline.max.delta.commits=3
      
      
      hoodie.insert.shuffle.parallelism=10
      hoodie.upsert.shuffle.parallelism=10
      hoodie.bulk_insert.shuffle.parallelism=10
      hoodie.delete.shuffle.parallelism=10
      hoodie.bulkinsert.shuffle.parallelism=10
      
      
      hoodie.datasource.write.recordkey.field=key
      hoodie.datasource.write.partitionpath.field=partition
      
      
      # turn off any small file handling, for ease of testing
      hoodie.parquet.small.file.limit=1
      
      benchmark.input.source.path=...

       

       


    People

      Assignee: shivnarayan (sivabalan narayanan)
      Reporter: guoyihua (Ethan Guo; this is the old account, please use "yihua")
      Votes: 0
      Watchers: 2
