Details
- Bug
- Status: Closed
- Critical
- Resolution: Fixed
- None
- None
- 1
Description
Problem:
When deltastreamer is started in continuous mode with an explicit checkpoint, e.g., `--checkpoint 0`, the job may later pick up the wrong checkpoint. In the run below, the checkpoint used for commit 20211206203551080, which follows the replacecommit and the clean, is reset to "0" instead of continuing at "5" after 20211206202728233.commit. More details below.
The bug is due to the check here: https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335
if (cfg.checkpoint != null
    && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
        || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
  resumeCheckpointStr = Option.of(cfg.checkpoint);
}
When resuming after a clustering commit, "cfg.checkpoint != null" and "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))" are both true: "--checkpoint 0" is configured, and the last commit is a replacecommit, which carries no checkpoint keys. The resume checkpoint string is therefore reset to the configured checkpoint, and the timeline walk-back logic below the check is skipped, which is wrong.
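To make the failure mode concrete, here is a minimal, self-contained sketch that models the timeline as a list of metadata maps. It is not the DeltaSync code and not necessarily the fix that was applied. The class and method names are hypothetical, the value of the reset key constant is an assumption, and the sample timeline assumes the earlier commits recorded a reset key of "0". The first method applies the check to the last instant only, as described above. The second walks back to the latest instant that carries a checkpoint before applying the same check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the resume logic; not the actual DeltaSync implementation.
class CheckpointResumeSketch {
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    // Assumed value for the reset key; only the constant name appears in the issue.
    static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

    static boolean isNullOrEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // Most recent instant whose metadata carries a checkpoint, if any.
    static Optional<Map<String, String>> lastWithCheckpoint(List<Map<String, String>> timeline) {
        for (int i = timeline.size() - 1; i >= 0; i--) {
            if (!isNullOrEmpty(timeline.get(i).get(CHECKPOINT_KEY))) {
                return Optional.of(timeline.get(i));
            }
        }
        return Optional.empty();
    }

    // Mirrors the reported behavior: the check runs against the last instant,
    // even when that instant is a replacecommit without checkpoint keys.
    static Optional<String> resumeFromLastInstant(String cfgCheckpoint, List<Map<String, String>> timeline) {
        if (timeline.isEmpty()) {
            return Optional.ofNullable(cfgCheckpoint);
        }
        String resetKey = timeline.get(timeline.size() - 1).get(CHECKPOINT_RESET_KEY);
        if (cfgCheckpoint != null && (isNullOrEmpty(resetKey) || !cfgCheckpoint.equals(resetKey))) {
            return Optional.of(cfgCheckpoint); // walk-back below is never reached
        }
        return lastWithCheckpoint(timeline).map(m -> m.get(CHECKPOINT_KEY));
    }

    // Illustrative alternative: walk back first, then apply the same check.
    static Optional<String> resumeAfterWalkBack(String cfgCheckpoint, List<Map<String, String>> timeline) {
        Optional<Map<String, String>> meta = lastWithCheckpoint(timeline);
        if (!meta.isPresent()) {
            return Optional.ofNullable(cfgCheckpoint);
        }
        String resetKey = meta.get().get(CHECKPOINT_RESET_KEY);
        if (cfgCheckpoint != null && (isNullOrEmpty(resetKey) || !cfgCheckpoint.equals(resetKey))) {
            return Optional.of(cfgCheckpoint);
        }
        return Optional.of(meta.get().get(CHECKPOINT_KEY));
    }

    static Map<String, String> commit(String checkpoint, String resetKey) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put(CHECKPOINT_KEY, checkpoint);
        metadata.put(CHECKPOINT_RESET_KEY, resetKey);
        return metadata;
    }

    // Three commits with checkpoints 2, 3, 4 (assumed reset key "0"),
    // followed by a replacecommit that has no checkpoint keys.
    static List<Map<String, String>> sampleTimeline() {
        List<Map<String, String>> timeline = new ArrayList<>();
        timeline.add(commit("2", "0"));
        timeline.add(commit("3", "0"));
        timeline.add(commit("4", "0"));
        timeline.add(new HashMap<>());
        return timeline;
    }

    public static void main(String[] args) {
        List<Map<String, String>> timeline = sampleTimeline();
        System.out.println(resumeFromLastInstant("0", timeline).orElse("none")); // prints 0
        System.out.println(resumeAfterWalkBack("0", timeline).orElse("none"));   // prints 4
    }
}
```

In this model, an explicitly changed checkpoint (for example "7" against a recorded reset key of "0") still overrides the timeline in the walk-back variant, so the override that the original check exists for is preserved.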
Timeline:
189069 Dec 6 12:19 20211206201238649.commit
     0 Dec 6 12:12 20211206201238649.commit.requested
     0 Dec 6 12:12 20211206201238649.inflight
189069 Dec 6 12:27 20211206201959151.commit
     0 Dec 6 12:20 20211206201959151.commit.requested
     0 Dec 6 12:20 20211206201959151.inflight
189069 Dec 6 12:34 20211206202728233.commit
     0 Dec 6 12:27 20211206202728233.commit.requested
     0 Dec 6 12:27 20211206202728233.inflight
 36662 Dec 6 12:35 20211206203449899.replacecommit
     0 Dec 6 12:35 20211206203449899.replacecommit.inflight
 34656 Dec 6 12:35 20211206203449899.replacecommit.requested
 28013 Dec 6 12:35 20211206203503574.clean
 19024 Dec 6 12:35 20211206203503574.clean.inflight
 19024 Dec 6 12:35 20211206203503574.clean.requested
189069 Dec 6 12:43 20211206203551080.commit
     0 Dec 6 12:35 20211206203551080.commit.requested
     0 Dec 6 12:35 20211206203551080.inflight
189069 Dec 6 12:50 20211206204311612.commit
     0 Dec 6 12:43 20211206204311612.commit.requested
     0 Dec 6 12:43 20211206204311612.inflight
     0 Dec 6 12:50 20211206205044595.commit.requested
     0 Dec 6 12:50 20211206205044595.inflight
   128 Dec 6 12:56 archived
   483 Dec 6 11:52 hoodie.properties
Checkpoints in commits:
grep "deltastreamer.checkpoint.key" *
20211206201238649.commit: "deltastreamer.checkpoint.key" : "2"
20211206201959151.commit: "deltastreamer.checkpoint.key" : "3"
20211206202728233.commit: "deltastreamer.checkpoint.key" : "4"
20211206203551080.commit: "deltastreamer.checkpoint.key" : "1"
20211206204311612.commit: "deltastreamer.checkpoint.key" : "2"
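The reset is visible in the grep output as a checkpoint that goes backwards. A small sketch of that check follows. It assumes checkpoints are numeric and increase from commit to commit, which holds for this test source but not for sources in general. The class and method names are hypothetical, and the values are copied from the output above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for spotting a checkpoint reset in a list of commits.
class CheckpointRegressionSketch {
    // Returns the instants whose checkpoint is lower than the previous commit's.
    // Assumes numeric checkpoints and a map iterated in timeline order.
    static List<String> findRegressions(Map<String, String> checkpointsByInstant) {
        List<String> regressed = new ArrayList<>();
        long previous = Long.MIN_VALUE;
        for (Map.Entry<String, String> entry : checkpointsByInstant.entrySet()) {
            long current = Long.parseLong(entry.getValue());
            if (current < previous) {
                regressed.add(entry.getKey());
            }
            previous = current;
        }
        return regressed;
    }

    // Checkpoints per commit, as reported by the grep above.
    static Map<String, String> observedCheckpoints() {
        Map<String, String> checkpoints = new LinkedHashMap<>();
        checkpoints.put("20211206201238649", "2");
        checkpoints.put("20211206201959151", "3");
        checkpoints.put("20211206202728233", "4");
        checkpoints.put("20211206203551080", "1");
        checkpoints.put("20211206204311612", "2");
        return checkpoints;
    }

    public static void main(String[] args) {
        System.out.println(findRegressions(observedCheckpoints())); // prints [20211206203551080]
    }
}
```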
Steps to reproduce:
Run HoodieDeltaStreamer in continuous mode by passing both "--checkpoint 0" and "--continuous", with inline clustering and sync clean enabled (some configs are masked).
spark-submit \
  --master yarn \
  --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
  --conf spark.speculation=true \
  --conf spark.speculation.multiplier=1.0 \
  --conf spark.speculation.quantile=0.5 \
  --packages org.apache.spark:spark-avro_2.12:3.2.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
  --props file:/home/hadoop/ethan/test.properties \
  --source-class ... \
  --source-ordering-field ts \
  --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
  --target-table test_table \
  --table-type COPY_ON_WRITE \
  --op BULK_INSERT \
  --checkpoint 0 \
  --continuous
test.properties:
hoodie.cleaner.commits.retained=4
hoodie.keep.min.commits=5
hoodie.keep.max.commits=7
hoodie.clean.async=true
hoodie.clustering.inline=true
hoodie.clustering.async.max.commits=3
hoodie.compact.inline.max.delta.commits=3
hoodie.insert.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulk_insert.shuffle.parallelism=10
hoodie.delete.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=partition
# turn off any small file handling, for ease of testing
hoodie.parquet.small.file.limit=1
benchmark.input.source.path=...