Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- Affects Version/s: 2.0.0, 2.0.1
Description
We have hit a consistent bug when we have a dataset with more than 100 columns. I am raising this as a blocker because Spark is returning the WRONG results rather than erroring, leading to data integrity issues.
I have put together the following test case which shows the issue (it will run in spark-shell). In this example I am joining a dataset with lots of fields onto another dataset.
The join works fine, and if you show the dataset you get the expected result. However, if you run a map step over the dataset you end up with a strange error where the sequence in the right dataset now contains only the last value, repeated.
Whilst this test may seem a rather contrived example, what we are doing here is a very standard analytical pattern (a sketch of it follows the list below). My original code was designed to:
- take a dataset of child records
- groupByKey up to the parent: giving a Dataset of (ParentID, Seq[Children])
- join the children onto the parent by parentID: giving a Dataset of (Parent, (ParentID, Seq[Children]))
- map over the result to give a tuple of (Parent,Seq[Children])
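As a minimal sketch of that pattern (the Parent/Child case classes and values here are illustrative, not our real code; it runs in spark-shell where spark and its implicits are in scope):
case class Child(parentId: Int, value: String)
case class Parent(parentId: Int, name: String)
val childDS = spark.createDataset(Seq(Child(1, "a"), Child(1, "b")))
val parentDS = spark.createDataset(Seq(Parent(1, "p")))
// groupByKey up to the parent: gives a Dataset of (ParentID, Seq[Children])
val groupedChildren = childDS.groupByKey(_.parentId).mapGroups((id, cs) => (id, cs.toList))
// join the grouped children onto the parent by parentID, then map to (Parent, Seq[Children])
val parentWithChildren = parentDS.as("P")
  .joinWith(groupedChildren.as("C"), $"P.parentId" === $"C._1")
  .map { case (parent, (_, children)) => (parent, children) }
parentWithChildren.show(false)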
Notes:
- The issue is resolved by having fewer fields - as soon as we go to <= 100 fields the integrity issue goes away. Try removing one of the fields from BigCaseClass below.
- The issue arises based on the total number of fields in the resulting dataset. Below I have a small case class and a big case class, but two case classes of 50 fields each would give the same issue.
- The issue occurs where the case class being joined on (on the right) contains a Seq of a case class type (Seq[Name] below). It doesn't occur if you have a Seq[String] (see the variant sketched after these notes).
- If I go back to an RDD for the map step after the join I can work around the issue (see the sketch after the test case), but I lose all the benefits of Datasets.
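For reference, a minimal sketch of that Seq[String] variant (the class name is hypothetical; substitute it for SmallCaseClass in the test case below):
// Hypothetical variant: the right-hand side carries a Seq[String] rather than a Seq of a case class
case class SmallStringClass(joinkey: Integer, names: Seq[String])
val smallStringCC = Seq(SmallStringClass(1, Seq("Jamie", "Ian", "Dave", "Will")))
val smallStringDS = spark.createDataset(spark.sparkContext.parallelize(smallStringCC))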
Scala code test case:
case class Name(name: String)
case class SmallCaseClass (joinkey: Integer, names: Seq[Name])
case class BigCaseClass (field1: Integer,field2: Integer,field3: Integer,field4: Integer,field5: Integer,field6: Integer,field7: Integer,field8: Integer,field9: Integer,field10: Integer,field11: Integer,field12: Integer,field13: Integer,field14: Integer,field15: Integer,field16: Integer,field17: Integer,field18: Integer,field19: Integer,field20: Integer,field21: Integer,field22: Integer,field23: Integer,field24: Integer,field25: Integer,field26: Integer,field27: Integer,field28: Integer,field29: Integer,field30: Integer,field31: Integer,field32: Integer,field33: Integer,field34: Integer,field35: Integer,field36: Integer,field37: Integer,field38: Integer,field39: Integer,field40: Integer,field41: Integer,field42: Integer,field43: Integer,field44: Integer,field45: Integer,field46: Integer,field47: Integer,field48: Integer,field49: Integer,field50: Integer,field51: Integer,field52: Integer,field53: Integer,field54: Integer,field55: Integer,field56: Integer,field57: Integer,field58: Integer,field59: Integer,field60: Integer,field61: Integer,field62: Integer,field63: Integer,field64: Integer,field65: Integer,field66: Integer,field67: Integer,field68: Integer,field69: Integer,field70: Integer,field71: Integer,field72: Integer,field73: Integer,field74: Integer,field75: Integer,field76: Integer,field77: Integer,field78: Integer,field79: Integer,field80: Integer,field81: Integer,field82: Integer,field83: Integer,field84: Integer,field85: Integer,field86: Integer,field87: Integer,field88: Integer,field89: Integer,field90: Integer,field91: Integer,field92: Integer,field93: Integer,field94: Integer,field95: Integer,field96: Integer,field97: Integer,field98: Integer,field99: Integer)
val bigCC=Seq(BigCaseClass(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))
val smallCC=Seq(SmallCaseClass(1,Seq(
Name("Jamie"),
Name("Ian"),
Name("Dave"),
Name("Will")
)))
val bigCCDS = spark.createDataset(spark.sparkContext.parallelize(bigCC))
val smallCCDS = spark.createDataset(spark.sparkContext.parallelize(smallCC))
val joined_test=bigCCDS.as("A").joinWith(smallCCDS.as("B"), $"A.field1"===$"B.joinkey", "LEFT")
/*This next step is fine - it shows all 4 names:
- [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
- [1,WrappedArray([Jamie], [Ian], [Dave], [Will])]
- */
joined_test.show(false)
/*This one ends up repeating Will - I did the simplest map step possible here
- [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99]
- [1,WrappedArray([Will], [Will], [Will], [Will])]
- */
joined_test.map(identity).show(false)
/*This one works because we have fewer than 100 fields:
- [Jamie], [Ian], [Dave], [Will]*/
joined_test.map(_._2).show(false)
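A minimal sketch of the RDD workaround mentioned in the notes above, reusing joined_test from the test case (the null handling for the left join is illustrative):
// Workaround: do the map step on the underlying RDD rather than the Dataset.
// small may be null because of the left join.
val mappedViaRdd = joined_test.rdd.map { case (big, small) =>
  (big, Option(small).map(_.names).getOrElse(Seq.empty[Name]))
}
mappedViaRdd.collect().foreach(println)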
Issue Links
- duplicates: SPARK-16664 Spark 1.6.2 - Persist call on Data frames with more than 200 columns is wiping out the data. (Resolved)
- is related to: SPARK-17093 Roundtrip encoding of array<struct<>> fields is wrong when whole-stage codegen is disabled (Resolved)