Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
1.11.3, 1.12.2
Description
WatermarkAssigner incorrectly recomputing the rowtime index in copy method which may cause ArrayIndexOutOfBoundsException in such case:
@Test def testProjectTransposeWatermarkAssigner(): Unit = { val sourceDDL = s""" |CREATE TEMPORARY TABLE `t1` ( | `a` VARCHAR, | `b` VARCHAR, | `c` VARCHAR, | `d` INT, | `t` TIMESTAMP(3), | `ts` AS `t`, | WATERMARK FOR `ts` AS `ts` - INTERVAL '10' SECOND |) WITH ( | 'connector' = 'values', | 'enable-watermark-push-down' = 'true', | 'bounded' = 'false', | 'disable-lookup' = 'true' |) """.stripMargin util.tableEnv.executeSql(sourceDDL) val sql = s""" |select a, b, ts |from t1 |""".stripMargin util.verifyPlan(sql) }
exception stack
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 3 at com.google.common.collect.RegularImmutableList.get(RegularImmutableList.java:75) at org.apache.calcite.util.Util$TransformingList.get(Util.java:2732) at org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner.copy(WatermarkAssigner.scala:68) at org.apache.calcite.plan.hep.HepPlanner.addRelToGraph(HepPlanner.java:805) at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:158) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:282)
Attachments
Issue Links
- links to