Apache Hudi
HUDI-4242

Follow up on getAllPartitionPaths perf enhancement


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.12.0
    • Labels: performance, reader-core
    • Component/s: None

    Description

      getAllPartitionPaths had some performance degradation from 0.9.0 to 0.10.0, so we reverted the change for now. But the change itself was good, and we want to follow up to see if we can fix/enhance the new code. The old code does not leverage the Spark engine to parallelize listing across different folders, so there could be scope for improvement. But from the performance numbers it is not straightforward, hence this follow-up ticket.

       

      Excerpt from the findings.

      For one of my test tables on S3, with an EMR cluster (10k partitions):

      1. With 0.11.0:
        147 secs.
      2. With this patch as is (where engine context is not used for 2nd phase)
        5.7 secs.
      3. Latest master + adding engineContext for 2nd phase:
        16 secs.
      4. I also tried completely rewriting the DAG.
        12 secs.

      while (!pathsToList.isEmpty()) {
        // TODO: Get the parallelism from HoodieWriteConfig
        int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());

        // List all directories in parallel
        List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
          FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
          return Arrays.stream(fileSystem.listStatus(path));
        }, listingParallelism);
        pathsToList.clear();

        // if the current directory contains PartitionMetadata, add it to the result;
        // if it does not, add it to the queue to be listed in the next round
        int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
        List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
          FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
          if (fileStatus.isDirectory()) {
            if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
              return Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())), Option.empty());
            } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
              return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
            }
          } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
            String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
            return Pair.of(Option.of(partitionName), Option.empty());
          }
          return Pair.of(Option.empty(), Option.empty());
        }, fileListingParallelism);

        partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent())
            .map(entry -> entry.getKey().get()).collect(Collectors.toList()));

        pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent())
            .map(entry -> entry.getValue().get()).collect(Collectors.toList()));
      }
      So, based on the above findings, I will go with what we have in this patch in its current state, address Raymond's and Alexey's feedback alone, and unblock 0.11.1.
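      For context, the level-by-level (BFS) listing pattern in the snippet above can be reduced to a JDK-only sketch: list all queued directories in parallel, keep the ones that contain a partition metadata marker, and queue the rest for the next round. This is an illustration only, not Hudi code; `parallelStream` stands in for `engineContext.flatMap`, and the `MARKER` file name and `listPartitions` helper are hypothetical.

      ```java
      import java.io.IOException;
      import java.io.UncheckedIOException;
      import java.nio.file.Files;
      import java.nio.file.Path;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import java.util.stream.Collectors;

      public class ParallelListingSketch {
        // Hypothetical marker file identifying a "partition" directory,
        // loosely analogous to Hudi's .hoodie_partition_metadata.
        static final String MARKER = ".partition_metadata";

        // BFS over the directory tree: each iteration lists one level of
        // directories in parallel, then partitions the results into
        // "has marker" (a partition, done) vs "no marker" (descend further).
        static List<Path> listPartitions(Path base) {
          List<Path> partitions = new ArrayList<>();
          List<Path> pathsToList = new ArrayList<>(List.of(base));
          while (!pathsToList.isEmpty()) {
            Map<Boolean, List<Path>> byType = pathsToList.parallelStream()
                .flatMap(dir -> {
                  try {
                    return Files.list(dir); // parallel stand-in for engineContext.flatMap
                  } catch (IOException e) {
                    throw new UncheckedIOException(e);
                  }
                })
                .filter(Files::isDirectory)
                .collect(Collectors.partitioningBy(d -> Files.exists(d.resolve(MARKER))));
            partitions.addAll(byType.get(true));  // marker present: a partition path
            pathsToList = byType.get(false);      // no marker: list in the next round
          }
          return partitions;
        }

        public static void main(String[] args) throws IOException {
          // Build a tiny table layout: base/year=2022/month=06 with a marker in the leaf.
          Path base = Files.createTempDirectory("tbl");
          Path leaf = Files.createDirectories(base.resolve("year=2022").resolve("month=06"));
          Files.createFile(leaf.resolve(MARKER));
          System.out.println(listPartitions(base).size()); // prints 1
        }
      }
      ```

      The real code differs mainly in that each level's listing is distributed over the cluster via the engine context rather than over local threads, which is why the parallelism is capped per round with DEFAULT_LISTING_PARALLELISM.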

       

      Ref patch: https://github.com/apache/hudi/pull/5829


      People

        Assignee: shivnarayan (sivabalan narayanan)
        Reporter: shivnarayan (sivabalan narayanan)
        Votes: 0
        Watchers: 1
