SYSTEMDS-994

GC OOM: Binary Matrix to Frame Conversion

Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: SystemML 0.11
    • Component/s: None
    • Labels: None

    Description

      I currently have a SystemML matrix saved to HDFS in binary block format, and am attempting to read it in, convert it to a frame, and then pass that to an algorithm so that I can pull batches out of it with minimal overhead.

      When attempting to run this, I am repeatedly hitting the following GC limit:

      java.lang.OutOfMemoryError: GC overhead limit exceeded
      	at org.apache.sysml.runtime.matrix.data.FrameBlock.ensureAllocatedColumns(FrameBlock.java:281)
      	at org.apache.sysml.runtime.matrix.data.FrameBlock.copy(FrameBlock.java:979)
      	at org.apache.sysml.runtime.matrix.data.FrameBlock.copy(FrameBlock.java:965)
      	at org.apache.sysml.runtime.matrix.data.FrameBlock.<init>(FrameBlock.java:91)
      	at org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils$CreateBlockCombinerFunction.call(FrameRDDAggregateUtils.java:57)
      	at org.apache.sysml.runtime.instructions.spark.utils.FrameRDDAggregateUtils$CreateBlockCombinerFunction.call(FrameRDDAggregateUtils.java:48)
      	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
      	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187)
      	at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186)
      	at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148)
      	at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
      	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
      	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
      	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      

      Script:

      train = read("train")
      val = read("val")
      
      trainf = as.frame(train)
      valf = as.frame(val)
      
      // Rest of algorithm, which passes the frames to DML functions, and performs row indexing to pull out batches, convert to matrices, and train.
      

      Cluster setup:

      • Spark Standalone
      • 1 Master, 9 Workers
      • 47 cores, 124 GB available to Spark on each Worker (1 core + 1GB saved for OS)
      • spark.driver.memory 80g
      • spark.executor.memory 21g
      • spark.executor.cores 3
      • spark.default.parallelism 20000
      • spark.driver.maxResultSize 0
      • spark.akka.frameSize 128
      • spark.network.timeout 1000s

      Note: This is using today's latest build as of 09.29.16 1:30PM PST.

      cc mboehm7, acs_s


          Activity

            mboehm7 Matthias Boehm added a comment -

            after a closer look, it turned out that the matrix-to-frame converter had two fundamental problems (in addition to the minor points mentioned above) - the solutions are as follows:

            (3) Shuffle of matrix blocks instead of frame blocks: The current approach was to create an extended frame block for each matrix block, followed by a mergeByKey (including a shuffle). However, since each matrix block with column blocksize 1K is extended to a frame block spanning the full number of columns (64K in this case), and since frame blocks are always represented in dense format, this created temporary memory requirements 64x (plus overhead) larger than the inputs. The solution is simple: instead of shuffling frame blocks, we should create and shuffle extended matrix blocks (which are represented in sparse format) and only convert full matrix blocks to full frame blocks.
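
            As a back-of-the-envelope illustration of the blowup described in (3), using the block sizes from this comment and assuming 8-byte double cells (this is a standalone sketch, not SystemML code):

            public class FrameBlowupEstimate {
                public static void main(String[] args) {
                    final int rowsPerBlock = 1000;        // assumed row blocksize
                    final int colsPerMatrixBlock = 1000;  // column blocksize 1K (from the comment)
                    final int totalCols = 64000;          // 64K total columns (from the comment)
                    final int bytesPerCell = 8;           // dense double cells

                    // a single dense matrix block vs. the same rows padded to all 64K columns
                    long matrixBlockBytes = (long) rowsPerBlock * colsPerMatrixBlock * bytesPerCell;
                    long extendedFrameBlockBytes = (long) rowsPerBlock * totalCols * bytesPerCell;

                    System.out.printf("matrix block: ~%d MB%n", matrixBlockBytes >> 20);
                    System.out.printf("extended dense frame block: ~%d MB%n", extendedFrameBlockBytes >> 20);
                    System.out.printf("blowup: ~%dx (plus per-column object overhead)%n",
                        extendedFrameBlockBytes / matrixBlockBytes);
                }
            }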

            (4) Frame block conversion via column-set appends: For the specific case of converting dense matrix blocks to frame blocks, we use a cache-conscious row-major to column-major conversion with column appends. However, the recent change to array-based frame block schema/meta data in SYSTEMML-557 requires array reallocations per column append, which really hurts with a large number of columns (due to O(n^2) cell allocations/copies). We should append the entire prepared column set at once, which avoids any reallocations altogether.
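
            A minimal, generic sketch of the difference between per-column appends with array-backed meta data and a single column-set append (the class and method names below are illustrative, not the actual FrameBlock API):

            import java.util.Arrays;

            // Generic column-major container with array-backed per-column meta data.
            public class ColumnAppendSketch {
                private double[][] columns = new double[0][]; // column-major data
                private String[] schema = new String[0];      // per-column meta data

                // Per-column append: every call reallocates and copies the outer arrays,
                // so n appends perform O(n^2) copy work -- painful for 64K columns.
                public void appendColumn(String type, double[] col) {
                    columns = Arrays.copyOf(columns, columns.length + 1);
                    schema = Arrays.copyOf(schema, schema.length + 1);
                    columns[columns.length - 1] = col;
                    schema[schema.length - 1] = type;
                }

                // Column-set append: a single reallocation for the entire prepared set.
                public void appendColumns(String[] types, double[][] cols) {
                    int old = columns.length;
                    columns = Arrays.copyOf(columns, old + cols.length);
                    schema = Arrays.copyOf(schema, old + types.length);
                    System.arraycopy(cols, 0, columns, old, cols.length);
                    System.arraycopy(types, 0, schema, old, types.length);
                }
            }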

            mboehm7 Matthias Boehm added a comment -

            thanks mwdusenb@us.ibm.com - regarding the GC issues, these are likely caused by the fact that we try to compose frame blocks of 1M cells, which means that for 1M cells / 64K columns we have only 15 rows per block - due to column-oriented storage, the meta data is likely to dominate the memory consumption, especially for double schemas (see the rough arithmetic after the action items below). Here are the action items for improving the memory efficiency:

            (1) Memory-efficient default meta data: For homogeneous schemas and default meta data (column names, transform meta data) we should use compressed representations and only create the default meta data on demand.
            (2) Larger frame blocks: For extremely wide tables we should use a larger number of cells (and thus larger number of rows) to amortize the overhead per column (e.g., 32B overhead per array and additional objects per column).
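
            A rough sketch of the arithmetic behind (1) and (2), using only the numbers stated in this comment (1M cells per block, 64K columns, ~32B of array overhead per column); column names and other per-column objects come on top of this:

            public class FrameBlockOverheadEstimate {
                public static void main(String[] args) {
                    final long cellsPerBlock = 1000000L;  // ~1M cells per frame block
                    final long numCols = 64000L;          // 64K columns
                    final long rowsPerBlock = cellsPerBlock / numCols; // only ~15 rows per block

                    final long bytesPerCell = 8;       // double cells
                    final long overheadPerColumn = 32; // ~32B array overhead per column (before extra objects)

                    long dataBytes = rowsPerBlock * numCols * bytesPerCell; // cell data per block
                    long overheadBytes = numCols * overheadPerColumn;       // bare array overhead per block

                    System.out.println("rows per block: " + rowsPerBlock);
                    System.out.printf("cell data: ~%d KB, minimum per-column overhead: ~%d KB%n",
                        dataBytes >> 10, overheadBytes >> 10);
                }
            }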


            dusenberrymw Mike Dusenberry added a comment -

            Also, the latest run failed with a heap space OOM error:

            java.lang.OutOfMemoryError: Java heap space
            	at org.apache.sysml.runtime.matrix.data.MatrixBlock.allocateDenseBlock(MatrixBlock.java:368)
            	at org.apache.sysml.runtime.matrix.data.MatrixBlock.allocateDenseBlock(MatrixBlock.java:388)
            	at org.apache.sysml.runtime.matrix.data.MatrixBlock.copyDenseToDense(MatrixBlock.java:1361)
            	at org.apache.sysml.runtime.matrix.data.MatrixBlock.copy(MatrixBlock.java:1324)
            	at org.apache.sysml.runtime.matrix.data.MatrixBlock.copy(MatrixBlock.java:1302)
            	at org.apache.sysml.runtime.matrix.data.MatrixBlock.<init>(MatrixBlock.java:153)
            	at org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction.call(CopyBlockPairFunction.java:62)
            	at org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction.call(CopyBlockPairFunction.java:36)
            	at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
            	at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1018)
            	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
            	at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
            	at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
            	at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
            	at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1251)
            	at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply$mcV$sp(DiskStore.scala:81)
            	at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
            	at org.apache.spark.storage.DiskStore$$anonfun$putIterator$1.apply(DiskStore.scala:81)
            	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1239)
            	at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:82)
            	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:808)
            	at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:655)
            	at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:153)
            	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
            	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
            	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
            	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
            	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
            	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
            	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
            	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
            

            dusenberrymw Mike Dusenberry added a comment -

            Thanks, mboehm7. I've updated the config settings and am running another set of jobs. I do have a question regarding (5): I'm not running any executors on the driver node, but that driver node is acting as the HDFS NameNode, is additionally hosting the Spark history server (I meant to add that I have spark.eventLog.enabled set to true), and has an Ambari metrics monitor as well. Are any of those enough to cause scheduling delays?

            Here's the full, updated spark-defaults.sh file:

            spark.driver.memory 80g
            spark.executor.memory 118g
            spark.driver.extraJavaOptions -server
            spark.executor.extraJavaOptions -server
            spark.driver.maxResultSize       0
            spark.eventLog.enabled       true
            spark.eventLog.dir       hdfs://spark-ml-node-1.fyre.ibm.com:8020/iop/apps/4.2.0.0/spark/logs/history-server
            spark.history.ui.port       18080
            spark.akka.frameSize       128
            spark.local.dirs /disk2/local,/disk3/local,/disk4/local,/disk5/local,/disk6/local,/disk7/local,/disk8/local,/disk9/local,/disk10/local,/disk11/local,/disk12/local
            spark.network.timeout 1000s
            
            mboehm7 Matthias Boehm added a comment - edited

            a couple of comments (an illustrative configuration sketch follows this list):
            (1) please do not specify the default parallelism - taking the number of input partitions or the total number of cores for parallelize (both are the defaults) usually works very well,
            (2) in my experiments, the simple configuration of 1 executor per node with large memory actually works best (which has advantages in terms of resource consumption as, for example, broadcasts can be shared in memory),
            (3) over-committing CPU is usually a good idea too, so there is no need to spare a core for the OS,
            (4) since you are running standalone, you might want to explicitly specify spark.local.dirs,
            (5) make sure the driver runs on a single node without anything else on it - I've seen huge scheduler delays (that affect the entire cluster) if an executor runs on the same node,
            (6) regarding the GC issue, keep in mind that this is NOT an OOM but simply an exceeded GC overhead limit that indicates unnecessary object allocations, etc. Please try to set -Xmn to 10% of the max heap; if this does not help, please change the GC limit.
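
            As a rough, illustrative spark-defaults sketch following points (1)-(4) and (6) for a worker with ~124 GB and 48 cores (the memory and -Xmn values below are assumptions for illustration, not settings taken from this thread):

            # one large executor per node: leaving spark.executor.cores unset lets a
            # standalone worker run a single executor with all of its cores, per (2) and (3)
            spark.driver.memory              80g
            spark.executor.memory            110g
            spark.driver.extraJavaOptions    -server
            # -Xmn at roughly 10% of the executor heap, per (6)
            spark.executor.extraJavaOptions  -server -Xmn11g
            # per (4): list all local disks, as in the spark-defaults file above
            spark.local.dirs                 /disk2/local,/disk3/local,/disk4/local
            spark.network.timeout            1000s
            # per (1): no spark.default.parallelism entry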

            Apart from the configuration issues, it is certainly useful to also do another pass over the matrix-to-frame converter to eliminate any remaining inefficiencies.


            People

              Assignee: mboehm7 Matthias Boehm
              Reporter: dusenberrymw Mike Dusenberry
