Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Fixed
None
Description
Current tests: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown
@Test
def testJoinTemporalTableWithProjectionPushDown(): Unit = {
  val sql =
    """
      |SELECT T.*, D.id
      |FROM MyTable AS T
      |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
      |ON T.a = D.id
    """.stripMargin
  util.verifyExecPlan(sql)
}
The optimized plan does not print the columns selected from the lookup source, which hides the fact that the projection was not actually pushed into the lookup source: the source still selects all columns. This is not the expected behavior.
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
   +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
Incorrect intermediate optimization result:
========= logical_rewrite ========
optimize result:
FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
:- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
   +- FlinkLogicalCalc(select=[id])
      +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])

========= time_indicator ========
optimize result:
FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
+- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
   :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
   +- FlinkLogicalSnapshot(period=[$cor0.proctime])
      +- FlinkLogicalCalc(select=[id])
         +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
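In the intermediate result above, FlinkLogicalCalc(select=[id]) sits on top of a source scan that still lists fields=[id, name, age], which is the symptom of the projection not being pushed down. Since the final exec plan does not show this, a test could check the intermediate plan text directly. The following is only a minimal sketch of that idea: `PlanCheck`, `scannedFields`, and `projectionPushed` are hypothetical names introduced here, not part of Flink or its test utilities, and the regular expression assumes the plan text follows the `TableSourceScan(table=[[...]], fields=[...])` shape printed above.

```scala
// Hypothetical helper for illustration; it is not part of Flink or its test utilities.
object PlanCheck {
  // Captures the table path and the printed field list of a "...TableSourceScan(...)" plan node.
  // Assumes the node is printed as: TableSourceScan(table=[[<path>]], fields=[<columns>])
  private val ScanPattern =
    """TableSourceScan\(table=\[\[(.*?)\]\], fields=\[([^\]]*)\]""".r

  /** Fields printed for the first source scan of `tableName`, or None if no such scan exists. */
  def scannedFields(plan: String, tableName: String): Option[List[String]] =
    ScanPattern.findAllMatchIn(plan).collectFirst {
      // Match on any component of the table path being equal to the requested table name.
      case m if m.group(1).split(",").map(_.trim).contains(tableName) =>
        m.group(2).split(",").map(_.trim).filter(_.nonEmpty).toList
    }

  /** True only when the scan of `tableName` reads exactly the `expected` columns. */
  def projectionPushed(plan: String, tableName: String, expected: List[String]): Boolean =
    scannedFields(plan, tableName).contains(expected)
}
```

Applied to the logical_rewrite output above, `PlanCheck.projectionPushed(plan, "LookupTable", List("id"))` evaluates to false, because the scan still lists id, name, and age.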
plan comparison after fix
Attachments
Issue Links
- relates to: FLINK-20840 Projection pushdown doesn't work in temporal(lookup) join (Closed)
- links to