HUDI-2500 (Apache Hudi; sub-task of HUDI-2505 [UMBRELLA] Spark DataSource APIs and Spark SQL discrepancies)

Spark datasource delete not working on Spark SQL created table

Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Critical
    • Resolution: Won't Fix
    • None
    • 0.10.0
    • spark

    Description

      Original issue: https://github.com/apache/hudi/issues/3670

       

      Script to reproduce:

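      // Imports for the Hudi and Spark classes referenced below.
      import org.apache.hudi.DataSourceWriteOptions
      import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, EmptyHoodieRecordPayload}
      import org.apache.hudi.config.HoodieWriteConfig
      import org.apache.hudi.keygen.ComplexKeyGenerator
      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions.expr

      // `tmp` (a java.io.File temp directory) and `spark` (a SparkSession) are
      // supplied by the surrounding test harness, which the report does not show.
      val sparkSourceTablePath = s"${tmp.getCanonicalPath}/test_spark_table"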
      val sparkSourceTableName = "test_spark_table"
      val hudiTablePath = s"${tmp.getCanonicalPath}/test_hudi_table"
      val hudiTableName = "test_hudi_table"
      println("0 - prepare source data")
      spark.createDataFrame(Seq(
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
      )).toDF("id", "creation_date", "last_update_time")
        .withColumn("creation_date", expr("cast(creation_date as date)"))
        .withColumn("id", expr("cast(id as bigint)"))
        .write
        .option("path", sparkSourceTablePath)
        .mode("overwrite")
        .format("parquet")
        .saveAsTable(sparkSourceTableName)
      
      println("1 - CTAS to load data to Hudi")
      val hudiOptions = Map[String, String](
        HoodieWriteConfig.TBL_NAME.key() -> hudiTableName,
        DataSourceWriteOptions.TABLE_NAME.key() -> hudiTableName,
        DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
        DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "id",
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getCanonicalName,
        DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getCanonicalName,
        DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "creation_date",
        DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "last_update_time",
        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key() -> "1",
        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key() -> "1",
        HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key() -> "1",
        HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key() -> "1",
        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key() -> "1"
      )
      spark.sql(
        s"""create table if not exists $hudiTableName using hudi
           |          location '$hudiTablePath'
           |          options (
           |          type = 'cow',
           |          primaryKey = 'id',
           |          preCombineField = 'last_update_time'
           |          )
           |          partitioned by (creation_date)
           |          AS
           |          select id, last_update_time, creation_date from $sparkSourceTableName
           |          """.stripMargin)
      println("2 - Hudi table has all records")
      spark.sql(s"select * from $hudiTableName").show(100)
      println("3 - pick 105 to delete")
      val rec105 = spark.sql(s"select * from $hudiTableName where id = 105")
      rec105.show()
      println("4 - issue delete (Spark SQL)")
      spark.sql(s"delete from $hudiTableName where id = 105")
      println("5 - 105 is deleted")
      spark.sql(s"select * from $hudiTableName").show(100)
      println("6 - pick 104 to delete")
      val rec104 = spark.sql(s"select * from $hudiTableName where id = 104")
      rec104.show()
      println("7 - issue delete (DataSource)")
      rec104.write
        .format("hudi")
        .options(hudiOptions)
        .option(DataSourceWriteOptions.OPERATION.key(), "delete")
        .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), classOf[EmptyHoodieRecordPayload].getCanonicalName)
        .mode(SaveMode.Append)
        .save(hudiTablePath)
      println("8 - 104 should be deleted")
      spark.sql(s"select * from $hudiTableName").show(100)
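
Step 8 only prints the table, so whether the delete took effect has to be judged by eye. A minimal sketch of an explicit check follows. It assumes the `spark` session and `hudiTableName` from the script above, and the helper name `inspectRecord` is hypothetical (it is not part of the original report). It also prints Hudi's `_hoodie_record_key` and `_hoodie_partition_path` metadata columns for the record. Comparing those stored values with what the DataSource write options would produce is one place to start looking; that is an assumption, not a confirmed root cause.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical helper: shows how a record is keyed inside the Hudi table
// and returns how many rows with the given id are still present.
def inspectRecord(spark: SparkSession, tableName: String, id: Long): Long = {
  val rows = spark.table(tableName).filter(col("id") === id)
  // _hoodie_record_key and _hoodie_partition_path are metadata columns
  // that Hudi stores alongside the user columns.
  rows.select("_hoodie_record_key", "_hoodie_partition_path", "id", "creation_date")
    .show(false)
  rows.count()
}

// Run once before step 7 to see the stored key, and again after step 8.
val remaining = inspectRecord(spark, hudiTableName, 104L)
println(s"rows still present for id 104: $remaining")
```

If the count is still non-zero after step 8, the DataSource delete did not match the record written by the CTAS statement.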
      

            People

              biyan900116@gmail.com Yann Byron
              xushiyan Shiyan Xu