Details
Type: Improvement
Status: Closed
Priority: Major
Resolution: Fixed
Environment: Linux
Description
Mapper and Reducer need an opportunity to do some cleanup after the last record is processed.
Proposal (patch attached):
In interface Mapper:
add method void finished();
In interface Reducer:
add method void finished();
The finished() methods are called from MapTask, CombiningCollector, and ReduceTask.
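For illustration, here is a minimal sketch of a Mapper that could use finished() to flush state accumulated across map() calls. It is only a sketch against the proposed interface: the class name, the record-count logic, and the exact import paths are assumptions, not part of the attached patch.

import java.io.IOException;
import org.apache.nutch.io.LongWritable;
import org.apache.nutch.io.UTF8;
import org.apache.nutch.io.Writable;
import org.apache.nutch.io.WritableComparable;
import org.apache.nutch.mapred.JobConf;
import org.apache.nutch.mapred.Mapper;
import org.apache.nutch.mapred.OutputCollector;
import org.apache.nutch.mapred.Reporter;

// Hypothetical example class, not part of the patch.
public class RecordCountMapper implements Mapper {

  private long count = 0;                  // state carried across map() calls
  private OutputCollector lastOutput;      // remembered so finished() can emit

  public void configure(JobConf job) {
  }

  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    lastOutput = output;                   // keep a handle for the final flush
    count++;
    output.collect(key, value);            // pass the record through unchanged
  }

  /** Called once after the last record; emits the accumulated total. */
  public void finished() {
    if (lastOutput == null) return;        // no records were processed
    try {
      lastOutput.collect(new UTF8("record.count"), new LongWritable(count));
    } catch (IOException e) {
      throw new RuntimeException(e);       // finished() does not declare IOException
    }
  }
}

Note that finished() as proposed does not declare IOException, so an implementation that emits output from finished() has to wrap the checked exception itself.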
------------
Known limitation: Fetcher (a multithreaded MapRunnable) does not call finished().
This is not currently a problem because the Fetcher's Map/Reduce modules do nothing in finished().
The right way to add finished() support to Fetcher would be to wait for all threads to finish,
then do:
if (collector instanceof CombiningCollector) ((CombiningCollector)collector).finished();
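Sketched more fully (the fetcherThreads array is a hypothetical name for the Fetcher's worker threads; this is not part of the patch), that shutdown sequence would look roughly like this:

// Hypothetical sketch: join every fetcher thread, then notify the collector.
for (int i = 0; i < fetcherThreads.length; i++) {
  try {
    fetcherThreads[i].join();          // wait until this thread has processed its last record
  } catch (InterruptedException e) {
    // interrupted while waiting; keep joining the remaining threads
  }
}
if (collector instanceof CombiningCollector)
  ((CombiningCollector) collector).finished();

Joining first guarantees that no map thread is still producing output when finished() is called.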
------------
patch begins: (svn trunk)
Index: src/test/org/apache/nutch/mapred/MapredLoadTest.java
===================================================================
--- src/test/org/apache/nutch/mapred/MapredLoadTest.java (revision 374781)
+++ src/test/org/apache/nutch/mapred/MapredLoadTest.java (working copy)
@@ -69,6 +69,8 @@
out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
}
}
+ public void finished() {
+ }
}
static class RandomGenReducer implements Reducer {
public void configure(JobConf job) {
@@ -81,6 +83,8 @@
out.collect(new UTF8("" + val), new UTF8(""));
}
}
+ public void finished() {
+ }
}
static class RandomCheckMapper implements Mapper {
public void configure(JobConf job) {
+ public void finished() {
+ }
}
static class RandomCheckReducer implements Reducer {
public void configure(JobConf job) {
@@ -106,6 +112,8 @@
}
out.collect(new IntWritable(keyint), new IntWritable(count));
}
+ public void finished() {
+ }
}
int range;
Index: src/test/org/apache/nutch/fs/TestNutchFileSystem.java
===================================================================
--- src/test/org/apache/nutch/fs/TestNutchFileSystem.java (revision 374783)
+++ src/test/org/apache/nutch/fs/TestNutchFileSystem.java (working copy)
@@ -155,6 +155,8 @@
reporter.setStatus("wrote " + name);
}
+
+ public void finished() {}
}
public static void writeTest(NutchFileSystem fs, boolean fastCheck)
@@ -247,6 +249,9 @@
reporter.setStatus("read " + name);
}
+
+ public void finished() {}
+
}
public static void readTest(NutchFileSystem fs, boolean fastCheck)
@@ -339,6 +344,9 @@
in.close();
}
}
+
+ public void finished() {}
+
}
public static void seekTest(NutchFileSystem fs, boolean fastCheck)
Index: src/java/org/apache/nutch/indexer/DeleteDuplicates.java
===================================================================
--- src/java/org/apache/nutch/indexer/DeleteDuplicates.java (revision 374776)
+++ src/java/org/apache/nutch/indexer/DeleteDuplicates.java (working copy)
@@ -225,6 +225,7 @@
}
}
}
+ public void finished() {}
}
private NutchFileSystem fs;
@@ -265,6 +266,8 @@
reader.close();
}
}
+
+ public void finished() {}
/** Write nothing. */
public RecordWriter getRecordWriter(final NutchFileSystem fs,
Index: src/java/org/apache/nutch/indexer/Indexer.java
===================================================================
--- src/java/org/apache/nutch/indexer/Indexer.java (revision 374778)
+++ src/java/org/apache/nutch/indexer/Indexer.java (working copy)
@@ -227,6 +227,8 @@
output.collect(key, new ObjectWritable(doc));
}
+
+ public void finished() {}
public void index(File indexDir, File crawlDb, File linkDb, File[] segments)
throws IOException
output.collect(key, new ObjectWritable(dump.toString()));
}
-
+
+ public void finished() {}
+
public void reader(File segment) throws IOException {
LOG.info("Reader: segment: " + segment);
Index: src/java/org/apache/nutch/mapred/Mapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/Mapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/Mapper.java (working copy)
@@ -39,4 +39,9 @@
void map(WritableComparable key, Writable value,
OutputCollector output, Reporter reporter)
throws IOException;
+
+ /** Called after the last map() call on this Mapper object.
+ Typical implementations do nothing.
+ */
+ void finished();
}
Index: src/java/org/apache/nutch/mapred/lib/RegexMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/RegexMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/RegexMapper.java (working copy)
@@ -53,4 +53,5 @@
output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
}
}
+ public void finished() {}
}
Index: src/java/org/apache/nutch/mapred/lib/InverseMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/InverseMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/InverseMapper.java (working copy)
@@ -38,4 +38,6 @@
throws IOException
+
+ public void finished() {}
}
Index: src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (working copy)
@@ -42,4 +42,5 @@
}
}
+ public void finished() {}
}
Index: src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (working copy)
@@ -39,4 +39,5 @@
output.collect(key, val);
}
+ public void finished() {}
}
Index: src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (working copy)
@@ -47,4 +47,6 @@
// output sum
output.collect(key, new LongWritable(sum));
}
+
+ public void finished() {}
}
Index: src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
===================================================================
--- src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (working copy)
@@ -50,4 +50,6 @@
output.collect(new UTF8(st.nextToken()), new LongWritable(1));
}
}
+
+ public void finished() {}
}
Index: src/java/org/apache/nutch/mapred/ReduceTask.java
===================================================================
--- src/java/org/apache/nutch/mapred/ReduceTask.java (revision 374781)
+++ src/java/org/apache/nutch/mapred/ReduceTask.java (working copy)
@@ -275,6 +275,7 @@
}
} finally {
+ reducer.finished();
in.close();
lfs.delete(new File(sortedFile)); // remove sorted
out.close(reporter);
Index: src/java/org/apache/nutch/mapred/MapTask.java
===================================================================
--- src/java/org/apache/nutch/mapred/MapTask.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/MapTask.java (working copy)
@@ -50,7 +50,7 @@
public void write(DataOutput out) throws IOException
public void readFields(DataInput in) throws IOException
{ super.readFields(in);
@@ -126,6 +126,10 @@
}
} finally {
+ if (combining) {
+ ((CombiningCollector)collector).finished();
+ }
+
in.close(); // close input
}
} finally {
@@ -147,5 +151,5 @@
public NutchConf getConf()
+
}
Index: src/java/org/apache/nutch/mapred/MapRunner.java
===================================================================
--- src/java/org/apache/nutch/mapred/MapRunner.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/MapRunner.java (working copy)
@@ -38,18 +38,22 @@
public void run(RecordReader input, OutputCollector output,
Reporter reporter)
throws IOException {
-
- while (true) {
- // allocate new key & value instances
- WritableComparable key =
- (WritableComparable)job.newInstance(inputKeyClass);
- Writable value = (Writable)job.newInstance(inputValueClass);
+ try {
+ while (true) {
+ // allocate new key & value instances
+ WritableComparable key =
+ (WritableComparable)job.newInstance(inputKeyClass);
+ Writable value = (Writable)job.newInstance(inputValueClass);
- // read next key & value
- if (!input.next(key, value))
- return;
+ // read next key & value
+ if (!input.next(key, value))
+ return;
- // map pair to output
- mapper.map(key, value, output, reporter);
+ // map pair to output
+ mapper.map(key, value, output, reporter);
+ }
+ } finally {
+ mapper.finished();
+ }
}
Index: src/java/org/apache/nutch/mapred/CombiningCollector.java
===================================================================
--- src/java/org/apache/nutch/mapred/CombiningCollector.java (revision 374780)
+++ src/java/org/apache/nutch/mapred/CombiningCollector.java (working copy)
@@ -78,4 +78,9 @@
count = 0;
}
+ public synchronized void finished()
+
+
}
Index: src/java/org/apache/nutch/mapred/Reducer.java
===================================================================
--- src/java/org/apache/nutch/mapred/Reducer.java (revision 374737)
+++ src/java/org/apache/nutch/mapred/Reducer.java (working copy)
@@ -38,4 +38,10 @@
void reduce(WritableComparable key, Iterator values,
OutputCollector output, Reporter reporter)
throws IOException;
+
+ /** Called after the last reduce() call on this Reducer object.
+ Typical implementations do nothing.
+ */
+ void finished();
+
}
Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
===================================================================
--- src/java/org/apache/nutch/crawl/CrawlDbReader.java (revision 374737)
+++ src/java/org/apache/nutch/crawl/CrawlDbReader.java (working copy)
@@ -50,9 +50,9 @@
 /**
  * Read utility for the CrawlDB.
- *
+ *
  * @author Andrzej Bialecki
- *
+ *
  */
public class CrawlDbReader {
@@ -68,6 +68,7 @@
output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
}
+ public void finished() {}
}
public static class CrawlDbStatReducer implements Reducer
{
@@ -121,6 +122,7 @@
output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
}
}
+ public void finished() {}
}
public static class CrawlDbDumpReducer implements Reducer {
@@ -133,8 +135,11 @@
public void configure(JobConf job) {
}
+
+ public void finished() {
+ }
}
public void processStatJob(String crawlDb, NutchConf config) throws IOException {
LOG.info("CrawlDb statistics start: " + crawlDb);
File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis());
@@ -219,7 +224,7 @@
System.out.println("not found");
}
+
public void processDumpJob(String crawlDb, String output, NutchConf config) throws IOException {
LOG.info("CrawlDb dump: starting");
@@ -270,4 +275,5 @@
}
return;
}
+
}
Index: src/java/org/apache/nutch/crawl/LinkDb.java
===================================================================
--- src/java/org/apache/nutch/crawl/LinkDb.java (revision 374779)
+++ src/java/org/apache/nutch/crawl/LinkDb.java (working copy)
@@ -118,7 +118,8 @@
output.collect(key, result);
}
-
-
+ public void finished() {}
+
public void invert(File linkDb, File segmentsDir) throws IOException
}
+
+ public void finished() {}
}
/** Combine multiple new entries for a url. */
@@ -76,6 +78,7 @@
throws IOException
+ public void finished() {}
}
/** Construct an Injector. */
Index: src/java/org/apache/nutch/crawl/Generator.java
===================================================================
--- src/java/org/apache/nutch/crawl/Generator.java (revision 374779)
+++ src/java/org/apache/nutch/crawl/Generator.java (working copy)
@@ -63,6 +63,8 @@
output.collect(crawlDatum, key); // invert for sort by score
}
+ public void finished() {}
+
/** Partition by host (value). */
public int getPartition(WritableComparable key, Writable value,
int numReduceTasks)
}
+ public void finished() {}
}
Index: src/java/org/apache/nutch/parse/ParseSegment.java
===================================================================
--- src/java/org/apache/nutch/parse/ParseSegment.java (revision 374776)
+++ src/java/org/apache/nutch/parse/ParseSegment.java (working copy)
@@ -78,6 +78,8 @@
throws IOException
+
+ public void finished() {}
public void parse(File segment) throws IOException {
LOG.info("Parse: starting");