Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
SystemDS 2.2
-
None
Description
Any left matrix multiplication using spark, take all blocks and construct a single matrixBlock to then slice and distribute, making the multiplication in compressed space not applicable.
and make normal execution ineffecient.
This task is to fix this indirection by directly using the matrix blocks already allocated in RDD, by recieving them, and not unifying them into a single block, but instead use the partitions directly.
Code can be found in: src/main/java/org/apache/sysds/runtime/instructions/spark/MapmmSPInstruction.java
and on line 129 in that file, the right side of the mm, is partitioned for broadcasting.
In this a PatitionedBroadcast is constructed, that decompress the individual compressed blocks, because of a call to acquireRead.
Change this acuireRead to something better.
(this only happens if the compressedside is smaller than the uncompressed side.)
Currenly the test verify behaviour:
src/test/java/org/apache/sysds/test/functions/compress/configuration/CompressForce.java : testMatrixMultLeftSum_SP_SmallerThanLeft
x = read($A) v = rand(rows=10, cols=nrow(x), min=0.0, max=1.0, seed= 13); r = v %*% x s = sum(r) print(toString(r)) print(s)
bellow is a trace where i disabled decompression by inserting a not implemented on decompress.
[ERROR] Failures: [ERROR] CompressForce.testMatrixMultLeftSum_SP:105->CompressBase.runTest:52->CompressBase.compressTest:70->AutomatedTestBase.runTest:1333->AutomatedTestBase.runTest:1337->AutomatedTestBase.runTest:1341->AutomatedTestBase.runTest:1446 failed to run script: ./src/test/scripts/functions/compress/force/compress.dml Standard Out: SystemDS Statistics: Total elapsed time: 1.441 sec. Total compilation time: 0.680 sec. Total execution time: 0.761 sec. Number of compiled Spark inst: 6. Number of executed Spark inst: 4. Cache hits (Mem/Li/WB/FS/HDFS): 0/0/0/0/0. Cache writes (Li/WB/FS/HDFS): 0/0/0/0. Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec. HOP DAGs recompiled (PRED, SB): 0/1. HOP DAGs recompile time: 0.007 sec. Spark ctx create time (lazy): 0.897 sec. Spark trans counts (par,bc,col):0/0/0. Spark trans times (par,bc,col): 0.000/0.000/0.000 secs. CLA Compression Phases : 0.031/0.003/0.000/0.011/0.000/0.000 Decompression Counts (Single , Multi) thread : 0/0 Dedicated Decompression Time (Single , Multi) thread : 0.000/0.000 Total JIT compile time: 3.556 sec. Total JVM GC count: 7. Total JVM GC time: 0.055 sec. Heavy hitter instructions: # Instruction Time(s) Count 1 sp_rand 0.215 1 2 sp_rblk 0.189 1 3 createvar 0.021 6 4 sp_chkpoint 0.005 1 5 sp_compress 0.003 1 6 rmvar 0.000 2 StackTrace: LEVEL : 0 Exception : class org.apache.sysds.runtime.DMLRuntimeException Message : org.apache.sysds.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 22 and 27 -- Error evaluating instruction: SPARK°mapmm°_mVar10·MATRIX·FP64°_mVar13·MATRIX·FP64°_mVar14·MATRIX·FP64°RIGHT°true°SINGLE_BLOCK 0 > org.apache.sysds.runtime.controlprogram.Program.execute(Program.java:160) 0 > org.apache.sysds.api.ScriptExecutorUtils.executeRuntimeProgram(ScriptExecutorUtils.java:89) 0 > org.apache.sysds.api.DMLScript.execute(DMLScript.java:426) 0 > org.apache.sysds.api.DMLScript.executeScript(DMLScript.java:269) 0 > org.apache.sysds.test.AutomatedTestBase.main(AutomatedTestBase.java:1464) 0 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1420) 0 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1341) 0 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1337) 0 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1333) 0 > org.apache.sysds.test.functions.compress.configuration.CompressBase.compressTest(CompressBase.java:70) 0 > org.apache.sysds.test.functions.compress.configuration.CompressBase.runTest(CompressBase.java:52) 0 > org.apache.sysds.test.functions.compress.configuration.CompressForce.testMatrixMultLeftSum_SP(CompressForce.java:105) 0 > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 0 > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 0 > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 0 > java.lang.reflect.Method.invoke(Method.java:498) > ... Stopping Stack Trace at JUnit LEVEL : 1 Exception : class org.apache.sysds.runtime.DMLRuntimeException Message : ERROR: Runtime error in program block generated from statement block between lines 22 and 27 -- Error evaluating instruction: SPARK°mapmm°_mVar10·MATRIX·FP64°_mVar13·MATRIX·FP64°_mVar14·MATRIX·FP64°RIGHT°true°SINGLE_BLOCK 1 > org.apache.sysds.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:286) 1 > org.apache.sysds.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:199) 1 > org.apache.sysds.runtime.controlprogram.BasicProgramBlock.execute(BasicProgramBlock.java:125) 1 > org.apache.sysds.runtime.controlprogram.Program.execute(Program.java:154) 1 > org.apache.sysds.api.ScriptExecutorUtils.executeRuntimeProgram(ScriptExecutorUtils.java:89) 1 > org.apache.sysds.api.DMLScript.execute(DMLScript.java:426) 1 > org.apache.sysds.api.DMLScript.executeScript(DMLScript.java:269) 1 > org.apache.sysds.test.AutomatedTestBase.main(AutomatedTestBase.java:1464) 1 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1420) 1 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1341) 1 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1337) 1 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1333) 1 > org.apache.sysds.test.functions.compress.configuration.CompressBase.compressTest(CompressBase.java:70) 1 > org.apache.sysds.test.functions.compress.configuration.CompressBase.runTest(CompressBase.java:52) 1 > org.apache.sysds.test.functions.compress.configuration.CompressForce.testMatrixMultLeftSum_SP(CompressForce.java:105) 1 > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 1 > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 1 > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 1 > java.lang.reflect.Method.invoke(Method.java:498) > ... Stopping Stack Trace at JUnit LEVEL : 2 Exception : class org.apache.commons.lang.NotImplementedException Message : Not Valid 2 > org.apache.sysds.runtime.compress.CompressedMatrixBlock.decompress(CompressedMatrixBlock.java:245) 2 > org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext.toMatrixBlock(SparkExecutionContext.java:1060) 2 > org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext.toMatrixBlock(SparkExecutionContext.java:991) 2 > org.apache.sysds.runtime.controlprogram.caching.MatrixObject.readBlobFromRDD(MatrixObject.java:534) 2 > org.apache.sysds.runtime.controlprogram.caching.MatrixObject.readBlobFromRDD(MatrixObject.java:69) 2 > org.apache.sysds.runtime.controlprogram.caching.CacheableData.acquireReadIntern(CacheableData.java:549) 2 > org.apache.sysds.runtime.controlprogram.caching.CacheableData.acquireRead(CacheableData.java:478) 2 > org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext.getBroadcastForMatrixObject(SparkExecutionContext.java:652) 2 > org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext.getBroadcastForVariable(SparkExecutionContext.java:746) 2 > org.apache.sysds.runtime.instructions.spark.MapmmSPInstruction.processInstruction(MapmmSPInstruction.java:129) 2 > org.apache.sysds.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:250) 2 > org.apache.sysds.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:199) 2 > org.apache.sysds.runtime.controlprogram.BasicProgramBlock.execute(BasicProgramBlock.java:125) 2 > org.apache.sysds.runtime.controlprogram.Program.execute(Program.java:154) 2 > org.apache.sysds.api.ScriptExecutorUtils.executeRuntimeProgram(ScriptExecutorUtils.java:89) 2 > org.apache.sysds.api.DMLScript.execute(DMLScript.java:426) 2 > org.apache.sysds.api.DMLScript.executeScript(DMLScript.java:269) 2 > org.apache.sysds.test.AutomatedTestBase.main(AutomatedTestBase.java:1464) 2 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1420) 2 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1341) 2 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1337) 2 > org.apache.sysds.test.AutomatedTestBase.runTest(AutomatedTestBase.java:1333) 2 > org.apache.sysds.test.functions.compress.configuration.CompressBase.compressTest(CompressBase.java:70) 2 > org.apache.sysds.test.functions.compress.configuration.CompressBase.runTest(CompressBase.java:52) 2 > org.apache.sysds.test.functions.compress.configuration.CompressForce.testMatrixMultLeftSum_SP(CompressForce.java:105) 2 > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 2 > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 2 > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 2 > java.lang.reflect.Method.invoke(Method.java:498) > ... Stopping Stack Trace at JUnit