Apache Arrow / ARROW-11781

[Python] Reading small amount of files from a partitioned dataset is unexpectedly slow


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.0.0
    • Component/s: Python
    • Labels: None

    Description

      I posted this on StackOverflow and was told I should probably create an issue here.

      I managed to create a relatively minimal example:

      import random
      import time

      import pandas as pd
      import pyarrow as pa
      import pyarrow.dataset

      # `spark` (a SparkSession) and `data_dir` (a pathlib.Path) are assumed to be defined.
      df = spark.createDataFrame(
          [
              (str(a), b, c, random.randint(0, 1000))
              for a in range(100)
              for b in range(10)
              for c in range(10000)
          ],
          ['a', 'b', 'c', 'd']
      )

      print("Writing the spark dataframe to the file system in partitioned folders.")
      df.repartition('a').write.partitionBy('a', 'b').parquet(str(data_dir), compression='snappy', mode='overwrite')

      def time_it(func, repetition=10):
          start = time.time()
          for _ in range(repetition):
              func()
          return (time.time() - start) / repetition

      print("Loading the entire dataset")
      print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))

      print("Loading a single file using filters")
      print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]])))

      print("Loading a single file using filters and a specified partitioning")
      partitioning = pa.dataset.HivePartitioning(
          pa.schema([
              pa.field('a', pa.string()),
              pa.field('b', pa.string())
          ])
      )
      print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]], partitioning=partitioning)))

      print("Loading a single file by specifying the path")
      print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
      

      Which gives me the following output:

      Writing the spark dataframe to the file system in partitioned folders.
      Loading the entire dataset
      0.23926825523376466
      Loading a single file using filters
      0.04788286685943603
      Loading a single file using filters and a specified partitioning
      0.0323061466217041
      Loading a single file by specifying the path
      0.0017130613327026368
      

       

      Loading this small subset of files is about 20 times faster when you address the partition path directly than when you go through the pyarrow filters.
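      For what it's worth, the same filtered read can also be expressed directly through the `pyarrow.dataset` API (which, as far as I understand, is what backs `pd.read_parquet` with the pyarrow engine in recent versions). That makes it easier to time dataset discovery and the filtered scan separately. A minimal sketch, assuming the `data_dir` layout and partition schema from the example above:

      import time

      import pyarrow as pa
      import pyarrow.dataset as ds

      partitioning = ds.HivePartitioning(
          pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())])
      )

      # Phase 1: discover the dataset (list files, parse the partition directories).
      start = time.time()
      dataset = ds.dataset(str(data_dir), format='parquet', partitioning=partitioning)
      print('discovery:', time.time() - start)

      # Phase 2: read only the fragments matching the partition filter.
      start = time.time()
      table = dataset.to_table(filter=(ds.field('a') == '0') & (ds.field('b') == '0'))
      print('filtered scan:', time.time() - start, 'rows:', table.num_rows)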

       

      The question as I posted it on StackOverflow:

      I am having some problems with the speed of loading `.parquet` files, and I can't figure out what I am doing wrong.

      Problem

      I am trying to read a single `.parquet` file from my local filesystem; it is part of the partitioned output of a Spark job, so the `.parquet` files sit in hierarchical directories named `a=x` and `b=y`.

      To achieve this, I am using `pandas.read_parquet` (which uses `pyarrow.parquet.read_table` under the hood), passing the `filters` kwarg. The run time when using `filters` is far longer than I would expect.

      # The following runs for about 55 seconds
      pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
      # The following runs for about 0.04 seconds
      pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
      # The following runs for about 70 seconds
      pd.read_parquet(<path_to_entire_dataset>)

      Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, whereas I would expect a run time roughly proportional to the number of files read (here a single partition out of roughly 100 × 10 = 1000).

      What mistake am I making here?

      I realize that simply encoding the filter values in the path would work; however, this quickly becomes complex, because what I want to filter on can and will change. Besides, I think `read_table` should be able to load this data efficiently.
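      To make that trade-off concrete, "putting the filters in the path" amounts to something like the following hand-rolled helper (hypothetical, and only able to express exact equality on the partition columns in a fixed order), which is why it does not scale well once the filter conditions change:

      from pathlib import Path

      import pandas as pd

      def read_partition(base_dir, **partition_values):
          # Hypothetical helper: build the hive-style path (e.g. a=x/b=y) by hand.
          path = Path(base_dir)
          for column, value in partition_values.items():
              path = path / f"{column}={value}"
          return pd.read_parquet(path, engine='pyarrow')

      # e.g. read_partition(data_dir, a='x', b='y')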

      PS: The entire dataset contains many millions of rows, while the data I want to load amounts to only a few thousand rows.

      Edit 1:
      As suggested by 0x26res, I manually defined the partitioning. This led to a significant speed-up, but still not as much as I would have expected; in this situation the run time was about 5 seconds.

      partitioning = HivePartitioning(
          pa.schema([
              pa.field('a', pa.string()),
              pa.field('b', pa.int32()),
          ])
      )
      pd.read_parquet(
          <path_to_entire_dataset>,
          engine='pyarrow',
          filters=[
              [
                  ('a', '=', x),
                  ('b', '=', y),
              ]
          ],
          partitioning=partitioning
      )
      

      Attachments

        1. spy.svg (125 kB, David Li)
        2. spy3.svg (117 kB, David Li)
        3. spy2.svg (220 kB, David Li)


            People

              Assignee: Unassigned
              Reporter: Jeroen (JeroenBos)
              Votes: 0
              Watchers: 7
