Details
-
Improvement
-
Status: Closed
-
Blocker
-
Resolution: Fixed
-
None
-
None
-
1
Description
GetAllPartitionPaths had some perf degradation from 0.9.0 to 0.10.0 and hence we had reverted the change for now. But the change as such was good. So, we want to follow up to see if we can fix/enhance the new code. Old code does not leverage the spark engine to parallelize across diff folders. So, there could be scope for improvement. but from the perf nos, its not straight forward. So creating a follow up ticket.
excerpt from the findings.
For one of my test tables in S3, with EMR cluster (10k partitions)
- With 0.11.0:
147 secs. - With this patch as is (where engine context is not used for 2nd phase)
5.7 secs. - Latest master + adding engineContext for 2nd phase:
16 secs. - I also tried completely rewriting the dag.
12 secs.
while (!pathsToList.isEmpty()) {
// TODO: Get the parallelism from HoodieWriteConfig
int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
// List all directories in parallel
List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path ->
, listingParallelism);
pathsToList.clear();
// if current dictionary contains PartitionMetadata, add it to result
// if current dictionary does not contain PartitionMetadata, add it to queue
int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
if (fileStatus.isDirectory()) {
if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath()))
else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
}
} else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
return Pair.of(Option.of(partitionName), Option.empty());
}
return Pair.of(Option.empty(), Option.empty());
}, fileListingParallelism);
partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get())
.collect(Collectors.toList()));
pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get())
.collect(Collectors.toList()));
}
So, based on above findings, I will go w/ what we have in this patch in its current state. Will address Raymond's and Alexey's feedback alone and unblock 0.11.1.
Ref patch: https://github.com/apache/hudi/pull/5829
Attachments
Issue Links
- links to