Details
-
Bug
-
Status: Open
-
Major
-
Resolution: Unresolved
-
0.5.0
-
None
Description
We are evaluating Hudi to use for our near real-time ingestion needs, compared to other solutions (Delta/Iceberg). We've picked Hudi because pre-installed with Amazon EMR by AWS. However, adopting it is blocking on this issue with concurrent small batch (of 256 files) write jobs (to the same S3 path).
Using Livy we're triggering Spark jobs writing Hudi tables over S3, on EMR with EMRFS active. Paths are using the "s3://" prefix and EMRFS is active. We're writing Spark SQL datasets promoted up from RDDs. The "hoodie.consistency.check.enabled" is set to true. Spark serializer is Kryo. Hoodie version is 0.5.0-incubating.
Both on COW and MOR tables some of the submitted jobs are failing with the below exception:
org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT] at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239) at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222) at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380) at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327) at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834) at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907) at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733) at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121) at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994) at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
The jobs are sent in concurrent batches of 256 files, over the same S3 path, in total some 8k files for 6 hours of our data.
Writing happens with the following code (basePath is an S3 bucket):
// Configs (edited) String databaseName = "nrt"; String assumeYmdPartitions = "false"; String extractorClass = MultiPartKeysValueExtractor.class.getName (); String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL (); String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL (); String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000"; String basePath = "s3://some_path_to_hudi"; // or "s3a://" does not seem to matter, same exception String avroSchemaAsString = avroSchema.toString (); String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", ""); eventsDataset.write () .format ("org.apache.hudi") .option (HoodieWriteConfig.TABLE_NAME, tableName) .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType) .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id") .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), "partition_path") .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp") .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true") .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName) .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName) .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), "tenant,year,month,day") .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri) .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), assumeYmdPartitions) .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY (), extractorClass) .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation) .mode (SaveMode.Append) .save (String.format ("%s/%s", basePath, tableName));