Apache Gobblin / GOBBLIN-150

BaseDataPublisher marks WorkUnitState as COMMITTED even if the publish fails


Details

    Description

      *This issue can result in data loss*

      The BaseDataPublisher uploads the output files to the publish location and, in the process, marks each WorkUnitState as COMMITTED. Currently the code uploads each output directory using `ParallelRunner.movePath()`, which only submits the move to an executor as a future and returns immediately. The WorkUnitState is marked as COMMITTED as soon as that call returns, that is, before the files have actually been uploaded and regardless of whether the upload later fails. Additionally, because each folder is uploaded as a unit via the underlying Hadoop call `FileUtil.copy()`, there is no way to track which files were uploaded successfully and which were not.

      ```
      public class BaseDataPublisher extends SingleTaskDataPublisher {
        public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
          ...
          for (WorkUnitState workUnitState : states) {
            for (int branchId = 0; branchId < this.numBranches; branchId++) {
              publishMultiTaskData(workUnitState, branchId, writerOutputPathsMoved);
            }
            workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
          }
        }

        private void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved) throws IOException {
          publishData(state, branchId, false, writerOutputPathsMoved);
        }

        protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData, Set<Path> writerOutputPathsMoved) throws IOException {
          ...
          parallelRunner.movePath(writerOutputDir, this.publisherFileSystemByBranches.get(branchId),
              publisherOutputDir, this.publisherFinalDirOwnerGroupsByBranches.get(branchId));
          ...
        }
      }

      public class ParallelRunner implements Closeable {
        public void movePath(final Path src, final FileSystem dstFs, final Path dst, final Optional<String> group) {
          this.futures.add(this.executor.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
              ...
              HadoopUtils.movePath(fs, src, dstFs, dst);
              ...
            }
          }));
        }
      }
      ```
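
One possible direction, shown only as a minimal self-contained sketch: keep the future for each submitted move and update the state only after that future has completed, so a failed move never ends up COMMITTED. This is not Gobblin code and not the change proposed in #572. `CommitAfterPublishSketch`, `Unit`, `State`, and `publishAll` are hypothetical stand-ins for the publisher, `WorkUnitState`, its working state, and `publishData`, and the lambdas simulate the upload.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CommitAfterPublishSketch {

  /** Hypothetical, reduced stand-in for WorkUnitState.WorkingState. */
  enum State { SUCCESSFUL, COMMITTED, FAILED }

  /** Hypothetical stand-in for one work unit plus the move that publishes it. */
  static final class Unit {
    final String name;
    final Callable<Void> move;
    State state = State.SUCCESSFUL;

    Unit(String name, Callable<Void> move) {
      this.name = name;
      this.move = move;
    }
  }

  /** Submits all moves, then commits each unit only if its own move succeeded. */
  static Map<String, State> publishAll(List<Unit> units) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      // Submit every move first so the uploads still run in parallel.
      Map<Unit, Future<Void>> pending = new LinkedHashMap<>();
      for (Unit unit : units) {
        pending.put(unit, executor.submit(unit.move));
      }

      // Wait on each future before touching the state of its unit.
      Map<String, State> result = new LinkedHashMap<>();
      for (Map.Entry<Unit, Future<Void>> entry : pending.entrySet()) {
        Unit unit = entry.getKey();
        try {
          entry.getValue().get();
          unit.state = State.COMMITTED;
        } catch (ExecutionException e) {
          // The move threw, so the unit must not be reported as committed.
          unit.state = State.FAILED;
        }
        result.put(unit.name, unit.state);
      }
      return result;
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Unit> units = new ArrayList<>();
    units.add(new Unit("ok", () -> null));
    units.add(new Unit("broken", () -> {
      throw new IOException("simulated upload failure");
    }));
    System.out.println(publishAll(units));
  }
}
```

The sketch tracks success per work unit only. It does not address the second problem described above, that a directory copied as a whole gives no per-file result.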

      @zliu41, @sahilTakiar, @liyinan926 Feedback?

      Github Url : https://github.com/linkedin/gobblin/issues/561
      Github Reporter : jbaranick
      Github Created At : 2015-12-22T19:26:25Z
      Github Updated At : 2017-01-12T04:33:52Z

      Comments


      zliu41 wrote on 2015-12-23T17:46:37Z : @kadaan yes this is definitely a bug. Thanks for pointing it out. The fix doesn't seem trivial and I'll seek opinions from other contributors. If you have any suggestions please let us know.

      Github Url : https://github.com/linkedin/gobblin/issues/561#issuecomment-166954898


      jbaranick wrote on 2015-12-23T17:49:23Z : @zliu41 I have a fix that seems to work for my case, but I could really use some extra eyes on it. I can submit a PR today.

      Github Url : https://github.com/linkedin/gobblin/issues/561#issuecomment-166955731


      zliu41 wrote on 2015-12-23T18:00:43Z : Sounds good!

      Github Url : https://github.com/linkedin/gobblin/issues/561#issuecomment-166959399


      jbaranick wrote on 2015-12-23T19:19:17Z : @zliu41 Added initial attempt for a fix in #572

      Github Url : https://github.com/linkedin/gobblin/issues/561#issuecomment-166974416

          People

            hutran Hung Tran
            jbaranick Joel Baranick