Details
-
Sub-task
-
Status: Closed
-
Critical
-
Resolution: Won't Fix
-
None
Description
Original issue https://github.com/apache/hudi/issues/3670
Script to reproduce
// Reproduction script: writes a small parquet table, loads it into a Hudi table with a
// Spark SQL CTAS, then deletes one record through Spark SQL and a second record through
// the Hudi DataSource writer, printing the table contents after each step.
//
// NOTE(review): `spark` and `tmp` are not defined in this snippet; presumably an active
// SparkSession and a temporary directory (something exposing `getCanonicalPath`) provided
// by the enclosing test harness — confirm before running.

val sparkSourceTablePath = s"${tmp.getCanonicalPath}/test_spark_table"
val sparkSourceTableName = "test_spark_table"
val hudiTablePath = s"${tmp.getCanonicalPath}/test_hudi_table"
val hudiTableName = "test_hudi_table"

println("0 - prepare source data")
// All three columns start out as strings; `creation_date` and `id` are cast below,
// while `last_update_time` stays a string.
spark.createDataFrame(Seq(
  ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
  ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
  ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
  ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
  ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
  ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
)).toDF("id", "creation_date", "last_update_time")
  .withColumn("creation_date", expr("cast(creation_date as date)"))
  .withColumn("id", expr("cast(id as bigint)"))
  .write
  .option("path", sparkSourceTablePath)
  .mode("overwrite")
  .format("parquet")
  .saveAsTable(sparkSourceTableName)

println("1 - CTAS to load data to Hudi")
// Options used only by the DataSource delete in step 7; the CTAS below carries its own
// `options (...)` clause and does not read this map.
val hudiOptions = Map[String, String](
  HoodieWriteConfig.TBL_NAME.key() -> hudiTableName,
  DataSourceWriteOptions.TABLE_NAME.key() -> hudiTableName,
  DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "id",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getCanonicalName,
  DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getCanonicalName,
  DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "creation_date",
  DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "last_update_time",
  HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key() -> "1",
  HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key() -> "1",
  HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key() -> "1",
  HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key() -> "1",
  HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key() -> "1"
)

// Each continuation line must begin with `|` so that stripMargin removes the margin;
// a `|` in the middle of a line would be passed to the SQL parser verbatim.
spark.sql(
  s"""create table if not exists $hudiTableName using hudi
     | location '$hudiTablePath'
     | options (
     |   type = 'cow',
     |   primaryKey = 'id',
     |   preCombineField = 'last_update_time'
     | )
     | partitioned by (creation_date)
     | AS
     | select id, last_update_time, creation_date from $sparkSourceTableName
     | """.stripMargin)

println("2 - Hudi table has all records")
spark.sql(s"select * from $hudiTableName").show(100)

println("3 - pick 105 to delete")
val rec105 = spark.sql(s"select * from $hudiTableName where id = 105")
rec105.show()

println("4 - issue delete (Spark SQL)")
spark.sql(s"delete from $hudiTableName where id = 105")

println("5 - 105 is deleted")
spark.sql(s"select * from $hudiTableName").show(100)

println("6 - pick 104 to delete")
val rec104 = spark.sql(s"select * from $hudiTableName where id = 104")
rec104.show()

println("7 - issue delete (DataSource)")
// Writes the selected row back with operation "delete". The payload class is specified
// again here (EmptyHoodieRecordPayload) after the DefaultHoodieRecordPayload entry that
// `hudiOptions` already contains.
rec104.write
  .format("hudi")
  .options(hudiOptions)
  .option(DataSourceWriteOptions.OPERATION.key(), "delete")
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), classOf[EmptyHoodieRecordPayload].getCanonicalName)
  .mode(SaveMode.Append)
  .save(hudiTablePath)

println("8 - 104 should be deleted")
spark.sql(s"select * from $hudiTableName").show(100)
Attachments
Issue Links
- relates to
-
HUDI-2390 KeyGenerator discrepancy between DataFrame writer and SQL
- Closed