Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21733

WatermarkAssigner incorrectly recomputes the rowtime index, which may cause an ArrayIndexOutOfBoundsException

    XML Word Printable JSON

Details

    Description

      WatermarkAssigner incorrectly recomputes the rowtime index in its copy method, which may cause an ArrayIndexOutOfBoundsException in a case such as the following:

      @Test
        def testProjectTransposeWatermarkAssigner(): Unit = {
          // Query under test: selects only `a`, `b` and `ts` (the column the
          // watermark is declared on), so fewer fields are read than the table declares.
          val projectionQuery =
            s"""
               |select a, b, ts
               |from t1
               |""".stripMargin

          // Register the source table: `ts` is a computed column over `t`, carries the
          // watermark definition, and the connector has watermark push-down enabled.
          util.tableEnv.executeSql(
            s"""
               |CREATE TEMPORARY TABLE `t1` (
               |  `a`  VARCHAR,
               |  `b`  VARCHAR,
               |  `c`  VARCHAR,
               |  `d`  INT,
               |  `t`  TIMESTAMP(3),
               |  `ts` AS `t`,
               |  WATERMARK FOR `ts` AS `ts`  - INTERVAL '10' SECOND
               |) WITH (
               |  'connector' = 'values',
               |  'enable-watermark-push-down' = 'true',
               |  'bounded' = 'false',
               |  'disable-lookup' = 'true'
               |)
             """.stripMargin)

          // Planning this query is what exercises WatermarkAssigner.copy.
          util.verifyPlan(projectionQuery)
        }
      

      Exception stack trace:

      java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3
      
      	at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:75)
      	at org.apache.calcite.util.Util$TransformingList.get(Util.java:2732)
      	at org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner.copy(WatermarkAssigner.scala:68)
      	at org.apache.calcite.plan.hep.HepPlanner.addRelToGraph(HepPlanner.java:805)
      	at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:158)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
      	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
      	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
      	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
      	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
      	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
      	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
      	at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
      	at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
      	at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:282)
      
      

      Attachments

        Issue Links

          Activity

            People

              lincoln.86xy lincoln lee
              lincoln.86xy lincoln lee
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: