Description
- Ran a comparatively large job (temp table creation) at 10 TB scale.
- Enabled the intermediate memory-to-memory merge (tez.runtime.shuffle.memory-to-memory.enable=true and tez.runtime.shuffle.memory-to-memory.segments=4).
- Some reducers receive a lot of data and quickly get into an infinite loop, with the fetcher logging the same messages repeatedly:
2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 3ms
2015-01-07 02:36:56,644 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 1ms
2015-01-07 02:36:56,645 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
2015-01-07 02:36:56,647 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 2ms
2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
2015-01-07 02:36:56,653 INFO [fetcher [Map_1] #2] orderedgrouped.ShuffleScheduler: m1:13562 freed by fetcher [Map_1] #2 in 5ms
2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] shuffle.HttpConnection: for url=http://m1:13562/mapOutput?job=job_1420000126204_0201&reduce=34&map=attempt_1420000126204_0201_1_00_000420_0_10027&keepAlive=true sent hash and receievd reply 0 ms
2015-01-07 02:36:56,654 INFO [fetcher [Map_1] #2] orderedgrouped.FetcherOrderedGrouped: fetcher#2 - MergerManager returned Status.WAIT ...
Additional debug/patch statements revealed that InMemoryMerge is not invoked when it should be, so it does not release memory back for the fetchers to proceed. For example, the debug/patch messages are given below:
syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:48,332 INFO [fetcher [Map_1] #2] orderedgrouped.MergeManager: Patch..usedMemory=1551867234, memoryLimit=1073741824, commitMemory=883028388, mergeThreshold=708669632
<<=== InMemoryMerge would be started in this case as commitMemory >= mergeThreshold

syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:52,900 INFO [fetcher [Map_1] #2] orderedgrouped.MergeManager: Patch..usedMemory=1273349784, memoryLimit=1073741824, commitMemory=347296632, mergeThreshold=708669632
<<=== InMemoryMerge would *NOT* be started in this case as commitMemory < mergeThreshold. But the usedMemory is higher than memoryLimit. Fetchers would keep waiting indefinitely until memory is released. InMemoryMerge will not kick in and not release memory.

syslog_attempt_1420000126204_0201_1_01_000034_0:2015-01-07 02:05:53,163 INFO [fetcher [Map_1] #1] orderedgrouped.MergeManager: Patch..usedMemory=1191994052, memoryLimit=1073741824, commitMemory=523155206, mergeThreshold=708669632
<<=== InMemoryMerge would *NOT* be started in this case as commitMemory < mergeThreshold. But the usedMemory is higher than memoryLimit. Fetchers would keep waiting indefinitely until memory is released. InMemoryMerge will not kick in and not release memory.
In MergeManager, in-memory merging is invoked under the following condition:
if (!inMemoryMerger.isInProgress() && commitMemory >= mergeThreshold)
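The three patch log samples can be checked against this condition with a small standalone model. This is only a sketch, not the MergeManager code: the class and method names are hypothetical, and the rule that a fetcher is told to wait while usedMemory exceeds memoryLimit is an assumption taken from the log annotations rather than from the Tez source.

```java
public class ShuffleStallSketch {

    // Mirrors the merge trigger quoted from MergeManager.
    static boolean mergeWouldStart(boolean mergeInProgress, long commitMemory, long mergeThreshold) {
        return !mergeInProgress && commitMemory >= mergeThreshold;
    }

    // Assumption (from the log annotations): fetchers get Status.WAIT
    // while usedMemory is above memoryLimit.
    static boolean fetcherMustWait(long usedMemory, long memoryLimit) {
        return usedMemory > memoryLimit;
    }

    // Stalled in this simplified model: fetchers must wait, no merge is running,
    // and the trigger cannot fire, so nothing would release memory.
    static boolean isStalled(long usedMemory, long memoryLimit,
                             long commitMemory, long mergeThreshold,
                             boolean mergeInProgress) {
        return fetcherMustWait(usedMemory, memoryLimit)
                && !mergeInProgress
                && !mergeWouldStart(false, commitMemory, mergeThreshold);
    }

    public static void main(String[] args) {
        long memoryLimit = 1073741824L;
        long mergeThreshold = 708669632L;
        // {usedMemory, commitMemory} pairs copied from the patch log lines.
        long[][] samples = {
            {1551867234L, 883028388L},
            {1273349784L, 347296632L},
            {1191994052L, 523155206L},
        };
        for (long[] s : samples) {
            System.out.printf("usedMemory=%d commitMemory=%d stalled=%b%n",
                    s[0], s[1], isStalled(s[0], memoryLimit, s[1], mergeThreshold, false));
        }
    }
}
```

Under these assumptions the first sample is not stalled, because the merge trigger fires. The second and third samples are stalled: usedMemory is above memoryLimit while commitMemory is below mergeThreshold.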
Attaching the sample Hive command just for reference:
$HIVE_HOME/bin/hive \
  --hiveconf tez.runtime.io.sort.factor=200 \
  --hiveconf hive.tez.auto.reducer.parallelism=false \
  --hiveconf tez.am.heartbeat.interval-ms.max=20 \
  --hiveconf tez.runtime.io.sort.mb=1200 \
  --hiveconf tez.runtime.sort.threads=2 \
  --hiveconf hive.tez.container.size=4096 \
  --hiveconf tez.runtime.shuffle.memory-to-memory.enable=true \
  --hiveconf tez.runtime.shuffle.memory-to-memory.segments=4 \
  -e "create table testData as select ss_sold_date_sk,ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_quantity,ss_sold_date from store_sales distribute by ss_sold_date;"