Apache Hudi / HUDI-2172

upserts failing due to _hoodie_record_key being null in the hudi table


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 0.6.0
    • Fix Version/s: None
    • Component/s: None
    • Environment: AWS EMR emr-5.32.0, Spark version 2.4.7

    Description

      Exception:

      java.lang.NullPointerException
      at org.apache.hudi.common.util.ParquetUtils.fetchRecordKeyPartitionPathFromParquet(ParquetUtils.java:146)
      at org.apache.hudi.io.HoodieKeyLocationFetchHandle.locations(HoodieKeyLocationFetchHandle.java:53)
      at org.apache.hudi.index.simple.HoodieSimpleIndex.lambda$fetchRecordLocations$c57f549b$1(HoodieSimpleIndex.java:179)
      at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
      at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$3$1.apply(JavaRDDLike.scala:143)
      at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
      at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
      at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:188)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
      at org.apache.spark.scheduler.Task.run(Task.scala:123)
      at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
      at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
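
      The failing frame is the point where the SIMPLE index reads the stored meta columns back out of each parquet base file to rebuild HoodieKeys for lookup, so a null _hoodie_record_key in the file fails right there. A minimal sketch of that step (simplified stand-in code, not the actual ParquetUtils implementation):

      import org.apache.hudi.common.model.HoodieKey

      // Simplified stand-in for the key-fetch step (not Hudi's real code):
      // each row's stored meta columns become a HoodieKey for index lookup.
      def toHoodieKeys(rows: Iterator[(Any, Any)]): Iterator[HoodieKey] =
        rows.map { case (recordKey, partitionPath) =>
          // a corrupt row yields recordKey == null, so .toString throws the NPE
          new HoodieKey(recordKey.toString, partitionPath.toString)
        }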
      

       

       

      We are using Hudi as the storage engine for the output of our Spark jobs, which run on AWS EMR. Recently we started observing that some upsert commits leave the table in an inconsistent state, i.e. _hoodie_record_key is null for a record that was updated during that commit.

       

      How are we checking that _hoodie_record_key is null?

      import spark.implicits._  // needed for the $"column" syntax

      val df = spark.read
        .format("org.apache.hudi")
        .load("s3://myLocation/my-table" + "/*/*/*/*")

      df.filter($"_hoodie_record_key".isNull).show(false)

      // Output
      +------------------+----------------------+--------------------------+
      |_hoodie_record_key|_hoodie_partition_path|primaryKey                |
      +------------------+----------------------+--------------------------+
      |null              |2021/07/01            |xxxxxxxxxxxxxxxxxxxxxx    |
      +------------------+----------------------+--------------------------+
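
      A quick follow-up query using Hudi's other standard meta columns can pinpoint which base file and commit wrote the bad row:

      df.filter($"_hoodie_record_key".isNull)
        .select($"_hoodie_commit_time", $"_hoodie_file_name",
                $"_hoodie_partition_path", $"primaryKey")
        .show(false)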
      

       

       

      One thing to note here is that the record which now has a null _hoodie_record_key was already present in the hudi table and was updated during that commit.

      What is even stranger is that there is only a single record in the table with a null _hoodie_record_key; all other records are fine.

      We have verified that the column used as the record key (RECORDKEY_FIELD_OPT_KEY) is present in the incoming record and is NOT NULL; the check is along the lines sketched below.
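
      Here `sourceDf` stands in for the dataframe being upserted:

      // a non-zero count here would mean the incoming key column itself
      // carries nulls or blanks before Hudi ever sees it
      sourceDf.filter($"primaryKey".isNull || $"primaryKey" === "").count()  // expected: 0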

      After rolling back the faulty commit which introduced that record, rerunning the job works fine, i.e. there are no records with a null _hoodie_record_key. A sketch of the rollback step follows.
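
      A faulty commit can be rolled back from hudi-cli (commit rollback) or programmatically; below is a minimal sketch of the programmatic route, assuming the 0.6.0 HoodieWriteClient API, with a placeholder instant time:

      import org.apache.hudi.client.HoodieWriteClient
      import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.spark.api.java.JavaSparkContext

      val writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("s3://mylocation/")
        .forTable("myTable")
        .build()

      val client = new HoodieWriteClient[OverwriteWithLatestAvroPayload](
        new JavaSparkContext(spark.sparkContext), writeConfig)
      client.rollback("20210701120000")  // hypothetical instant time of the faulty commit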

      HoodieWriter Config

      import org.apache.hudi.DataSourceWriteOptions._
      import org.apache.hudi.config.HoodieCompactionConfig.CLEANER_COMMITS_RETAINED_PROP
      import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
      import org.apache.hudi.keygen.ComplexKeyGenerator
      import org.apache.spark.sql.SaveMode

      val hudiOptions = Map[String, String](
        RECORDKEY_FIELD_OPT_KEY -> "primaryKey",
        PARTITIONPATH_FIELD_OPT_KEY -> "partitionKey",
        PRECOMBINE_FIELD_OPT_KEY -> "updateTime",
        KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
        CLEANER_COMMITS_RETAINED_PROP -> "5"
      )

      dataframe.write.format("org.apache.hudi")
        .option(HoodieWriteConfig.TABLE_NAME, "myTable")
        .options(hudiOptions)
        .option(HoodieIndexConfig.INDEX_TYPE_PROP, "SIMPLE")
        .mode(SaveMode.Append)
        .save("s3://mylocation/")
      

       

      We are using a custom record payload class that extends OverwriteWithLatestAvroPayload; a sketch of the shape such a payload takes is below.
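
      A minimal sketch, assuming the payload only overrides combineAndGetUpdateValue (the class name and merge logic are illustrative, not our actual implementation):

      import org.apache.avro.Schema
      import org.apache.avro.generic.{GenericRecord, IndexedRecord}
      import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
      import org.apache.hudi.common.util.{Option => HOption}

      // Illustrative payload shape only; the real merge logic differs.
      class MyCustomPayload(record: GenericRecord, orderingVal: Comparable[_])
          extends OverwriteWithLatestAvroPayload(record, orderingVal) {

        override def combineAndGetUpdateValue(currentValue: IndexedRecord,
                                              schema: Schema): HOption[IndexedRecord] = {
          // custom merge logic would go here; fall back to latest-wins semantics
          super.combineAndGetUpdateValue(currentValue, schema)
        }
      }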

       

       

       

          People

            Assignee: Unassigned
            Reporter: Varun (varunb94)
            Votes: 0
            Watchers: 3
