Uploaded image for project: 'Hadoop Common'
  1. Hadoop Common
  2. HADOOP-811

Patch to support multi-threaded MapRunnable

    XMLWordPrintableJSON

Details

    • New Feature
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 0.10.0
    • 0.10.0
    • None
    • None
    • all

    Description

      The MapRunner calls Mapper.map in a serialized fashion.

      This is suitable for CPU/memory bound operations.

      However, when doing IO bound operations this serialization affects the throughput significantly.

      In order to support IO bound operations more efficiently I've implemented a multithreaded MapRunnable.

      Following is the implementation of this MapRunnable, MultithreadedMapRunner.

      I only had to modify one method in the existing code (in the Task class) to avoid data corruption in the reporter.

      Index: Task.java
      ===================================================================
      --- Task.java (revision 485492)
      +++ Task.java (working copy)
      @@ -153,9 +153,11 @@
      public Reporter getReporter(final TaskUmbilicalProtocol umbilical,
      final Progress progress) throws IOException {
      return new Reporter() {

      -      public void setStatus(String status) throws IOException {
      -        progress.setStatus(status);
      -        progress();
      +      public void setStatus(String status) throws IOException {
      +        synchronized (this) {
      +          progress.setStatus(status);
      +          progress();
      +        }
               }

        public void progress() throws IOException {
        reportProgress(umbilical);

      -----------------------------------------------------------
      MultithreadedMapRunner.java

      package org.apache.hadoop.mapred;

      import org.apache.hadoop.util.ReflectionUtils;
      import org.apache.hadoop.io.WritableComparable;
      import org.apache.hadoop.io.Writable;
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;

      import java.io.IOException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Semaphore;
      import java.util.concurrent.TimeUnit;

      /**

      • Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
      • <p>
      • It can be used instead of the default implementation,
      • @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
      • bound in order to improve throughput.
      • <p>
      • Map implementations using this MapRunnable must be thread-safe.
      • <p>
      • The Map-Reduce job has to be configured to use this MapRunnable class (using
      • the <b>mapred.map.runner.class</b> property) and
      • the number of thread the thread-pool can use (using the
      • <b>mapred.map.multithreadedrunner.threads</b> property).
      • <p>
        *
      • @author Alejandro Abdelnur
        */
        public class MultithreadedMapRunner implements MapRunnable {
        private static final Log LOG =
        LogFactory.getLog(MultithreadedMapRunner.class.getName());

      private JobConf job;
      private Mapper mapper;
      private ExecutorService executorService;
      private IOException ioException;

      public void configure(JobConf job) {
      int numberOfThreads =
      job.getInt("mapred.map.multithreadedrunner.threads", 10);
      if (LOG.isDebugEnabled())

      { LOG.debug("Configuring job " + job.getJobName() + " to use " + numberOfThreads + " threads" ); }

      this.job = job;
      this.mapper = (Mapper)ReflectionUtils.newInstance(job.getMapperClass(),
      job);

      // Creating a threadpool of the configured size to execute the Mapper
      // map method in parallel.
      executorService = Executors.newFixedThreadPool(numberOfThreads);
      }

      public void run(RecordReader input, OutputCollector output,
      Reporter reporter)
      throws IOException {
      try {
      // allocate key & value instances these objects will not be reused
      // because execution of Mapper.map is not serialized.
      WritableComparable key = input.createKey();
      Writable value = input.createValue();

      while (input.next(key, value)) {

      // Run Mapper.map execution asynchronously in a separate thread.
      // If threads are not available from the thread-pool this method
      // will block until there is a thread available.
      executorService.execute(
      new MTMapperRunable(key, value, output, reporter));

      // Checking if a Mapper.map within a Runnable has generated an
      // IOException. If so we rethrow it to force an abort of the Map
      // operation thus keeping the semantics of the default
      // implementation.
      if (ioException != null)

      { throw ioException; }

      // Allocate new key & value instances as mapper is running in parallel
      key = input.createKey();
      value = input.createValue();
      }

      if (LOG.isDebugEnabled()) { LOG.debug("Finished dispatching all Mappper.map calls, job " + job.getJobName()); }

      // Graceful shutdown of the Threadpool, it will let all scheduled
      // Runnables to end.
      executorService.shutdown();

      try {

      // Now waiting for all Runnables to end.
      while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
      if (LOG.isDebugEnabled()) { LOG.debug("Awaiting all running Mappper.map calls to finish, job " + job.getJobName()); }

      // Checking if a Mapper.map within a Runnable has generated an
      // IOException. If so we rethrow it to force an abort of the Map
      // operation thus keeping the semantics of the default
      // implementation.
      // NOTE: while Mapper.map dispatching has concluded there are still
      // map calls in progress.
      if (ioException != null) { throw ioException; }
      }

      // Checking if a Mapper.map within a Runnable has generated an
      // IOException. If so we rethrow it to force an abort of the Map
      // operation thus keeping the semantics of the default
      // implementation.
      // NOTE: it could be that a map call has had an exception after the
      // call for awaitTermination() returing true. And edge case but it
      // could happen.
      if (ioException != null) { throw ioException; }

      }
      catch (IOException ioEx)

      { // Forcing a shutdown of all thread of the threadpool and rethrowing // the IOException executorService.shutdownNow(); throw ioEx; }

      catch (InterruptedException iEx)

      { throw new IOException(iEx.getMessage()); }

      } finally

      { mapper.close(); }

      }

      /**

      • Runnable to execute a single Mapper.map call from a forked thread.
        */
        private class MTMapperRunable implements Runnable {
        private WritableComparable key;
        private Writable value;
        private OutputCollector output;
        private Reporter reporter;

      /**

      • Collecting all required parameters to execute a Mapper.map call.
      • <p>
        *
      • @param key
      • @param value
      • @param output
      • @param reporter
        */
        public MTMapperRunable(WritableComparable key, Writable value,
        OutputCollector output, Reporter reporter) { this.key = key; this.value = value; this.output = output; this.reporter = reporter; }

      /**

      • Executes a Mapper.map call with the given Mapper and parameters.
      • <p>
      • This method is called from the thread-pool thread.
        *
        */
        public void run() {
        try { // map pair to output MultithreadedMapRunner.this.mapper.map(key, value, output, reporter); }

        catch (IOException ex) {
        // If there is an IOException during the call it is set in an instance
        // variable of the MultithreadedMapRunner from where it will be
        // rethrown.
        synchronized (MultithreadedMapRunner.this)

        Unknown macro: { if (MultithreadedMapRunner.this.ioException == null) { MultithreadedMapRunner.this.ioException = ex; } }

        }
        }
        }

      }

      Attachments

        1. MultithreadedMapRunner.java
          6 kB
          Alejandro Abdelnur
        2. MultithreadedMapRunner.java
          7 kB
          Alejandro Abdelnur
        3. diff.txt
          7 kB
          Alejandro Abdelnur
        4. diff.txt
          8 kB
          Alejandro Abdelnur

        Activity

          People

            cutting Doug Cutting
            tucu00 Alejandro Abdelnur
            Votes:
            1 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: