Details
-
Bug
-
Status: Resolved
-
P0
-
Resolution: Fixed
-
None
-
None
Description
/**
 * A {@link DoFn} that emits each input element joined with the value read from a singleton side
 * input, using {@code ":"} as the separator.
 */
private static class FnWithSideInputs extends DoFn<String, String> {
  private final PCollectionView<Integer> view;

  private FnWithSideInputs(PCollectionView<Integer> view) {
    this.view = view;
  }

  /** Outputs {@code <element>:<side input value>} for the element being processed. */
  @ProcessElement
  public void processElement(ProcessContext c) {
    String mainValue = c.element();
    Integer sideValue = c.sideInput(view);
    c.output(mainValue + ":" + sideValue);
  }
}

/**
 * Reproduction case for this issue: a single timestamped element is windowed with sliding
 * windows (5 second size, 1 second period) and then processed by a {@link ParDo} that reads a
 * singleton side input.
 */
@Test
public void testSideInputsWithMultipleWindows() {
  Pipeline pipeline = TestPipeline.create();

  // Zero out the millisecond field so the element timestamp falls on a whole second.
  MutableDateTime truncated = Instant.now().toMutableDateTime();
  truncated.setMillisOfSecond(0);
  Instant now = truncated.toInstant();

  SlidingWindows slidingFn =
      SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));

  PCollectionView<Integer> view =
      pipeline.apply(Create.of(1)).apply(View.<Integer>asSingleton());

  PCollection<String> output =
      pipeline
          .apply(Create.timestamped(TimestampedValue.of("a", now)))
          .apply(Window.<String>into(slidingFn))
          .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));

  PAssert.that(output).containsInAnyOrder("a:1");

  pipeline.run();
}
This fails with the following exception:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:343) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:1) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:176) at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:112) at .... Caused by: java.lang.IllegalStateException: sideInput called when main input element is in multiple windows at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.sideInput(SimpleDoFnRunner.java:514) at org.apache.beam.sdk.transforms.ParDoTest$FnWithSideInputs.processElement(ParDoTest.java:738)