Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Not A Problem
Affects Version/s: 0.6.0
None
-
None
-
Environment: AWS EMR emr-5.32.0, Spark version 2.4.7
Description
Exception:
java.lang.NullPointerException
    at org.apache.hudi.common.util.ParquetUtils.fetchRecordKeyPartitionPathFromParquet(ParquetUtils.java:146)
    at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:53)
    at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$c57f549b$1(HoodieSimpleIndex.java:179)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
We use Hudi as the storage engine for the output of our Spark jobs, which run on AWS EMR. Recently we started observing that some upsert commits leave the table in an inconsistent state: _hoodie_record_key is null for a record that was updated during that commit.
How are we checking that _hoodie_record_key is null?
val df = spark.read
  .format("org.apache.hudi")
  .load("s3://myLocation/my-table" + "/*/*/*/*")
df.filter($"_hoodie_record_key".isNull).show(false)

// Output
+------------------+----------------------+--------------------------+
|_hoodie_record_key|_hoodie_partition_path|primaryKey                |
+------------------+----------------------+--------------------------+
|null              |2021/07/01            |xxxxxxxxxxxxxxxxxxxxxx    |
+------------------+----------------------+--------------------------+
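To narrow down which commit and which Parquet file hold the affected record, the same check can be extended with Hudi's other metadata columns and Spark's input_file_name(). This is only a diagnostic sketch: it assumes the same base path and glob as the check above, and the names basePath, nullKeyRows and source_file are made up for illustration. If the Hudi metadata columns are all null for the affected row, source_file is still filled in by Spark from the file being read.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, input_file_name}

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().appName("hudi-null-key-check").getOrCreate()

// Assumption: same table location as in the check above.
val basePath = "s3://myLocation/my-table"

val nullKeyRows = spark.read
  .format("org.apache.hudi")
  .load(basePath + "/*/*/*/*")
  .filter(col("_hoodie_record_key").isNull)
  .select(
    col("_hoodie_commit_time"),          // commit that last wrote the row (Hudi metadata column)
    col("_hoodie_partition_path"),
    col("_hoodie_file_name"),            // file name as recorded by Hudi
    input_file_name().as("source_file"), // file Spark actually read the row from
    col("primaryKey")
  )

nullKeyRows.show(false)
```

The _hoodie_commit_time value, if populated, can then be compared against the commit that was later rolled back.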
One thing to note is that the record with a null _hoodie_record_key was already present in the Hudi table and was updated during the commit.
What is even stranger is that only a single record in the Hudi table has a null _hoodie_record_key; all other records are fine.
We have verified that the column used as the record key (RECORDKEY_FIELD_OPT_KEY) is present in the record and is NOT NULL.
After rolling back the faulty commit that introduced the record, rerunning the job works fine, i.e. there are no records with a null _hoodie_record_key.
HoodieWriter Config
val hudiOptions = Map[String, String](
  RECORDKEY_FIELD_OPT_KEY -> "primaryKey",
  PARTITIONPATH_FIELD_OPT_KEY -> "partitionKey",
  PRECOMBINE_FIELD_OPT_KEY -> "updateTime",
  KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
  CLEANER_COMMITS_RETAINED_PROP -> "5"
)

dataframe.write.format("org.apache.hudi")
  .option(HoodieWriteConfig.TABLE_NAME, "myTable")
  .options(hudiOptions)
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, "SIMPLE")
  .mode(SaveMode.Append)
  .save("s3://mylocation/")
We are using a custom RecordPayload class that inherits from OverwriteWithLatestAvroPayload.