Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Duplicate
- Affects Version/s: 2.0.0
- Fix Version/s: None
- Component/s: None
Description
The write-ahead log seems to get corrupted when the application is stopped abruptly (Ctrl-C or kill). On the next run, the application refuses to start due to this exception:
2016-10-03 08:03:32,321 ERROR [Executor task launch worker-1] executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
...skipping...
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Code:
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.streaming._

object ProtoDemo {
  def createContext(dirName: String) = {
    val conf = new SparkConf().setAppName("mything").setMaster("local[4]")
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    /*
    conf.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    conf.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    */
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(dirName)
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    val runningCounts = wordCounts.updateStateByKey[Int] {
      (values: Seq[Int], oldValue: Option[Int]) =>
        val s = values.sum
        Some(oldValue.fold(s)(_ + s))
    }
    // Print the first ten elements of each RDD generated in this DStream to the console
    runningCounts.print()
    ssc
  }

  def main(args: Array[String]) = {
    val hadoopConf = new Configuration()
    val dirName = "/tmp/chkp"
    val ssc = StreamingContext.getOrCreate(dirName, () => createContext(dirName), hadoopConf)
    ssc.start()
    ssc.awaitTermination()
  }
}
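Not part of the original reproduction, and not a fix for the corruption itself, but for comparison: a minimal sketch of the same configuration with {{spark.streaming.stopGracefullyOnShutdown}} enabled, so a plain Ctrl-C/SIGTERM lets in-flight batches finish before the context is torn down. An abrupt {{kill -9}} still bypasses the JVM shutdown hook entirely, so the failure described above can still occur. The object name below is hypothetical.

import org.apache.spark.SparkConf

object GracefulShutdownConf {
  // Sketch only: same settings as createContext above, plus graceful shutdown.
  def build(): SparkConf = new SparkConf()
    .setAppName("mything")
    .setMaster("local[4]")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    // Stop the StreamingContext gracefully from the JVM shutdown hook
    // (let queued batches finish) instead of cutting off writes mid-flight.
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
}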
Steps to reproduce:
1. Clone the reproduction repository: {{git clone https://github.com/thesamet/spark-issue}}
2. In one terminal, run: {{while true; do nc -l localhost 9999; done}}
3. Start a new terminal.
4. Run {{sbt run}}.
5. Type a few lines in the netcat terminal.
6. Kill the streaming application (Ctrl-C).
7. Go back to step 4 until you see the exception above.
I tried the above with the local filesystem and also with S3, and got the same result.
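Note: not a fix, and it discards the state built up by updateStateByKey, but deleting the checkpoint directory lets {{StreamingContext.getOrCreate}} build a fresh context instead of recovering from the corrupted log. A minimal sketch using the Hadoop FileSystem API; the {{ClearCheckpoint}} object name is hypothetical, and the path matches the repro code above.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ClearCheckpoint {
  def main(args: Array[String]): Unit = {
    // Remove the checkpoint/WAL directory used by the repro code so the next
    // run starts from scratch. All updateStateByKey state is lost.
    val dir = new Path("/tmp/chkp")
    val fs = FileSystem.get(dir.toUri, new Configuration())
    if (fs.exists(dir)) {
      fs.delete(dir, true) // recursive delete
    }
  }
}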
Issue Links
- duplicates SPARK-18617: Close "kryo auto pick" feature for Spark Streaming (Resolved)