Beam / BEAM-5098

Combine.Globally::asSingletonView clears side inputs

Details

    Description

      Calling .asSingletonView() on a Combine.Globally appears to drop the side inputs registered with .withSideInputs(...). For example:

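      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.transforms.Combine;
      import org.apache.beam.sdk.transforms.CombineWithContext;
      import org.apache.beam.sdk.transforms.Create;
      import org.apache.beam.sdk.transforms.View;
      import org.apache.beam.sdk.values.PCollection;
      import org.apache.beam.sdk.values.PCollectionView;

      public class Main {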
          public static void main(String[] args) {
              PipelineOptions options = PipelineOptionsFactory.create();
              Pipeline p = Pipeline.create(options);
      
              PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
              PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
      
              a
                      .apply(Combine.globally(new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
                          @Override
                          public Integer createAccumulator(CombineWithContext.Context c) {
                              return c.sideInput(b);
                          }
      
                          @Override
                          public Integer addInput(Integer accumulator, Integer input, CombineWithContext.Context c) {
                              return accumulator + input;
                          }
      
                          @Override
                          public Integer mergeAccumulators(Iterable<Integer> accumulators, CombineWithContext.Context c) {
                              int sum = 0;
                              for (int i : accumulators) {
                                  sum += i;
                              }
                              return sum;
                          }
      
                          @Override
                          public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
                              return accumulator;
                          }
      
                          @Override
                          public Integer defaultValue() {
                              return 0;
                          }
                      }).withSideInputs(b).asSingletonView());
      
              p.run().waitUntilFinish();
          }
      }

      This fails with the following exception:

      Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
          at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
          at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
          at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
          at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
          at Main.main(Main.java:287)
      Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
          at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
          at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
          at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
          at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
          at Main$1.createAccumulator(Main.java:258)
          at Main$1.createAccumulator(Main.java:255)
          at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
          at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129)
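
      For context, the message in the trace reads like a lookup check: a view passed to sideInput() is rejected when it is not among the side inputs registered for that step. The following is a minimal, stdlib-only sketch of that kind of check, not Beam's code. ToyStepContext and its members are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a runner's per-step context; not a Beam class.
final class ToyStepContext {
    // Views registered for this step, keyed by a view name, with their values.
    private final Map<String, Object> registeredViews = new HashMap<>();

    void register(String viewName, Object value) {
        registeredViews.put(viewName, value);
    }

    // Mirrors the failure mode in the trace: unregistered views are rejected.
    Object sideInput(String viewName) {
        if (!registeredViews.containsKey(viewName)) {
            throw new IllegalArgumentException("calling sideInput() with unknown view");
        }
        return registeredViews.get(viewName);
    }
}

final class ToyStepContextDemo {
    public static void main(String[] args) {
        // A step that had the view registered resolves it normally.
        ToyStepContext withView = new ToyStepContext();
        withView.register("b", 10);
        System.out.println(withView.sideInput("b"));

        // A step that lost its registration fails the same way the report does.
        ToyStepContext withoutView = new ToyStepContext();
        try {
            withoutView.sideInput("b");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

      If the step created by asSingletonView() ends up with an empty registration, the first call to c.sideInput(b) in createAccumulator would fail in this way, which matches where the trace points.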

      But if you change

      .withSideInputs(b).asSingletonView())

      to

      .withSideInputs(b)).apply(View.asSingleton())

      then it works just fine.
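
      The difference between the two forms is consistent with one possible cause, which is only a guess and has not been checked against the Beam source: asSingletonView() may build its result from the combine function alone and not carry over the list set by withSideInputs(...). Below is a toy, stdlib-only model of that pattern. ToyGlobally, ToyView, and both asSingletonView variants are hypothetical stand-ins, not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical result type holding what the downstream step would see.
final class ToyView {
    final String fn;
    final List<String> sideInputs;

    ToyView(String fn, List<String> sideInputs) {
        this.fn = fn;
        this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
    }
}

// Hypothetical immutable builder, loosely shaped like Combine.globally(...).
final class ToyGlobally {
    final String fn;
    final List<String> sideInputs;

    ToyGlobally(String fn, List<String> sideInputs) {
        this.fn = fn;
        this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
    }

    // Returns a copy that remembers the given side inputs.
    ToyGlobally withSideInputs(List<String> views) {
        return new ToyGlobally(fn, views);
    }

    // Assumed bug shape: only fn is forwarded, so the side inputs are lost.
    ToyView asSingletonViewDropping() {
        return new ToyView(fn, Collections.<String>emptyList());
    }

    // What a caller would expect: the side inputs travel with the transform.
    ToyView asSingletonViewForwarding() {
        return new ToyView(fn, sideInputs);
    }
}

final class ToyGloballyDemo {
    public static void main(String[] args) {
        ToyGlobally combine = new ToyGlobally("sum", Collections.<String>emptyList())
                .withSideInputs(Collections.singletonList("b"));

        // Number of side inputs visible downstream in each variant.
        System.out.println(combine.asSingletonViewDropping().sideInputs.size());
        System.out.println(combine.asSingletonViewForwarding().sideInputs.size());
    }
}
```

      In this model the workaround corresponds to keeping the ToyGlobally that already holds its side inputs and creating the view in a separate step, so nothing has to be copied across.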

       

          People

            mpedersen Mike Pedersen
            Votes: 0
            Watchers: 4


              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 1.5h