Details
Type: Bug
Status: Resolved
Priority: P1
Resolution: Fixed
Version: 2.5.0
Description
It seems that calling .asSingletonView() on Combine.Globally discards all side inputs configured with .withSideInputs(). Take the following code, for example:
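import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class Main {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);
    PCollection<Integer> a = p.apply(Create.of(1, 2, 3));
    // Singleton side input holding the value 10.
    PCollectionView<Integer> b = p.apply(Create.of(10)).apply(View.asSingleton());
    a.apply(
        Combine.globally(
                new CombineWithContext.CombineFnWithContext<Integer, Integer, Integer>() {
                  @Override
                  public Integer createAccumulator(CombineWithContext.Context c) {
                    // Reads the side input declared via withSideInputs(b) below.
                    return c.sideInput(b);
                  }

                  @Override
                  public Integer addInput(
                      Integer accumulator, Integer input, CombineWithContext.Context c) {
                    return accumulator + input;
                  }

                  @Override
                  public Integer mergeAccumulators(
                      Iterable<Integer> accumulators, CombineWithContext.Context c) {
                    int sum = 0;
                    for (int i : accumulators) {
                      sum += i;
                    }
                    return sum;
                  }

                  @Override
                  public Integer extractOutput(Integer accumulator, CombineWithContext.Context c) {
                    return accumulator;
                  }

                  @Override
                  public Integer defaultValue() {
                    return 0;
                  }
                })
            .withSideInputs(b)
            .asSingletonView());
    p.run().waitUntilFinish();
  }
}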
This fails with the following exception:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: calling sideInput() with unknown view
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at Main.main(Main.java:287)
Caused by: java.lang.IllegalArgumentException: calling sideInput() with unknown view
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.sideInput(SimpleDoFnRunner.java:212)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:69)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:489)
at org.apache.beam.sdk.transforms.Combine$GroupedValues$1$1.sideInput(Combine.java:2137)
at Main$1.createAccumulator(Main.java:258)
at Main$1.createAccumulator(Main.java:255)
at org.apache.beam.sdk.transforms.CombineWithContext$CombineFnWithContext.apply(CombineWithContext.java:120)
at org.apache.beam.sdk.transforms.Combine$GroupedValues$1.processElement(Combine.java:2129)
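The message "calling sideInput() with unknown view" appears to come from a membership check: the runner rejects any view that was not declared on the transform it is executing. Below is a minimal, hypothetical model of such a check. SideInputLookup and its string view IDs are invented for illustration and are not Beam classes; the sketch assumes views are keyed by an identifier and that only declared views are registered with the lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model (not Beam's actual classes): a runner-side
// lookup that only knows the views declared on the transform it executes.
final class SideInputLookup {
  private final Map<String, Object> declaredViews;

  SideInputLookup(Map<String, Object> declaredViews) {
    // Defensive copy so later changes to the caller's map have no effect.
    this.declaredViews = new HashMap<>(declaredViews);
  }

  Object sideInput(String viewId) {
    // Any view that was not declared up front is rejected, mirroring the
    // message seen in the stack trace above.
    if (!declaredViews.containsKey(viewId)) {
      throw new IllegalArgumentException("calling sideInput() with unknown view");
    }
    return declaredViews.get(viewId);
  }
}
```

In this model, a transform whose declared side inputs were lost before execution would have an empty lookup, so the first sideInput("b") call would throw exactly as in the report.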
However, if you change
.withSideInputs(b).asSingletonView())
to
.withSideInputs(b)).apply(View.asSingleton())
(that is, apply View.asSingleton() to the combined output as a separate step), the pipeline runs without error.
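One plausible way for a fluent builder to lose side inputs is a convenience method that constructs a new transform without copying the fields already configured on the old one. The sketch below is a hypothetical model only: GlobalCombine, asSingletonViewBuggy and asSingletonViewFixed are invented names, and it is an assumption, not something this report confirms, that Beam's asSingletonView() fails in this way.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of a fluent transform builder; the class and
// method names are illustrative and are not Beam's API.
final class GlobalCombine {
  final List<String> sideInputs;
  final boolean asSingleton;

  GlobalCombine(List<String> sideInputs, boolean asSingleton) {
    this.sideInputs = Collections.unmodifiableList(new ArrayList<>(sideInputs));
    this.asSingleton = asSingleton;
  }

  // Returns a new builder with the extra views appended to the existing ones.
  GlobalCombine withSideInputs(List<String> views) {
    List<String> all = new ArrayList<>(sideInputs);
    all.addAll(views);
    return new GlobalCombine(all, asSingleton);
  }

  // Buggy variant: builds a fresh transform and forgets the side inputs,
  // which matches the symptom described in this report.
  GlobalCombine asSingletonViewBuggy() {
    return new GlobalCombine(Collections.<String>emptyList(), true);
  }

  // Fixed variant: carries every configured field into the new transform.
  GlobalCombine asSingletonViewFixed() {
    return new GlobalCombine(sideInputs, true);
  }
}
```

Under this model, the workaround in the report avoids the problem because the transform returned by withSideInputs(b) is applied as is, and the singleton view is built by a separate View.asSingleton() step on its output rather than by rebuilding the combine.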