Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
2.3.4, 2.4.5, 3.0.0
None
Description
When an RDD is cached with replication > 1, each block is first cached locally on one BlockManager and then replicated to (replication - 1) other BlockManagers. If replication to one of the peers fails, it is supposed to be retried on another peer, up to the limit set by the "spark.storage.maxReplicationFailures" config. Currently this retry never happens: the replicating executor treats the upload as successful even when the receiving executor fails to store the block, as the logs below show.
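The intended behavior can be modeled as a loop that keeps picking peers until enough copies exist or too many uploads have failed. This is a simplified sketch, not Spark's BlockManager code: `RetryingReplication`, `Peer`, `replicate`, and `upload` are hypothetical names, peers are tried in a fixed order instead of through Spark's own peer-selection logic, and a failed upload is assumed to surface as an exception. `maxFailures` stands in for spark.storage.maxReplicationFailures.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical, simplified model of replication with retries (not Spark's API).
object RetryingReplication {
  final case class Peer(id: Int)

  /** Tries to place (replication - 1) copies on peers. A failed upload (signalled by
    * an exception from `upload`) is counted, and the next peer is tried, until more
    * than `maxFailures` uploads have failed. Returns the peers that accepted the block. */
  def replicate(
      peers: Seq[Peer],
      replication: Int,
      maxFailures: Int,
      upload: Peer => Unit): Seq[Peer] = {
    val target = replication - 1
    val replicatedTo = mutable.ArrayBuffer.empty[Peer]
    var candidates = peers.toList
    var numFailures = 0
    while (numFailures <= maxFailures && candidates.nonEmpty && replicatedTo.size < target) {
      val peer = candidates.head
      candidates = candidates.tail
      try {
        upload(peer)
        replicatedTo += peer
      } catch {
        case NonFatal(_) =>
          // Record the failure; the loop moves on to the next candidate peer.
          numFailures += 1
      }
    }
    replicatedTo.toList
  }
}
```

With peer 5 rejecting the block, replication = 3 and maxFailures = 1, the loop skips peer 5 and ends with copies on peers 2 and 7; with maxFailures = 0 it stops after the first failure, leaving only peer 2.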
Logs from executor 1, which is replicating the block:
20/02/10 09:01:47 INFO Executor: Starting executor ID 1 on host wn11-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
. . .
20/02/10 09:06:45 INFO Executor: Running task 244.0 in stage 3.0 (TID 550)
20/02/10 09:06:45 DEBUG BlockManager: Getting local block rdd_13_244
20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 was not found
20/02/10 09:06:45 DEBUG BlockManager: Getting remote block rdd_13_244
20/02/10 09:06:45 DEBUG BlockManager: Block rdd_13_244 not found
20/02/10 09:06:46 INFO MemoryStore: Block rdd_13_244 stored as values in memory (estimated size 33.3 MB, free 44.2 MB)
20/02/10 09:06:46 DEBUG BlockManager: Told master about block rdd_13_244
20/02/10 09:06:46 DEBUG BlockManager: Put block rdd_13_244 locally took 947 ms
20/02/10 09:06:46 DEBUG BlockManager: Level for block rdd_13_244 is StorageLevel(memory, deserialized, 3 replicas)
20/02/10 09:06:46 TRACE BlockManager: Trying to replicate rdd_13_244 of 34908552 bytes to BlockManagerId(2, wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes to BlockManagerId(2, wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None) in 205.849858 ms
20/02/10 09:06:47 TRACE BlockManager: Trying to replicate rdd_13_244 of 34908552 bytes to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)
20/02/10 09:06:47 TRACE BlockManager: Replicated rdd_13_244 of 34908552 bytes to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None) in 180.501504 ms
20/02/10 09:06:47 DEBUG BlockManager: Replicating rdd_13_244 of 34908552 bytes to 2 peer(s) took 387.381168 ms
20/02/10 09:06:47 DEBUG BlockManager: block rdd_13_244 replicated to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None), BlockManagerId(2, wn10-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36711, None)
20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 remotely took 423 ms
20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 with replication took 1371 ms
20/02/10 09:06:47 DEBUG BlockManager: Getting local block rdd_13_244
20/02/10 09:06:47 DEBUG BlockManager: Level for block rdd_13_244 is StorageLevel(memory, deserialized, 3 replicas)
20/02/10 09:06:47 INFO Executor: Finished task 244.0 in stage 3.0 (TID 550). 2253 bytes result sent to driver
Logs from executor 5, one of the executors the block is replicated to:
20/02/10 09:01:47 INFO Executor: Starting executor ID 5 on host wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net
. . .
20/02/10 09:06:47 INFO MemoryStore: Will not store rdd_13_244
20/02/10 09:06:47 WARN MemoryStore: Not enough space to cache rdd_13_244 in memory! (computed 4.2 MB so far)
20/02/10 09:06:47 INFO MemoryStore: Memory use = 4.9 GB (blocks) + 7.3 MB (scratch space shared across 2 tasks(s)) = 4.9 GB. Storage limit = 4.9 GB.
20/02/10 09:06:47 DEBUG BlockManager: Put block rdd_13_244 locally took 12 ms
20/02/10 09:06:47 WARN BlockManager: Block rdd_13_244 could not be removed as it was not found on disk or in memory
20/02/10 09:06:47 WARN BlockManager: Putting block rdd_13_244 failed
20/02/10 09:06:47 DEBUG BlockManager: Putting block rdd_13_244 without replication took 13 ms
Note that replication to executor 5 actually failed, with the log line "Not enough space to cache rdd_13_244 in memory!". Executor 1, however, reports that the block was successfully replicated to executor 5 ("Replicated rdd_13_244 of 34908552 bytes to BlockManagerId(5, wn2-prakha.mvqvy0u1catevlxn5wwhjss34f.bx.internal.cloudapp.net, 36463, None)"), so it never retries the replication on another executor. As a result, the block ends up with at most two copies (on executors 1 and 2) instead of the three requested by the storage level.
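The retry path only helps if the replicating executor can see that a peer rejected the block. The following simplified model contrasts a receiver-side handler that drops the result of storing the block with one that reports the failure back, assuming the sender treats only an error (here, an exception) as a failed upload. Every name in it (`ReplicationAck`, `Receiver`, `storeBlock`, `handleUploadIgnoringResult`, `handleUploadReportingFailure`, `replicate`) is hypothetical, the retry limit is omitted for brevity, and it illustrates the failure mode rather than Spark's actual RPC handling or its fix.

```scala
import scala.util.control.NonFatal

// Hypothetical model of the acknowledgement path; not Spark's RPC code.
object ReplicationAck {
  /** A receiving peer with a fixed amount of free storage, in bytes. */
  final class Receiver(val id: Int, var freeBytes: Long) {
    /** Returns false (instead of throwing) when the block does not fit,
      * analogous to the "Putting block rdd_13_244 failed" warning above. */
    def storeBlock(sizeBytes: Long): Boolean =
      if (sizeBytes <= freeBytes) { freeBytes -= sizeBytes; true } else false
  }

  /** Buggy handler: ignores the store result, so the sender always sees success. */
  def handleUploadIgnoringResult(r: Receiver, sizeBytes: Long): Unit = {
    r.storeBlock(sizeBytes)
    ()
  }

  /** Fixed handler: turns a failed store into an error the sender can observe. */
  def handleUploadReportingFailure(r: Receiver, sizeBytes: Long): Unit =
    if (!r.storeBlock(sizeBytes))
      throw new IllegalStateException(s"Upload to peer ${r.id} failed: not enough space")

  /** Sender side: only an exception counts as a failed upload, in which case the
    * next receiver is tried. Returns the ids of receivers it believes hold a copy. */
  def replicate(
      receivers: Seq[Receiver],
      copies: Int,
      sizeBytes: Long,
      handler: (Receiver, Long) => Unit): Seq[Int] = {
    val it = receivers.iterator
    var acked = List.empty[Int]
    while (acked.size < copies && it.hasNext) {
      val r = it.next()
      try {
        handler(r, sizeBytes)
        acked = acked :+ r.id
      } catch {
        case NonFatal(_) => () // failure observed: move on to the next receiver
      }
    }
    acked
  }
}
```

With one peer that has only 4 MB free and a 35 MB block, the first handler reports copies on peers 2 and 5, mirroring the executor 1 logs, while the second skips peer 5 and places the copy on peer 7.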
Sample code to reproduce the issue (run in spark-shell):
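import org.apache.spark.storage.StorageLevel

sc.setLogLevel("INFO")

// Builds a random string of `length` printable characters.
def randomString(length: Int): String = {
  val r = new scala.util.Random
  val sb = new StringBuilder
  for (_ <- 1 to length) {
    sb.append(r.nextPrintableChar())
  }
  sb.toString
}

// 300,000 rows of 100,000-character strings, spread over 300 partitions.
val df = sc.parallelize(1 to 300000, 300).map { _ => randomString(100000) }.toDF

// StorageLevel(useDisk = false, useMemory = true, useOffHeap = false,
//              deserialized = true, replication = 3): memory only, 3 replicas.
df.persist(StorageLevel(false, true, false, true, 3))
df.count()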