Description
I'm doing some work about running TPC-H queries on Flink, and encounter a problem that q20 can't be de-correlated appropriately when ConfigBuilder.withExpand(false).
TPC-H q20 query has a WHERE clause composed of an IN predicate with an un-correlated subquery and an EXISTS predicate with a correlated subquery.
Therefore, in SubQueryRemoveRule.FILTER, the "variablesSet" from the entire Filter rel, referenced by the correlated subquery of EXISTS predicate actually, applies to the un-correlated subquery of IN predicate, and finally generates an inner-join LogicalCorrelate which will be ignored in RelDecorrelator.
- The q20 query:
-- tpch20 select s.s_name, s.s_address from supplier s, nation n where s.s_suppkey in ( select ps.ps_suppkey from partsupp ps where ps. ps_partkey in ( select p.p_partkey from part p where p.p_name like 'antique%' ) and ps.ps_availqty > ( select 0.5 * sum(l.l_quantity) from lineitem l where l.l_partkey = ps.ps_partkey and l.l_suppkey = ps.ps_suppkey and l.l_shipdate >= date '1993-01-01' and l.l_shipdate < date '1993-01-01' + interval '1' year ) ) and s.s_nationkey = n.n_nationkey and n.n_name = 'KENYA' order by s.s_name
- Plan before SubQueryRemoveRule.FILTER:
LogicalSort(sort0=[$0], dir0=[ASC]) LogicalProject(s_name=[$1], s_address=[$2]) LogicalFilter(condition=[AND(IN($0, { LogicalProject(ps_suppkey=[$1]) LogicalFilter(condition=[AND(IN($0, { LogicalProject(p_partkey=[$0]) LogicalFilter(condition=[LIKE($1, 'antique%')]) LogicalTableScan(table=[[part, source: [selectedFields=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment]]]]) }), >($2, $SCALAR_QUERY({ LogicalProject(EXPR$0=[*(0.5, $0)]) LogicalAggregate(group=[{}], agg#0=[SUM($0)]) LogicalProject(l_quantity=[$4]) LogicalFilter(condition=[AND(=($1, $cor0.ps_partkey), =($2, $cor0.ps_suppkey), >=($10, 1993-01-01), <($10, DATETIME_PLUS(1993-01-01, 12)))]) LogicalTableScan(table=[[lineitem, source: [selectedFields=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]]]]) })))], variablesSet=[[$cor0]]) LogicalTableScan(table=[[partsupp, source: [selectedFields=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]]]]) }), =($3, $7), =($8, 'KENYA'))]) LogicalJoin(condition=[true], joinType=[inner]) LogicalTableScan(table=[[supplier, source: [selectedFields=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]]]]) LogicalTableScan(table=[[nation, source: [selectedFields=[n_nationkey, n_name, n_regionkey, n_comment]]]])
- Plan after SubQueryRemoveRule.FILTER (also differs from the plan when ConfigBuilder.withExpand(true)):
LogicalSort(sort0=[$0], dir0=[ASC]) LogicalProject(s_name=[$1], s_address=[$2]) LogicalProject(s_suppkey=[$0], s_name=[$1], s_address=[$2], s_nationkey=[$3], s_phone=[$4], s_acctbal=[$5], s_comment=[$6], n_nationkey=[$7], n_name=[$8], n_regionkey=[$9], n_comment=[$10]) LogicalFilter(condition=[AND(=($3, $7), =($8, 'KENYA'))]) LogicalJoin(condition=[=($0, $11)], joinType=[inner]) LogicalJoin(condition=[true], joinType=[inner]) LogicalTableScan(table=[[supplier, source: [selectedFields=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]]]]) LogicalTableScan(table=[[nation, source: [selectedFields=[n_nationkey, n_name, n_regionkey, n_comment]]]]) LogicalAggregate(group=[{0}]) LogicalProject(ps_suppkey=[$1]) LogicalProject(ps_partkey=[$0], ps_suppkey=[$1], ps_availqty=[$2], ps_supplycost=[$3], ps_comment=[$4]) LogicalFilter(condition=[>($2, $6)]) LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) LogicalFilter(condition=[=($0, $5)]) LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}]) <<<--- the problem is here! LogicalTableScan(table=[[partsupp, source: [selectedFields=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]]]]) LogicalAggregate(group=[{0}]) LogicalProject(p_partkey=[$0]) LogicalFilter(condition=[LIKE($1, 'antique%')]) LogicalTableScan(table=[[part, source: [selectedFields=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment]]]]) LogicalAggregate(group=[{}], agg#0=[SINGLE_VALUE($0)]) LogicalProject(EXPR$0=[*(0.5, $0)]) LogicalAggregate(group=[{}], agg#0=[SUM($0)]) LogicalProject(l_quantity=[$4]) LogicalFilter(condition=[AND(=($1, $cor0.ps_partkey), =($2, $cor0.ps_suppkey), >=($10, 1993-01-01), <($10, DATETIME_PLUS(1993-01-01, 12)))]) LogicalTableScan(table=[[lineitem, source: [selectedFields=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]]]])