BEAM-6929: Session Windows with lateness cause NullPointerException in Flink Runner

Details

    • Type: Bug
    • Status: Resolved
    • Priority: P1
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.12.0
    • Component/s: runner-flink
    • Labels: None

    Description

      Reported on the mailing list:

      I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster - 1.7.2.
      
      I have this flow in my pipeline:
      KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default trigger)  -->  BeamSQL(GroupBy query)  -->  Window.remerge()  -->  Enrichment  -->  KafkaSink
      
      I generate the data so that the first two records belong to two different sessions. I then generate the third record before the first session expires, with a timestamp chosen so that the two sessions are merged into a single session.
      
      For example, these are the sample input and the output obtained when I ran the same pipeline in DirectRunner.
      
      Sample Input:
      {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
      {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
      {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
      
      Sample Output:
      {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
      {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
      {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
      
      Here "NumberOfRecords" is the count, "WST" is the Avro field name for the window start time of the session window, and "WET" is the field for the window end time. I obtain "WST" and "WET" after remerging and applying a ParDo (the Enrichment stage of the pipeline).
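
The merge in the sample output can be reproduced by hand. The sketch below is a simplified stand-in for session windowing, not Beam code: each record gets a window of [timestamp, timestamp + gap), and overlapping windows are merged. The class and method names (SessionMergeSketch, Session, add) are hypothetical. With a 1 minute gap, the third record's window [15-28-26, 15-29-26) overlaps both existing sessions, so all three records end up in one session from 15-27-44 to 15-29-51.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class SessionMergeSketch {
    // Timestamp layout used by the sample records, e.g. "2019-03-27 15-27-44".
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("uuuu-MM-dd HH-mm-ss");

    /** A half-open session window [start, end) plus the number of records in it. */
    static final class Session {
        final LocalDateTime start;
        final LocalDateTime end;
        final long count;

        Session(LocalDateTime start, LocalDateTime end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        boolean overlaps(Session other) {
            return start.isBefore(other.end) && other.start.isBefore(end);
        }

        Session mergeWith(Session other) {
            LocalDateTime s = start.isBefore(other.start) ? start : other.start;
            LocalDateTime e = end.isAfter(other.end) ? end : other.end;
            return new Session(s, e, count + other.count);
        }

        @Override
        public String toString() {
            return FMT.format(start) + " .. " + FMT.format(end) + " count=" + count;
        }
    }

    /** Adds one record (in arrival order) and merges it with every session it overlaps. */
    static List<Session> add(List<Session> sessions, String timestamp, Duration gap) {
        LocalDateTime t = LocalDateTime.parse(timestamp, FMT);
        Session merged = new Session(t, t.plus(gap), 1);
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            if (s.overlaps(merged)) {
                merged = merged.mergeWith(s); // absorb the overlapping session
            } else {
                result.add(s);                // unrelated session stays as it is
            }
        }
        result.add(merged);
        return result;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(1);
        List<Session> sessions = new ArrayList<>();
        // Records in arrival order, as in the sample input.
        String[] arrivals = {"2019-03-27 15-27-44", "2019-03-27 15-28-51", "2019-03-27 15-28-26"};
        for (String ts : arrivals) {
            sessions = add(sessions, ts, gap);
            System.out.println("after " + ts + ": " + sessions);
        }
    }
}
```

The sketch ignores triggers, panes, and allowed lateness; it only shows why the third record forces a merge of two previously separate windows, which is the step at which the Flink Runner fails.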
      
      The program ran successfully in DirectRunner. In FlinkRunner, however, I get this exception when the third record arrives:
      
      2019-03-27 15:31:00,442 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow) -> (Window.Into()/Window.Assign.out -> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem, DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)) (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
      2019-03-27 15:33:25,427 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) -> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) -> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo) -> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter) (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
      org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
              at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
              at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
              at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
              at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
              at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
              at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
              at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
              at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
              at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
              at java.lang.Thread.run(Thread.java:748)
      Caused by: java.lang.NullPointerException
              at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
              at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
              at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
              at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
              at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
              at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
              at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
              at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
              at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
              at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
              at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
              at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
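
The trace shows the NullPointerException raised inside a mergeAccumulators call that FlinkCombiningState makes while window state is being merged, but it does not say which reference was null. One plausible reading, which is an assumption and not something this report confirms, is that one of the windows being merged had no stored accumulator. The sketch below is a simplified stand-in and not Beam or Flink code; the names NullSafeMergeSketch, naiveMerge, and nullTolerantMerge are hypothetical. It only illustrates how a merge over accumulators fails on a missing entry and how skipping missing entries avoids that.

```java
import java.util.Arrays;
import java.util.List;

public class NullSafeMergeSketch {
    /** Sums the accumulators; unboxing a null entry throws NullPointerException. */
    static long naiveMerge(List<Long> accumulators) {
        long sum = 0;
        for (Long acc : accumulators) {
            sum += acc; // auto-unboxing fails here when acc is null
        }
        return sum;
    }

    /** Sums the accumulators, treating a missing (null) entry as the identity 0. */
    static long nullTolerantMerge(List<Long> accumulators) {
        long sum = 0;
        for (Long acc : accumulators) {
            if (acc != null) {
                sum += acc;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Assumed scenario: three windows merge, one of which has no stored count.
        List<Long> accumulators = Arrays.asList(1L, null, 1L);
        try {
            naiveMerge(accumulators);
        } catch (NullPointerException e) {
            System.out.println("naive merge failed with NullPointerException");
        }
        System.out.println("null-tolerant merge: " + nullTolerantMerge(accumulators));
    }
}
```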
      
      Is this a known issue with FlinkRunner? Are session windows with lateness @experimental in FlinkRunner?
      
      I have also tried with Runner - beam-runners-flink_2.11, Flink Cluster - 1.5.3 and came across the same exception.
      
      I have also tried generating data with a lateness of 0, and everything works as expected. There seems to be no problem merging the windows of records that belong to the same session.
      
      

            People

              mxm Maximilian Michels


                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 2h