  SystemDS / SYSTEMDS-993

Performance: Improve Vector DataFrame Conversions

Details

    • Type: Improvement
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: SystemML 0.11
    • Component/s: None
    • Labels: None

    Description

      Currently, the performance of vector DataFrame conversions leaves much to be desired, with frequent OOM errors and overall slow execution.

      Scenario:

      • Spark DataFrame:
        • 3,745,888 rows
        • Each row contains one vector column, where the vector is dense and of length 65,539. Note: this is a 256x256 pixel image flattened out and appended with a 3-column one-hot encoded label, i.e., a workaround to get both the labels and features into SystemML as efficiently as possible.
      • SystemML script + MLContext invocation (Python): simply accept the DataFrame as input and save it as a SystemML matrix in binary form. Note: I'm not grabbing the output here, so the matrix is actually written to disk by SystemML.
        • # assuming the SystemML Python MLContext API and an existing SparkContext `sc`
          from systemml import MLContext, dml

          ml = MLContext(sc)
          script = """
          write(train, "train", format="binary")
          """
          script = dml(script).input(train=train_YX)
          ml.execute(script)


      I'm seeing large amounts of memory being used in conversion during the mapPartitionsToPair at RDDConverterUtils.java:311 stage. For example, in one scenario that stage read 1493.2 GB as "Input" and performed a "Shuffle Write" of 2.5 TB. A subsequent saveAsHadoopFile at WriteSPInstruction.java:261 stage then did a "Shuffle Read" of 2.5 TB and an "Output" of 1829.1 GB. This was for a simple script that took in a DataFrame with a vector column and wrote it to disk in binary format. The job kept running out of heap space, so I kept increasing the executor memory, up to 3x, until it finally ran. Additionally, the latter stage had a very skewed execution time across partitions: ~1 hour for the first 1,000 partitions (out of 20,000), ~20 minutes for the next 18,000 partitions, and ~1 hour for the final 1,000 partitions. The passed-in DataFrame had an average of 180 rows per partition, with a max of 215 and a min of 155.

      cc mboehm7


        Activity

          dusenberrymw Mike Dusenberry added a comment - cc acs_s, nakul02
          mboehm7 Matthias Boehm added a comment -

          ok thanks for the details mwdusenb@us.ibm.com - here is the list of things we need to address:
          (1) A new IPA pass to remove the checkpoint after a persistent read if it is directly followed by a persistent write (in your case this unnecessarily occupies 75% of executor memory).
          (2) An investigation of our custom hash function for matrix indexes w.r.t. the skew issue you've observed (I have wanted to change this function for a while now; new data structures such as our native LongLongDoubleHashMap already use a better hash function for long-long pairs).
          (3) Fewer allocations on row/vector-to-sparse-row appends (sparse rows with estimated nnz use a grow factor of 2x, which still requires log N allocation steps; we could avoid that by counting the row nnz and allocating once, which will likely reduce the pressure on GC) - see the sketch below this comment.
          (4) Compressed sparse blocks (the reason you see a size increase is that we have to represent partial blocks in sparse form, and apparently the default snappy compression during shuffle does not achieve a good compression ratio; we should introduce a custom serialization format for small sparse blocks, where we can encode the nnz per row and the column indexes as short instead of int).

          Until our next 0.11 RC, i.e., tomorrow, I'll resolve (1) and (3); the other issues will be addressed by our 1.0 release.
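
          A minimal sketch of the one-time-allocation idea in (3), assuming a simplified stand-in SparseRow class rather than SystemML's actual sparse row implementation:

          // Sketch: count the non-zeros of an incoming dense row first,
          // allocate the sparse row's arrays exactly once, then fill them,
          // avoiding the log N grow-and-copy steps of a 2x growth factor.
          // SparseRow here is illustrative, not SystemML's class.
          public class SparseRowAppendSketch {
              static final class SparseRow {
                  final int[] idx; final double[] vals; int size = 0;
                  SparseRow(int capacity) {           // single, exact allocation
                      idx = new int[capacity];
                      vals = new double[capacity];
                  }
                  void append(int col, double v) {    // no grow/copy needed
                      idx[size] = col; vals[size] = v; size++;
                  }
              }

              static SparseRow fromDense(double[] row) {
                  int nnz = 0;                        // first pass: count nnz
                  for (double v : row)
                      if (v != 0) nnz++;
                  SparseRow sr = new SparseRow(nnz);  // allocate once
                  for (int j = 0; j < row.length; j++)
                      if (row[j] != 0)
                          sr.append(j, row[j]);
                  return sr;
              }

              public static void main(String[] args) {
                  SparseRow sr = fromDense(new double[]{0, 3.5, 0, 0, 1.0});
                  System.out.println(sr.size + " non-zeros");  // prints "2 non-zeros"
              }
          }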

          mboehm7 Matthias Boehm added a comment -

          ok, (3) has been resolved - (1) will follow tomorrow.


          dusenberrymw Mike Dusenberry added a comment -

          Great, I'm glad that this provided enough information to create a list of items to address!

          mboehm7 Matthias Boehm added a comment -

          just a quick update with regard to the skew issue - based on a couple of experiments, I can confirm that the hash function is causing this problem.

          Here are the results comparing the distribution of blocks to partitions for two different scenarios (10M x 1K, blocksize 1K, dense; and 3.3M x 65K, blocksize 1K, dense). The hash functions compared are H1, the existing MatrixIndexes.hashCode(), and H2, the following code, which is similar to Arrays.hashCode(long[]):

          //basic hash mixing of two long hashes (w/o object creation)
          int h = (int)(key1 ^ (key1 >>> 32));
          return h*31 + (int)(key2 ^ (key2 >>> 32));
          

          Scenario 1: 10M x 1K, blocksize 1K, dense
          H1: min 16 blocks, max 18 blocks, 596/596 partitions with >= 1 block
          H2: min 16 blocks, max 17 blocks, 596/596 partitions with >= 1 block

          Scenario 2: 3.3M x 65K, blocksize 1K, dense
          H1: min 10 blocks, max 66 blocks, 3456/13182 partitions with >= 1 block
          H2: min 14 blocks, max 18 blocks, 13182/13182 partitions with >= 1 block

          While both functions perform well on Scenario 1 (a tall-and-skinny matrix), our existing hash function H1 performs very badly for wide matrices. mwdusenb@us.ibm.com This explains the huge difference in execution time, as nearly 3/4 of all partitions were completely empty.
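
          A self-contained sketch reproducing the spirit of the H2 measurement for Scenario 2, assuming Spark's HashPartitioner semantics (partition = non-negative hashCode mod numPartitions); the class name and setup are illustrative:

          // Hash every (rowBlockIndex, colBlockIndex) pair of a 3.3M x 65K
          // blocked matrix (blocksize 1K) with H2 and count blocks per partition.
          public class HashSkewSketch {
              // H2: basic hash mixing of two long hashes, similar to Arrays.hashCode(long[])
              static int h2(long key1, long key2) {
                  int h = (int)(key1 ^ (key1 >>> 32));
                  return h * 31 + (int)(key2 ^ (key2 >>> 32));
              }

              public static void main(String[] args) {
                  long rowBlocks = 3_300_000L / 1000;  // 3300 row blocks
                  long colBlocks = 65_000L / 1000;     // 65 column blocks
                  int numPartitions = 13182;           // as in Scenario 2 above
                  int[] counts = new int[numPartitions];
                  for (long i = 1; i <= rowBlocks; i++)
                      for (long j = 1; j <= colBlocks; j++) {
                          int part = h2(i, j) % numPartitions;
                          if (part < 0) part += numPartitions;  // non-negative mod
                          counts[part]++;
                      }
                  int min = Integer.MAX_VALUE, max = 0, nonEmpty = 0;
                  for (int c : counts) {
                      if (c == 0) continue;             // min/max over non-empty partitions
                      min = Math.min(min, c);
                      max = Math.max(max, c);
                      nonEmpty++;
                  }
                  System.out.printf("min %d blocks, max %d blocks, %d/%d partitions with >= 1 block%n",
                          min, max, nonEmpty, numPartitions);
              }
          }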

          mboehm7 Matthias Boehm added a comment -

          I also ran some experiments with a new serialized block type "SPARSE_SMALL", which encodes the nnz per row and the column indexes as shorts instead of integers. In a scenario of 100K x 64K with default snappy compression of shuffled data, this achieved a shuffle size reduction of only 55.9 GB to 55.5 GB. This small improvement does not justify the added complexity.
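
          A hedged sketch of what such a SPARSE_SMALL encoding could look like for a CSR-style block whose dimensions fit in 16 bits; the method name and layout are illustrative, not SystemML's actual serialization format:

          import java.io.DataOutputStream;
          import java.io.IOException;

          public class SparseSmallWriterSketch {
              /** rowPtr/colIdx/vals form a CSR layout of a sparse block. */
              static void writeSparseSmall(DataOutputStream out, int rows,
                      int[] rowPtr, int[] colIdx, double[] vals) throws IOException {
                  for (int i = 0; i < rows; i++) {
                      int nnz = rowPtr[i + 1] - rowPtr[i];
                      out.writeShort(nnz);                  // nnz per row as short (not int)
                      for (int k = rowPtr[i]; k < rowPtr[i + 1]; k++) {
                          out.writeShort(colIdx[k]);        // column index as short (not int)
                          out.writeDouble(vals[k]);         // value stays 8 bytes
                      }
                  }
              }
          }

          Since each value still needs 8 bytes, narrowing the index width saves at most about 2 of every 12 bytes per non-zero before compression, and snappy already compresses much of that redundancy, which is consistent with the small observed gain.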


          dusenberrymw Mike Dusenberry added a comment -

          Ah, that experiment with the hash functions is quite interesting and certainly explains the behavior I saw. As for the experiment with a more compressible block type, I agree that it is not beneficial enough. I think items (1)-(3), and particularly the updated hash function, should help a bunch here, and (4) should remain as an area of improvement.

          mboehm7 Matthias Boehm added a comment -

          absolutely - patches (1)-(3) are ready and will go in tomorrow along with a major rework for SYSTEMML-994. Again, I really appreciate your time describing these issues.


          dusenberrymw Mike Dusenberry added a comment -

          Excellent! Eagerly looking forward to making use of them, and thanks for the continued support here - definitely helping to push forward the project I'm sprinting on.


          People

            Assignee: mboehm7 Matthias Boehm
            Reporter: dusenberrymw Mike Dusenberry
            Votes: 0
            Watchers: 3
