Description
Spark throws an exception when the Streaming K-means algorithm trains on a windowed stream. The stream is defined as follows:
val trainingSet = ssc.textFileStream(TrainingDataSet).window(Seconds(30))...
The exception occurs when no new data arrives in the stream. The stack trace is:
15/07/21 17:36:08 ERROR JobScheduler: Error running job streaming job 1437489368000 ms.0
java.lang.ArrayIndexOutOfBoundsException: 13
at org.apache.spark.mllib.clustering.StreamingKMeansModel$$anonfun$update$1.apply(StreamingKMeans.scala:105)
at org.apache.spark.mllib.clustering.StreamingKMeansModel$$anonfun$update$1.apply(StreamingKMeans.scala:102)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.mllib.clustering.StreamingKMeansModel.update(StreamingKMeans.scala:102)
at org.apache.spark.mllib.clustering.StreamingKMeans$$anonfun$trainOn$1.apply(StreamingKMeans.scala:235)
at org.apache.spark.mllib.clustering.StreamingKMeans$$anonfun$trainOn$1.apply(StreamingKMeans.scala:234)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
When new data arrives, the algorithm works as expected.
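
For anyone trying to reproduce this, a fuller version of the setup might look like the sketch below. Only the textFileStream(...).window(Seconds(30)) call comes from this report. The 10-second batch interval, the Vectors.parse step, k = 3, the 2-dimensional random initial centers, the object name, and the directory argument are all assumptions made for illustration, and it has not been confirmed that this exact program triggers the exception.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical reproduction harness; names and parameters are illustrative.
object WindowedStreamingKMeansRepro {
  def main(args: Array[String]): Unit = {
    // Assumed: directory to watch is passed as the first argument.
    val trainingDir = args(0)

    val conf = new SparkConf()
      .setAppName("WindowedStreamingKMeansRepro")
      .setMaster("local[2]")

    // Assumed batch interval; the 30 s window must be a multiple of it.
    val ssc = new StreamingContext(conf, Seconds(10))

    // The window(Seconds(30)) call is from the report; parsing each
    // line with Vectors.parse is an assumption about the elided part.
    val trainingSet = ssc.textFileStream(trainingDir)
      .window(Seconds(30))
      .map(Vectors.parse)

    // Assumed model settings: 3 clusters, 2-dimensional data.
    val model = new StreamingKMeans()
      .setK(3)
      .setDecayFactor(1.0)
      .setRandomCenters(2, 0.0)

    // trainOn updates the model once per batch of the windowed stream.
    model.trainOn(trainingSet)

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this sketch, the watched directory would need text files containing one vector per line in a format Vectors.parse accepts, for example [1.0,2.0]. After the first files have been processed, stop adding new files and watch the batches that follow.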