Details
- Type: Bug
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- Fix Version: SystemML 0.11
Description
MLContext currently always assumes DataFrame-to-frame conversion without an existing index column. Since the user cannot communicate the existence of this column, the conversion produces incorrect results because the index is included as an additional column in the output frame. We need to make the MLContext handling of frames consistent with the handling of matrices.
Additionally, the conversion code in MLContextConversionUtil.dataFrameToFrameObject() does not yet take frames with vectors into account, although recent changes added this support in the underlying FrameRDDConverterUtils class. As a result, the number of columns set when mc == null is incorrect.
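For illustration only (the column names and the sqlContext/rowsRdd variables below are assumptions, not taken from the actual report), this is the kind of DataFrame that exposes both problems: an index column the caller currently cannot declare, and a vector column whose width is not reflected in the inferred column count:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.mllib.linalg.VectorUDT;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical schema: a row-index column plus a vector column (Spark 1.x API, as used in this codebase)
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("__INDEX", DataTypes.DoubleType, false));  // row index, should NOT become frame data
fields.add(DataTypes.createStructField("features", new VectorUDT(), false));      // vector of size 5, should expand to 5 frame columns
fields.add(DataTypes.createStructField("label", DataTypes.DoubleType, false));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowsRdd, schema);  // sqlContext/rowsRdd assumed to exist

// Expected frame: 5 + 1 = 6 columns; without index/vector handling the conversion
// instead sees 3 raw DataFrame columns (index included) and derives the wrong count.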
Thanks mwdusenb@us.ibm.com for catching this issue. cc acs_s deron
Activity
Additionally, after some hacking to get past the above issue temporarily, I'm seeing the following error:
The given InputInfo is not implemented for ReblockSPInstruction: org.apache.sysml.runtime.matrix.data.InputInfo@5c4fe28a
    at org.apache.sysml.runtime.instructions.spark.ReblockSPInstruction.processFrameReblockInstruction(ReblockSPInstruction.java:267)
Update: Disabling the RewriteBlockSizeAndReblock rewrite in ProgramRewriter temporarily fixes this Reblock issue.
Also, the other temporary update I did to get past the vector conversion was to replace MLContextConversionUtil.dataFrameToFrameObject with the following:
public static FrameObject dataFrameToFrameObject(String variableName, DataFrame dataFrame,
    FrameMetadata frameMetadata)
{
  try {
    if (frameMetadata == null) {
      frameMetadata = new FrameMetadata();
    }
    JavaSparkContext javaSparkContext = MLContextUtil
        .getJavaSparkContext((MLContext) MLContextProxy.getActiveMLContextForAPI());

    // temporary hack: assume the DataFrame always carries an index column
    boolean containsID = true; //isDataFrameWithIDColumn(frameMetadata);

    MatrixCharacteristics mc = frameMetadata.asMatrixCharacteristics();
    if (mc == null) {
      mc = new MatrixCharacteristics();
      // long rows = dataFrame.count();
      // int cols = dataFrame.columns().length - ((containsID)?1:0);
      // mc.setDimension(rows, cols);

      // locate a vector column (if any) in the DataFrame schema
      int colVect = -1; //FrameRDDConverterUtils.getColVectFromDFSchema(dataFrame.schema(), containsID);
      int off = containsID ? 1 : 0;
      for (int i = off; i < dataFrame.schema().fields().length; i++) {
        StructField structType = dataFrame.schema().apply(i);
        if (structType.dataType() instanceof VectorUDT)
          colVect = i - off;
      }

      // derive dimensions, accounting for the width of the vector column
      long rlen = dataFrame.count();
      long clen = dataFrame.columns().length - off
          + ((colVect >= 0) ? ((Vector) dataFrame.first().get(off + colVect)).size() - 1 : 0);
      mc.set(rlen, clen, mc.getRowsPerBlock(), mc.getColsPerBlock(), -1);
      frameMetadata.setMatrixCharacteristics(mc);
    }

    // derive the frame schema (column names and value types) from the DataFrame schema
    String[] colnames = new String[(int) mc.getCols()];
    ValueType[] fschema = new ValueType[(int) mc.getCols()];
    FrameRDDConverterUtils.convertDFSchemaToFrameSchema(dataFrame.schema(), colnames, fschema, containsID);
    frameMetadata.setFrameSchema(new FrameSchema(Arrays.asList(fschema)));

    JavaPairRDD<Long, FrameBlock> binaryBlock = FrameRDDConverterUtils.dataFrameToBinaryBlock(
        javaSparkContext, dataFrame, mc, containsID);
    return MLContextConversionUtil.binaryBlocksToFrameObject(variableName, binaryBlock, frameMetadata);
  }
  catch (DMLRuntimeException e) {
    throw new MLContextException("Exception converting DataFrame to FrameObject", e);
  }
}
Also, disabling the reblock rewrite above led to issues when passing SystemML matrix objects from one script to another with MLContext, because the matrix metadata output info was set to OutputInfo.BinaryCellOutputInfo instead of OutputInfo.BinaryBlockOutputInfo.
Additionally, I'm running into null pointer exceptions regarding cache blocks during Spark mapmm operations that I believe could be related to the above-mentioned block output info issue. Unfortunately, I'll have to wait for this job to run again in order to grab the stacktrace.
OK, I now have patches for (1) the frame metadata handling (support for DF and DF_WITH_INDEX in FrameFormat and correct matrix characteristics updates), as well as (2) reblock handling (frames inherited a bug from the old MLContext that was worked around via a HACK at the runtime level for matrices; this is now fixed for both matrices and frames). Note that we don't need to explicitly include vectors in the FrameFormat, as we allow mixed schemas anyway, where any one of many columns can be a vector.
cc niketanpansare please correct me if I'm wrong regarding the unnecessary reblock handling of equal blocksizes for matrices.
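As a rough usage sketch of the new frame formats (assuming a FrameMetadata(FrameFormat) constructor analogous to MatrixMetadata, and that ml and trainDf are an existing MLContext and DataFrame; not verbatim from the patch), the caller can now state whether the DataFrame carries an index column:

import static org.apache.sysml.api.mlcontext.ScriptFactory.dml;
import org.apache.sysml.api.mlcontext.FrameFormat;
import org.apache.sysml.api.mlcontext.FrameMetadata;
import org.apache.sysml.api.mlcontext.Script;

// trainDf carries a row-index column, so declare DF_WITH_INDEX; plain DF means no index column
FrameMetadata fm = new FrameMetadata(FrameFormat.DF_WITH_INDEX);
Script script = dml("n = nrow(trainf); d = ncol(trainf); print(n + \" x \" + d);")
    .in("trainf", trainDf, fm);
ml.execute(script);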
mboehm7 Great, eagerly looking forward to making use of these updates! Thanks!
Also, just to follow up with that null pointer exception, here's the stacktrace:
Caused by: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 146 and 156 -- Error evaluating instruction: SPARK°mapmm°_mVar1312·MATRIX·DOUBLE°W·MATRIX·DOUBLE°_mVar1313·MATRIX·DOUBLE°RIGHT°false°MULTI_BLOCK
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:335)
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:224)
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.execute(ProgramBlock.java:168)
    at org.apache.sysml.runtime.controlprogram.ForProgramBlock.execute(ForProgramBlock.java:150)
    ... 29 more
Caused by: java.lang.NullPointerException
    at org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory.getCode(CacheBlockFactory.java:59)
    at org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock.writeHeaderAndPayload(PartitionedBlock.java:355)
    at org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock.writeExternal(PartitionedBlock.java:332)
    at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1326)
    at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:639)
    at org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext.getBroadcastForVariable(SparkExecutionContext.java:536)
    at org.apache.sysml.runtime.instructions.spark.MapmmSPInstruction.processInstruction(MapmmSPInstruction.java:122)
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:305)
    ... 32 more
Okay, I tried out the latest update, and there is still one more issue to address. If MLContextConversionUtil.dataFrameToFrameObject receives a DataFrame without any frame metadata, a new FrameMetadata will be created with an empty FrameFormat, and so the subsequent isDataFrameWithIDColumn function will always return false (line 360). We should just create a new function similar to determineMatrixFormatIfNeeded for frames, and call it before the isDataFrameWithIDColumn function, as is done for DataFrame-matrix conversions (line 412).
I can add this method.
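For reference, a rough sketch of what such a method could look like (purely hypothetical, not the committed patch; it assumes FrameMetadata exposes get/setFrameFormat like MatrixMetadata, and that the index column follows the RDDConverterUtils.DF_ID_COLUMN naming convention used on the matrix path):

// Hypothetical sketch: infer the FrameFormat from the DataFrame schema when the
// caller did not specify it, mirroring determineMatrixFormatIfNeeded for matrices.
public static void determineFrameFormatIfNeeded(DataFrame dataFrame, FrameMetadata frameMetadata) {
    if (frameMetadata.getFrameFormat() != null) {
        return; // caller already declared whether an index column exists
    }
    boolean hasID = false;
    for (StructField field : dataFrame.schema().fields()) {
        // assumption: same index column name convention as the matrix converters ("__INDEX")
        if (RDDConverterUtils.DF_ID_COLUMN.equals(field.name())) {
            hasID = true;
            break;
        }
    }
    frameMetadata.setFrameFormat(hasID ? FrameFormat.DF_WITH_INDEX : FrameFormat.DF);
}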
Yes, that's true, we should do the same as for matrices here. Thanks for taking it over.
Apart from that, there are some smaller remaining issues: (1) input handling with read statements (inputs not found), and (2) column name schema pass-through, which is primarily needed for transform, as users are free to provide a transform specification based on column names instead of column indexes.
Alright I fixed that issue, but now I'm still running into the reblock issue:
Script:
n = nrow(trainf)
d = ncol(trainf)
Error:
Caused by: org.apache.sysml.runtime.DMLRuntimeException: ERROR: Runtime error in program block generated from statement block between lines 1 and 7 -- Error evaluating instruction: SPARK°rblk°trainf·FRAME·DOUBLE·false°_fVar5·FRAME·DOUBLE°1000°1000°true
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeSingleInstruction(ProgramBlock.java:335)
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.executeInstructions(ProgramBlock.java:224)
    at org.apache.sysml.runtime.controlprogram.ProgramBlock.execute(ProgramBlock.java:168)
    at org.apache.sysml.runtime.controlprogram.Program.execute(Program.java:145)
    ... 15 more
Caused by: org.apache.sysml.runtime.DMLRuntimeException: The given InputInfo is not implemented for ReblockSPInstruction: binaryblock
    at org.apache.sysml.runtime.instructions.spark.ReblockSPInstruction.processFrameReblockInstruction(ReblockSPInstruction.java:251)
    at org.apache.sysml.runtime.instructions.spark.ReblockSPInstruction.processInstruction(ReblockSPInstruction.java:115)
I would suspect that the metadata is wrong. Could you please have a look at RewriteRemovePersistentReadWrite and see if the automatically created matrix characteristics have correct blocksizes of 1k x 1k? You might also want to take a look at DataFrameVectorScriptTest, where we test exactly that.
Independent of the potential metadata issues, this is nevertheless a bug (SYSTEMML-1002), as we should never (neither in MLContext nor in batch mode) inject reblocks for binary-block frames.
Yeah, here was the explain output:
--MAIN PROGRAM
----GENERIC (lines 1-7) [recompile=true]
------CP createvar _fVar9 scratch_space//_p21642_9.30.110.134//_t0/temp7 true FRAME binaryblock -1 -1 1000 1000 -1 copy
------SPARK rblk trainf.FRAME.DOUBLE.false _fVar9.FRAME.DOUBLE 1000 1000 true
------CP createvar _fVar10 scratch_space//_p21642_9.30.110.134//_t0/temp8 true FRAME binaryblock -1 -1 1000 1000 -1 copy
------SPARK chkpoint _fVar9.FRAME.DOUBLE.false _fVar10.FRAME.DOUBLE MEMORY_AND_DISK
------CP rmvar _fVar9
------CP nrow _fVar10.FRAME.DOUBLE.false _Var11.SCALAR.INT
------CP ncol _fVar10.FRAME.DOUBLE.false _Var12.SCALAR.INT
------CP rmvar _fVar10
------CP assignvar _Var11.SCALAR.INT.false n.SCALAR.INT
------CP assignvar _Var12.SCALAR.INT.false d.SCALAR.INT
------CP rmvar _Var11
------CP rmvar _Var12
Indeed, the blocksizes were set to the default of "1" when the metadata is missing/empty. As we do for DataFrame-matrix conversions, adding the following to FrameRDDConverterUtils.dataFrameToBinaryBlock fixes the issue on my cluster. However, while I can see the incorrect blocksizes while debugging in a new local test, I'm unable to get it to actually fail locally. Perhaps another rewrite is removing the reblock command locally?
//ensure valid blocksizes
if( mc.getRowsPerBlock()<=1 || mc.getColsPerBlock()<=1 ) {
    mc.setBlockSize(ConfigurationManager.getBlocksize());
}
Just to demystify this behavior: (1) the reblock was injected because the script is compiled without input characteristics (and hence with the default textcell input format), (2) with the (wrong) default blocksize of 1, the reblock was not removed (it is now always removed for frames), and (3) the reason it did not fail locally was our "in-memory reblock", which simply reads the input into memory (and of course supports all formats).
Okay great, this overall issue has been resolved now. Thanks for the explanation!