  1. Ignite
  2. IGNITE-13691

Calcite Integration. Support of EXCEPT operator

Details

    • Improvement
    • Status: Resolved
    • Critical
    • Resolution: Fixed
    • None
    • None
    • sql
    • None
    • Docs Required, Release Notes Required

    Description

      As of now, the new Calcite-based SQL engine doesn't support the EXCEPT operator. We need to implement it.

      Attachments

        Issue Links

          Activity

            githubbot ASF GitHub Bot logged work - 15/Apr/21 15:09
            githubbot ASF GitHub Bot logged work - 19/Apr/21 16:34
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r615911217



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RowHandler.java
              ##########
              @@ -40,6 +40,9 @@
                   /** */
                   int columnCount(Row row);
               
              + /** */
              + Object[] getColumns(Row row);

              Review comment:
                     This interface already provides a sufficient set of methods to work with a row. So if you need a shortcut for some operation, it's better to implement it locally. Personally, though, I prefer to create GroupKey via a builder (the fact that GroupKey's constructor is public is an accident, IMO).

              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     I have doubts regarding the current implementation of MinusNode. Let's consider two cases:
                 
                 1. Currently both EXCEPT variants, if I'm not mistaken, load all rows (all distinct rows for EXCEPT[all=false]) from every source. Let's assume that there are 3 sources: the first one is empty and the last two have a million rows each. We will handle 2 million rows, although in fact we could give an answer right away. So it would probably be better to switch to a tree structure for the execution node and process only two sources at a time.
                 2. There are a few more optimizations for EXCEPT[all=false]: a streaming mode and correlations. Both of them are already implemented for joins. Perhaps it's worth considering a simple rewriting rule EXCEPT[all=false] => AntiJoin. Honestly speaking, AntiJoin is not implemented for CorrelatedNestedLoop, but that doesn't seem too difficult anyway. So WDYT?
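
The early-exit idea from point 1 can be sketched in plain Java. This is only an illustration on in-memory lists, assuming a single (non-distributed) node; the class `ExceptShortCircuit`, its `except` method and the `scanned` counter are hypothetical names and are not part of the pull request.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch, not Ignite code: EXCEPT DISTINCT evaluated source by source. */
final class ExceptShortCircuit {
    /** Number of sources scanned by the last call (kept only for illustration). */
    static int scanned;

    /** Computes src[0] EXCEPT src[1] EXCEPT ... and stops once nothing is left. */
    static Set<Integer> except(List<List<Integer>> sources) {
        scanned = 0;

        Set<Integer> res = new LinkedHashSet<>();

        for (int i = 0; i < sources.size(); i++) {
            if (i > 0 && res.isEmpty())
                break; // Nothing left to subtract from: skip the remaining sources.

            scanned++;

            if (i == 0)
                res.addAll(sources.get(i)); // The leftmost input bounds the result.
            else
                res.removeAll(sources.get(i)); // Every later input can only shrink it.
        }

        return res;
    }

    public static void main(String[] args) {
        List<List<Integer>> srcs = Arrays.asList(
            Collections.<Integer>emptyList(), Arrays.asList(1, 2), Arrays.asList(2, 3));

        System.out.println(except(srcs) + ", scanned=" + scanned);
    }
}
```

With an empty first source, the two remaining sources are never read in this sketch.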

              ##########
              File path: modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
              ##########
              @@ -20,13 +20,15 @@
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.UnionPlannerTest;

              Review comment:
                     It's worth adding this to the suite too.

              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
              ##########
              @@ -0,0 +1,147 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.rel.set;
              +
              +import java.util.List;
              +import java.util.Set;
              +import java.util.stream.Collectors;
              +import com.google.common.collect.ImmutableList;
              +import org.apache.calcite.plan.RelOptCluster;
              +import org.apache.calcite.plan.RelTraitSet;
              +import org.apache.calcite.rel.RelInput;
              +import org.apache.calcite.rel.RelNode;
              +import org.apache.calcite.rel.core.CorrelationId;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.calcite.rel.type.RelDataTypeFactory;
              +import org.apache.calcite.util.Pair;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
              +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
              +import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
              +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
              +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
              +import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
              +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
              +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
              +import org.apache.ignite.internal.processors.query.calcite.util.Commons;
              +
              +/**
              + * Physical node for MAP phase of MINUS (EXCEPT) operator.
              + */
              +public class IgniteMapMinus extends IgniteMinusBase {
              + /** */
              + public IgniteMapMinus(
              + RelOptCluster cluster,
              + RelTraitSet traitSet,
              + List<RelNode> inputs,
              + boolean all
              + ) {
              + super(cluster, traitSet, inputs, all);
              + }
              +
              + /** */
              + public IgniteMapMinus(RelInput input) {
              + super(input);
              + }
              +
              + /** {@inheritDoc} */
              + @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
              + return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
              + }
              +
              + /** {@inheritDoc} */
              + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
              + return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
              + }
              +
              + /** {@inheritDoc} */
              + @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
              + return visitor.visit(this);
              + }
              +
              + /** {@inheritDoc} */
              + @Override protected RelDataType deriveRowType() {
              + RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
              +
              + assert typeFactory instanceof IgniteTypeFactory;
              +
              + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
              +
              + builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
              + builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
              +
              + return builder.build();
              + }
              +
              + /** {@inheritDoc} */
              + @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
              + RelTraitSet nodeTraits,
              + List<RelTraitSet> inputTraits
              + ) {
              + boolean rewindable = inputTraits.stream()
              + .map(TraitUtils::rewindability)
              + .allMatch(RewindabilityTrait::rewindable);
              +
              + if (rewindable)
              + return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
              +
              + return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
              + Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
              + }
              +
              + /** {@inheritDoc} */
              + @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
              + RelTraitSet nodeTraits,
              + List<RelTraitSet> inputTraits
              + ) {
              + Set<IgniteDistribution> distributions = inputTraits.stream()
              + .map(TraitUtils::distribution)
              + .collect(Collectors.toSet());
              +
              + ImmutableList.Builder<Pair<RelTraitSet, List<RelTraitSet>>> b = ImmutableList.builder();
              +
              + for (IgniteDistribution distribution : distributions) {

              Review comment:
                     For a map node it doesn't make any sense to provide a particular distribution. Hence, the following should work fine for us:
                 `Pair.of(nodeTraits.replace(RANDOM), Commons.transform(inputTraits, t -> t.replace(ANY)))`




              --
              This is an automated message from the Apache Git Service.
              To respond to the message, please log on to GitHub and use the
              URL above to go to the specific comment.

              For queries about this service, please contact Infrastructure at:
              users@infra.apache.org
            githubbot ASF GitHub Bot logged work - 19/Apr/21 18:12
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616069445



              ##########
              File path: modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
              ##########
              @@ -20,13 +20,15 @@
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.UnionPlannerTest;

              Review comment:
                     I thought about adding the test to the suite, but found that there are actually no checks in this test, only plan output.




            githubbot ASF GitHub Bot logged work - 19/Apr/21 18:41
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616087744



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     1. Optimization for this corner case is only possible for a single or reduce node: if there are no rows for the first input on some node, there can still be suitable rows for other inputs on other nodes. So the map node still needs to process all input rows in any case. I will implement this optimization, but I don't think it's a very common case.
                 2. Did you mean `[all=true]` (not DISTINCT)? For `[all=false]` (DISTINCT) we still need an implementation like the current one. This implementation works for both cases (and for both INTERSECT cases too, if we override the `availableRows` method). Also, for `[all=true]` the current implementation will in some cases perform better than AntiJoin. So, if we really need an optimization with AntiJoin, we can implement it later in another ticket, WDYT?
                 
                 BTW, Calcite uses about the same approach as the current implementation (Sets and MultiSets) for MINUS and INTERSECT.
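
A rough sketch of the counter-based approach discussed here, assuming rows are reduced to a string key and every distinct key carries one counter per source (loosely mirroring the `GROUP_KEY` / `COUNTERS` columns in the diff above). The class `MinusCounters` and its methods are hypothetical; only the name `availableRows` is borrowed from the comment, and its real signature in the pull request may differ.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch, not the MinusNode from the pull request. */
final class MinusCounters {
    /** Counts, for every distinct row, how many times it occurs in each source. */
    static Map<String, int[]> count(List<List<String>> sources) {
        Map<String, int[]> cntrs = new LinkedHashMap<>();

        for (int i = 0; i < sources.size(); i++) {
            for (String row : sources.get(i))
                cntrs.computeIfAbsent(row, k -> new int[sources.size()])[i]++;
        }

        return cntrs;
    }

    /** Merges counters produced independently (for example on different map nodes) into {@code to}. */
    static void merge(Map<String, int[]> to, Map<String, int[]> from) {
        from.forEach((row, c) -> {
            int[] acc = to.computeIfAbsent(row, k -> new int[c.length]);

            for (int i = 0; i < c.length; i++)
                acc[i] += c[i];
        });
    }

    /** Number of copies of a row to emit, given its per-source counters. */
    static int availableRows(int[] c, boolean all) {
        int rest = 0;

        for (int i = 1; i < c.length; i++)
            rest += c[i];

        if (all)
            return Math.max(0, c[0] - rest); // EXCEPT ALL: subtract multiplicities.

        return c[0] > 0 && rest == 0 ? 1 : 0; // EXCEPT DISTINCT: present only on the left.
    }

    public static void main(String[] args) {
        // Two "map nodes", each seeing a part of the left and right inputs.
        Map<String, int[]> node1 = count(Arrays.asList(Arrays.asList("a", "a", "b"), Arrays.asList("a")));
        Map<String, int[]> node2 = count(Arrays.asList(Arrays.asList("a", "c"), Arrays.asList("b")));

        merge(node1, node2);

        node1.forEach((row, c) -> System.out.println(
            row + " " + Arrays.toString(c) + " all=" + availableRows(c, true) + " distinct=" + availableRows(c, false)));
    }
}
```

In this sketch row `b` appears on the left only on the first node and on the right only on the second one, which is why a map node cannot decide anything from its local rows alone.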




            githubbot ASF GitHub Bot logged work - 19/Apr/21 20:27
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616154789



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     I may not really understand how EXCEPT is supposed to work... So let's sort that out first to make sure we are on the same page. My understanding matches the [Postgres definition](https://www.postgresql.org/docs/7.4/sql-select.html) (sorry, there is no anchor for the exact place).
                 
                 The current implementation of the execution node implies that only the leftmost query defines an upper bound on the row count; every subsequent stage of EXCEPT can only shrink the result set. Hence the conclusion: we can drop the following stages as soon as the current stage returns an empty result set. It's true that we have to finish gathering rows for the current stage, but I see no reason to continue work whose result we are no longer interested in.
                 
                 
                 > Did you mean [all=true] (not DISTINCT)?
                 
                 No, I meant exactly what I said. Frankly, I have no clue how a query with EXCEPT ALL could easily be rewritten with ANTI JOIN, since it requires counting rows on both sides. On the other hand, a query like `SELECT f1 FROM t1 EXCEPT SELECT f1 FROM t2` could simply be rewritten to `SELECT DISTINCT f1 FROM t1 ANTI JOIN t2 USING (f1)`.
                 
                 
                 > Calcite uses about the same approach
                 
                 For some reasons (unclear to me) Calcite doesn't like SEMI and ANTI joins. And we have already faced a [problem](https://issues.apache.org/jira/browse/IGNITE-14277) because of this.
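
The equivalence claimed for the DISTINCT case can be checked on small in-memory data. The sketch below is an assumption-laden illustration rather than planner code: `ExceptAsAntiJoin`, `except` and `distinctAntiJoin` are hypothetical names, and the tables are plain lists of non-null integers. One caveat that a real rewrite would probably have to handle: SQL set operations treat NULLs as equal, while a join on `=` does not, so the join condition would need to be null-safe.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch, not Ignite code: EXCEPT DISTINCT versus DISTINCT over an anti join. */
final class ExceptAsAntiJoin {
    /** SELECT f1 FROM t1 EXCEPT SELECT f1 FROM t2, computed on whole sets. */
    static Set<Integer> except(List<Integer> t1, List<Integer> t2) {
        Set<Integer> res = new LinkedHashSet<>(t1);

        res.removeAll(new HashSet<>(t2));

        return res;
    }

    /** Streams t1, keeps rows without a match in t2 (anti join) and deduplicates them. */
    static Set<Integer> distinctAntiJoin(List<Integer> t1, List<Integer> t2) {
        Set<Integer> right = new HashSet<>(t2); // Only the right side is materialized.
        Set<Integer> res = new LinkedHashSet<>();

        for (Integer row : t1) {
            if (!right.contains(row))
                res.add(row);
        }

        return res;
    }

    public static void main(String[] args) {
        List<Integer> t1 = Arrays.asList(1, 1, 2, 3);
        List<Integer> t2 = Arrays.asList(2, 4);

        System.out.println(except(t1, t2).equals(distinctAntiJoin(t1, t2)));
    }
}
```

The anti-join variant reads the left input row by row, which is the streaming property mentioned earlier in the thread.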
                 




            githubbot ASF GitHub Bot logged work - 19/Apr/21 20:29
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616156277



              ##########
              File path: modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
              ##########
              @@ -20,13 +20,15 @@
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.UnionPlannerTest;

              Review comment:
                     Hm, looks like we need to revise these tests. Could you please file a ticket for this?




            githubbot ASF GitHub Bot logged work - 19/Apr/21 20:43
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616164353



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
              ##########
              @@ -0,0 +1,147 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.rel.set;
              +
              +import java.util.List;
              +import java.util.Set;
              +import java.util.stream.Collectors;
              +import com.google.common.collect.ImmutableList;
              +import org.apache.calcite.plan.RelOptCluster;
              +import org.apache.calcite.plan.RelTraitSet;
              +import org.apache.calcite.rel.RelInput;
              +import org.apache.calcite.rel.RelNode;
              +import org.apache.calcite.rel.core.CorrelationId;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.calcite.rel.type.RelDataTypeFactory;
              +import org.apache.calcite.util.Pair;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
              +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
              +import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
              +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
              +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
              +import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
              +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
              +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
              +import org.apache.ignite.internal.processors.query.calcite.util.Commons;
              +
              +/**
              + * Physical node for MAP phase of MINUS (EXCEPT) operator.
              + */
              +public class IgniteMapMinus extends IgniteMinusBase {
              + /** */
              + public IgniteMapMinus(
              + RelOptCluster cluster,
              + RelTraitSet traitSet,
              + List<RelNode> inputs,
              + boolean all
              + ) {
              + super(cluster, traitSet, inputs, all);
              + }
              +
              + /** */
              + public IgniteMapMinus(RelInput input) {
              + super(input);
              + }
              +
              + /** {@inheritDoc} */
              + @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
              + return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
              + }
              +
              + /** {@inheritDoc} */
              + @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
              + return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
              + }
              +
              + /** {@inheritDoc} */
              + @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
              + return visitor.visit(this);
              + }
              +
              + /** {@inheritDoc} */
              + @Override protected RelDataType deriveRowType() {
              + RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
              +
              + assert typeFactory instanceof IgniteTypeFactory;
              +
              + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
              +
              + builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
              + builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
              +
              + return builder.build();
              + }
              +
              + /** {@inheritDoc} */
              + @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
              + RelTraitSet nodeTraits,
              + List<RelTraitSet> inputTraits
              + ) {
              + boolean rewindable = inputTraits.stream()
              + .map(TraitUtils::rewindability)
              + .allMatch(RewindabilityTrait::rewindable);
              +
              + if (rewindable)
              + return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
              +
              + return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
              + Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
              + }
              +
              + /** {@inheritDoc} */
              + @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
              + RelTraitSet nodeTraits,
              + List<RelTraitSet> inputTraits
              + ) {
              + Set<IgniteDistribution> distributions = inputTraits.stream()
              + .map(TraitUtils::distribution)
              + .collect(Collectors.toSet());
              +
              + ImmutableList.Builder<Pair<RelTraitSet, List<RelTraitSet>>> b = ImmutableList.builder();
              +
              + for (IgniteDistribution distribution : distributions) {

              Review comment:
                     In this case a broadcast input can be planned on the MAP phase and processed together with a random input on each node, which produces wrong results. `CalciteQueryProcessorTest#testExceptMerge` reproduces this issue.




              --
              This is an automated message from the Apache Git Service.
              To respond to the message, please log on to GitHub and use the
              URL above to go to the specific comment.

              For queries about this service, please contact Infrastructure at:
              users@infra.apache.org
            githubbot ASF GitHub Bot logged work - 19/Apr/21 20:52
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616169693



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RowHandler.java
              ##########
              @@ -40,6 +40,9 @@
                   /** */
                   int columnCount(Row row);
               
              + /** */
              + Object[] getColumns(Row row);

              Review comment:
                     It's not only a shortcut: copying each field of a row one by one adds a lot of overhead. The GroupKey class still needs to be changed anyway (at least a `fieldsCount` method is required). I think the current change doesn't break any abstraction, produces less garbage, and works faster, but I can rewrite it to copy the fields one by one if you disagree. WDYT?




            githubbot ASF GitHub Bot logged work - 19/Apr/21 20:54
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616069445



              ##########
              File path: modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
              ##########
              @@ -20,13 +20,15 @@
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.UnionPlannerTest;

              Review comment:
                     I was thinking about adding the test to the suite, but found that there are actually no checks in this test, only plan output. I will add some trivial checks to UnionPlannerTest and include it in the suite.




            githubbot ASF GitHub Bot logged work - 20/Apr/21 06:06
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616369327



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RowHandler.java
              ##########
              @@ -40,6 +40,9 @@
                   /** */
                   int columnCount(Row row);
               
              + /** */
              + Object[] getColumns(Row row);

              Review comment:
                     Please don't be fooled by the fact we have only one row representation -- array of object. You should treat a row as an abstract row. I don't like the idea of extending the interface that way because this change is too implementation-specific (in terms of reducing garbage).
                 
                 > And GroupKey class still needs to be changed
                 
                 I'm OK with this. Moreover, I would rather make GroupKey generic. It should be a wrapper for the Row, and there is nothing wrong with returning the underlying row. But perhaps it would be better to do that under a separate ticket.




            githubbot ASF GitHub Bot logged work - 20/Apr/21 06:55
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616393789



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RowHandler.java
              ##########
              @@ -40,6 +40,9 @@
                   /** */
                   int columnCount(Row row);
               
              + /** */
              + Object[] getColumns(Row row);

              Review comment:
                     > Please don't be fooled by the fact we have only one row representation -- array of object.
                 
                 There can be any row representation. As far as `RowHandler` can return `Object` by the index of column and count of columns it can return `Object[]`, the array of column values, as well. The `getColumns()` method of `RowHandler` doesn't mean "give me the internals of the row"; it means "make an array from the column values". In some cases (`ArrayRowHandler`) it can be optimized to return the internals directly.
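
To make the argument above concrete, here is a minimal sketch. It assumes a simplified, hypothetical `SketchRowHandler` interface (the real `RowHandler` has more methods and may differ in signatures). The generic default builds the array from `columnCount` and `get`, while an array-backed handler may return the underlying array without copying.

```java
import java.util.List;

/** Simplified, hypothetical stand-in for RowHandler; only the methods discussed here. */
interface SketchRowHandler<Row> {
    /** Returns the value of the given column. */
    Object get(int field, Row row);

    /** Returns the number of columns in the row. */
    int columnCount(Row row);

    /** Generic fallback: copies the column values one by one into a new array. */
    default Object[] getColumns(Row row) {
        Object[] cols = new Object[columnCount(row)];

        for (int i = 0; i < cols.length; i++)
            cols[i] = get(i, row);

        return cols;
    }
}

/** Array-backed rows: the column array is the row itself, so no copy is made. */
final class SketchArrayRowHandler implements SketchRowHandler<Object[]> {
    @Override public Object get(int field, Object[] row) {
        return row[field];
    }

    @Override public int columnCount(Object[] row) {
        return row.length;
    }

    /** Optimized override: returns the underlying array, so callers must not modify it. */
    @Override public Object[] getColumns(Object[] row) {
        return row;
    }
}

/** List-backed rows (an assumed alternative representation): relies on the generic fallback. */
final class SketchListRowHandler implements SketchRowHandler<List<Object>> {
    @Override public Object get(int field, List<Object> row) {
        return row.get(field);
    }

    @Override public int columnCount(List<Object> row) {
        return row.size();
    }
}
```

The trade-off is visible in the array-backed override: it avoids garbage but exposes the row's storage, which is the implementation-specific behavior the reviewer objects to.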




            githubbot ASF GitHub Bot logged work - 20/Apr/21 07:00
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616397266



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RowHandler.java
              ##########
              @@ -40,6 +40,9 @@
                   /** */
                   int columnCount(Row row);
               
              + /** */
              + Object[] getColumns(Row row);

              Review comment:
                     > As far as RowHandler can return Object by the index of column and count of columns it can return Object[]
                 
                 Should we provide similar methods for `List<Object>` and other collections then? And what about `BinaryObject getColumns(Row row)`?




            githubbot ASF GitHub Bot logged work - 20/Apr/21 07:19
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616411130



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     > My understanding is matches the Postgres definition
                 
                 Yes, the current implementation matches the Postgres definition too. I get the idea of this optimization and will implement it. But as I said before, it is not always possible: for example, we still need to process all sources on the MAP phase.
                 
                 > On the other hand a query like `SELECT f1 FROM t1 EXCEPT SELECT f1 FROM t2` could be simply rewritten to `SELECT DISTINCT f1 FROM t1 ANTI JOIN t2 USING (f1)`
                 
                 I'm not sure it's more efficient than the current approach. As far as I understand, this approach is applicable only under some limitations: tables `t1` and `t2` should be colocated, there should be an index on `t2` for the `f1` column, `t1` should be sorted by `f1` (otherwise streaming mode will not be possible with DISTINCT), etc.
                 I propose to start with the current implementation (which works for all cases and is optimal for most) and add the anti-join optimization later if we really need it.
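
For reference, the two formulations being compared can be sketched in plain Java on a single node. This is only an illustration with hypothetical names, not code from the PR: rows are non-null integers standing in for `f1`, and a `HashSet` stands in for the index on `t2`. SQL NULL handling, which differs between set operators and join conditions, is deliberately out of scope.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical single-node illustration of EXCEPT versus DISTINCT over an anti-join. */
final class ExceptVsAntiJoinSketch {
    /** EXCEPT with set semantics: distinct rows of the left input that do not occur in the right one. */
    static List<Integer> except(List<Integer> left, List<Integer> right) {
        Set<Integer> res = new LinkedHashSet<>(left); // deduplicates, keeps first-seen order

        res.removeAll(right);

        return new ArrayList<>(res);
    }

    /** DISTINCT over anti-join: keep left rows without a match on the right, then deduplicate. */
    static List<Integer> distinctAntiJoin(List<Integer> left, List<Integer> right) {
        Set<Integer> probe = new HashSet<>(right); // assumption: stands in for an index on t2(f1)
        Set<Integer> res = new LinkedHashSet<>();

        for (Integer row : left) {
            if (!probe.contains(row))
                res.add(row);
        }

        return new ArrayList<>(res);
    }
}
```

Both methods return the same rows for non-null keys. The sketch says nothing about which plan is cheaper in a distributed setting, which is the open question in this thread.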




            githubbot ASF GitHub Bot logged work - 20/Apr/21 07:24
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616414661



              ##########
              File path: modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
              ##########
              @@ -20,13 +20,15 @@
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregateDistinctPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.AggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.CorrelatedNestedLoopJoinPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.ExceptPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.HashIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.JoinColocationPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.SortedIndexSpoolPlannerTest;
               import org.apache.ignite.internal.processors.query.calcite.planner.TableSpoolPlannerTest;
              +import org.apache.ignite.internal.processors.query.calcite.planner.UnionPlannerTest;

              Review comment:
                     https://issues.apache.org/jira/browse/IGNITE-14594




            githubbot ASF GitHub Bot logged work - 20/Apr/21 07:46
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616430744



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     > we still need to process all sources on the MAP phase
                 
                 `select_1 EXCEPT select_2 EXCEPT select_3 EXCEPT select_n` should be evaluated as `((select_1 EXCEPT select_2) EXCEPT select_3) EXCEPT select_n`. Could you please explain why we need to process rows from `select_n` if the result of `(...) EXCEPT select_3` is empty? It's unclear to me right now.
                 
                 
                 > I propose to start with the current implementation
                 
                 works for me. It's better to benchmark both approaches and compare results to decide which one is better.




            githubbot ASF GitHub Bot logged work - 20/Apr/21 07:47
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616430744



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     > we still need to process all sources on the MAP phase
                 
                 `select_1 EXCEPT select_2 EXCEPT select_3 EXCEPT select_n` should be evaluated as `((select_1 EXCEPT select_2) EXCEPT select_3) EXCEPT select_n`. Could you please explain why we need to process rows from `select_n` on the MAP phase if the result of `(...) EXCEPT select_3` is empty? It's unclear to me right now.
                 
                 
                 > I propose to start with the current implementation
                 
                 works for me. It's better to benchmark both approaches and compare results to decide which one is better.




            githubbot ASF GitHub Bot logged work - 20/Apr/21 08:37
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r616470424



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -0,0 +1,341 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.exec.rel;
              +
              +import java.util.ArrayList;
              +import java.util.Collections;
              +import java.util.HashMap;
              +import java.util.Iterator;
              +import java.util.List;
              +import java.util.Map;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
              +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.util.typedef.F;
              +
              +/**
              + * Execution node for MINUS (EXCEPT) operator.
              + */
              +public class MinusNode<Row> extends AbstractNode<Row> {

              Review comment:
                     > Could you please explain why we need to process rows from select_n on MAP phase if results of `(...) EXCEPT select_3` is empty?
                 
                 For example, we have 'a' EXCEPT 'a' EXCEPT 'b' on node 1, and 'b' EXCEPT 'c' EXCEPT 'c' on node 2. In this case, after the first step we already have an empty set on node 1. If we stop processing the other inputs on node 1, the REDUCE phase will produce 'b' as the resulting set. But this 'b' (from node 2) should be eliminated by the 'b' in the third input on node 1.
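
The scenario above can be replayed with a small Java sketch. It assumes that the MAP phase emits per-row counters for each input (in the spirit of the `GROUP_KEY`/`COUNTERS` row type in `IgniteMapMinus`) and that REDUCE sums them. All class and method names here are hypothetical and this is not the PR's `MinusNode` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of two-phase EXCEPT with set semantics. */
final class ExceptMapReduceSketch {
    /**
     * MAP phase on one node: counts occurrences of every row per input.
     * With stopWhenEmpty the remaining inputs are skipped as soon as the local result is empty.
     */
    static Map<String, int[]> map(List<List<String>> inputs, boolean stopWhenEmpty) {
        Map<String, int[]> counters = new TreeMap<>();

        for (int i = 0; i < inputs.size(); i++) {
            if (stopWhenEmpty && i > 0 && localResultEmpty(counters))
                break;

            for (String row : inputs.get(i))
                counters.computeIfAbsent(row, k -> new int[inputs.size()])[i]++;
        }

        return counters;
    }

    /** True if no row would survive EXCEPT given the counters collected so far. */
    static boolean localResultEmpty(Map<String, int[]> counters) {
        for (int[] c : counters.values()) {
            if (survives(c))
                return false;
        }

        return true;
    }

    /** A row survives if it occurs in the first input and in none of the others. */
    static boolean survives(int[] c) {
        if (c[0] == 0)
            return false;

        for (int i = 1; i < c.length; i++) {
            if (c[i] > 0)
                return false;
        }

        return true;
    }

    /** REDUCE phase: sums the counters from all nodes and keeps the surviving rows. */
    static List<String> reduce(List<Map<String, int[]>> perNode) {
        Map<String, int[]> merged = new TreeMap<>();

        for (Map<String, int[]> node : perNode) {
            for (Map.Entry<String, int[]> e : node.entrySet()) {
                int[] acc = merged.computeIfAbsent(e.getKey(), k -> new int[e.getValue().length]);

                for (int i = 0; i < acc.length; i++)
                    acc[i] += e.getValue()[i];
            }
        }

        List<String> res = new ArrayList<>();

        for (Map.Entry<String, int[]> e : merged.entrySet()) {
            if (survives(e.getValue()))
                res.add(e.getKey());
        }

        return res;
    }

    public static void main(String[] args) {
        List<List<String>> node1 = List.of(List.of("a"), List.of("a"), List.of("b"));
        List<List<String>> node2 = List.of(List.of("b"), List.of("c"), List.of("c"));

        // All inputs processed on every node: prints []
        System.out.println(reduce(List.of(map(node1, false), map(node2, false))));

        // Node 1 stops early and never reports its 'b': prints [b]
        System.out.println(reduce(List.of(map(node1, true), map(node2, true))));
    }
}
```

With early termination node 1 never reports the 'b' from its third input, so the 'b' from node 2 wrongly survives the REDUCE phase.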




            githubbot ASF GitHub Bot logged work - 20/Apr/21 14:14
            • Time Spent:
              10m
               
              alex-plekhanov commented on pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#issuecomment-823309326


                 @korlov42 I've fixed your comments, please have a look again.


            githubbot ASF GitHub Bot logged work - 21/Apr/21 11:13
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r617440209



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
              ##########
              @@ -0,0 +1,147 @@
              +/*
              + * Licensed to the Apache Software Foundation (ASF) under one or more
              + * contributor license agreements. See the NOTICE file distributed with
              + * this work for additional information regarding copyright ownership.
              + * The ASF licenses this file to You under the Apache License, Version 2.0
              + * (the "License"); you may not use this file except in compliance with
              + * the License. You may obtain a copy of the License at
              + *
              + * http://www.apache.org/licenses/LICENSE-2.0
              + *
              + * Unless required by applicable law or agreed to in writing, software
              + * distributed under the License is distributed on an "AS IS" BASIS,
              + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
              + * See the License for the specific language governing permissions and
              + * limitations under the License.
              + */
              +
              +package org.apache.ignite.internal.processors.query.calcite.rel.set;
              +
              +import java.util.List;
              +import java.util.Set;
              +import java.util.stream.Collectors;
              +import com.google.common.collect.ImmutableList;
              +import org.apache.calcite.plan.RelOptCluster;
              +import org.apache.calcite.plan.RelTraitSet;
              +import org.apache.calcite.rel.RelInput;
              +import org.apache.calcite.rel.RelNode;
              +import org.apache.calcite.rel.core.CorrelationId;
              +import org.apache.calcite.rel.type.RelDataType;
              +import org.apache.calcite.rel.type.RelDataTypeFactory;
              +import org.apache.calcite.util.Pair;
              +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
              +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
              +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
              +import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
              +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistribution;
              +import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
              +import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
              +import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
              +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
              +import org.apache.ignite.internal.processors.query.calcite.util.Commons;
              +
              +/**
              + * Physical node for MAP phase of MINUS (EXCEPT) operator.
              + */
              +public class IgniteMapMinus extends IgniteMinusBase {
              +    /** */
              +    public IgniteMapMinus(
              +        RelOptCluster cluster,
              +        RelTraitSet traitSet,
              +        List<RelNode> inputs,
              +        boolean all
              +    ) {
              +        super(cluster, traitSet, inputs, all);
              +    }
              +
              +    /** */
              +    public IgniteMapMinus(RelInput input) {
              +        super(input);
              +    }
              +
              +    /** {@inheritDoc} */
              +    @Override public IgniteMapMinus copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
              +        return new IgniteMapMinus(getCluster(), traitSet, inputs, all);
              +    }
              +
              +    /** {@inheritDoc} */
              +    @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
              +        return new IgniteMapMinus(cluster, getTraitSet(), Commons.cast(inputs), all);
              +    }
              +
              +    /** {@inheritDoc} */
              +    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
              +        return visitor.visit(this);
              +    }
              +
              +    /** {@inheritDoc} */
              +    @Override protected RelDataType deriveRowType() {
              +        RelDataTypeFactory typeFactory = Commons.typeFactory(getCluster());
              +
              +        assert typeFactory instanceof IgniteTypeFactory;
              +
              +        RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
              +
              +        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
              +        builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
              +
              +        return builder.build();
              +    }
              +
              +    /** {@inheritDoc} */
              +    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
              +        RelTraitSet nodeTraits,
              +        List<RelTraitSet> inputTraits
              +    ) {
              +        boolean rewindable = inputTraits.stream()
              +            .map(TraitUtils::rewindability)
              +            .allMatch(RewindabilityTrait::rewindable);
              +
              +        if (rewindable)
              +            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
              +
              +        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
              +            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
              +    }
              +
              +    /** {@inheritDoc} */
              +    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
              +        RelTraitSet nodeTraits,
              +        List<RelTraitSet> inputTraits
              +    ) {
              +        Set<IgniteDistribution> distributions = inputTraits.stream()
              +            .map(TraitUtils::distribution)
              +            .collect(Collectors.toSet());
              +
              +        ImmutableList.Builder<Pair<RelTraitSet, List<RelTraitSet>>> b = ImmutableList.builder();
              +
              +        for (IgniteDistribution distribution : distributions) {

              Review comment:
                     Yes, you're right. The rule should be a little more complex:
                 ```
                         return ImmutableList.of(
                             Pair.of(nodeTraits.replace(IgniteDistributions.random()), Commons.transform(inputTraits,
                                 t -> t.replace(t.getDistribution() == IgniteDistributions.single() ? IgniteDistributions.single() : IgniteDistributions.random())
                             ))
                         );
                 ```
                 Perhaps it is worth adding a planner test that verifies a plan over a set of tables covering the whole spectrum of distributions (affinity, single, broadcast).
                 
                 The problem with the current approach is that it creates unnecessary nodes. Assume we have N inputs with N different hash distributions. After deriving, we will have N different variants, one for each distribution. This then creates N * (N - 1) converters (if possible) for every variant, whereas we could simply do all the work in a single map-reduce phase.
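
To make the "single map-reduce phase" concrete, here is a minimal Java sketch of how EXCEPT could be evaluated with per-input counters, in the spirit of the `GROUP_KEY` / `COUNTERS` row type from the diff above. It is an illustrative model under stated assumptions, not the code from this pull request: the class `ExceptMapReduceSketch`, the helper `row`, and the meaning of the counters (one occurrence count per input) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; the counter layout is an assumption, not Ignite's MinusNode. */
class ExceptMapReduceSketch {
    /** Hypothetical helper: builds a row that can serve as a grouping key. */
    static List<Object> row(Object... vals) {
        return Arrays.asList(vals);
    }

    /** MAP phase on one node: per distinct row, count its occurrences in each input. */
    static Map<List<Object>, int[]> map(List<List<List<Object>>> inputs) {
        Map<List<Object>, int[]> counters = new LinkedHashMap<>();

        for (int i = 0; i < inputs.size(); i++) {
            for (List<Object> r : inputs.get(i))
                counters.computeIfAbsent(r, k -> new int[inputs.size()])[i]++;
        }

        return counters;
    }

    /** REDUCE phase: sum the counters from all nodes, then apply EXCEPT semantics. */
    static List<List<Object>> reduce(List<Map<List<Object>, int[]>> partials, int inputsCnt, boolean all) {
        Map<List<Object>, int[]> total = new LinkedHashMap<>();

        for (Map<List<Object>, int[]> partial : partials) {
            for (Map.Entry<List<Object>, int[]> e : partial.entrySet()) {
                int[] acc = total.computeIfAbsent(e.getKey(), k -> new int[inputsCnt]);

                for (int i = 0; i < inputsCnt; i++)
                    acc[i] += e.getValue()[i];
            }
        }

        List<List<Object>> res = new ArrayList<>();

        for (Map.Entry<List<Object>, int[]> e : total.entrySet()) {
            int[] cnt = e.getValue();
            int others = 0;

            for (int i = 1; i < cnt.length; i++)
                others += cnt[i];

            // EXCEPT ALL keeps the surplus copies; EXCEPT keeps one copy if no other input has the row.
            int copies = all ? Math.max(0, cnt[0] - others) : (cnt[0] > 0 && others == 0 ? 1 : 0);

            for (int i = 0; i < copies; i++)
                res.add(e.getKey());
        }

        return res;
    }
}
```

In this model each node runs `map` over whatever part of every input it holds locally, so the inputs do not have to be co-located by the same hash function; only the small counter maps travel to the single `reduce` step.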




            githubbot ASF GitHub Bot logged work - 21/Apr/21 11:21
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r617445038



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
              ##########
              @@ -142,7 +142,7 @@
                                   && DistributionFunction.satisfy(function, other.function));
               
                       if (other.getType() == RANDOM_DISTRIBUTED)
              -            return getType() == HASH_DISTRIBUTED;
              +            return getType() == HASH_DISTRIBUTED || getType() == SINGLETON;

              Review comment:
                     I'm not sure this is a legitimate change. If SINGLETON satisfies a RANDOM distribution, there will be no additional exchange, so a singleton becomes part of a fragment with RANDOM distribution. That should not be possible, since such a fragment is allowed to be executed on several nodes, whereas SINGLETON is expected to be executed on only one.




            githubbot ASF GitHub Bot logged work - 21/Apr/21 11:37
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r617454579



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -112,6 +112,14 @@ public void end(int idx) throws Exception {
               
                       checkState();
               
              +        if (type == AggregateType.SINGLE && grouping.isEmpty()) {
              +            requested = 0;
              +
              +            downstream().end();

              Review comment:
                     Ending the downstream here is unwanted, because you have to be sure the caller side is waiting for the next row. Currently this is violated when the underlying source is empty and the caller invokes `request(0)`. This case is synthetic right now, yet still possible.
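
The contract described in this comment can be illustrated with a small Java sketch: the end-of-data signal is held back until the consumer has actually requested at least one row. This is a simplified model, not Ignite's node API; `EndSignalSketch`, `Downstream`, `EmptyResultNode`, `onInputsEnded`, and `tryEnd` are hypothetical names introduced only for this example.

```java
/** Illustrative model only; names are hypothetical and do not mirror Ignite's node API. */
class EndSignalSketch {
    /** Hypothetical consumer contract: end() may only arrive while rows are requested. */
    interface Downstream {
        void end();
    }

    /** A node whose result is empty: it has nothing to push, only an end-of-data signal. */
    static class EmptyResultNode {
        private final Downstream downstream;
        private int requested;
        private boolean inputsEnded;
        private boolean endSent;

        EmptyResultNode(Downstream downstream) {
            this.downstream = downstream;
        }

        /** Called by the consumer; rowsCnt may be 0 in the synthetic case from the review. */
        void request(int rowsCnt) {
            requested = rowsCnt;
            tryEnd();
        }

        /** Called when every upstream source has reported its end. */
        void onInputsEnded() {
            inputsEnded = true;
            tryEnd();
        }

        /** Signals the end only when the consumer is actually waiting for a row. */
        private void tryEnd() {
            if (inputsEnded && requested > 0 && !endSent) {
                requested = 0;
                endSent = true;
                downstream.end();
            }
        }
    }
}
```

With this guard, an empty source followed by `request(0)` produces no `end()` call; the signal is delivered once on the first request for a positive number of rows.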




            githubbot ASF GitHub Bot logged work - 21/Apr/21 16:18
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r617696591



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
              ##########
              @@ -142,7 +142,7 @@
                                   && DistributionFunction.satisfy(function, other.function));
               
                       if (other.getType() == RANDOM_DISTRIBUTED)
              -            return getType() == HASH_DISTRIBUTED;
              +            return getType() == HASH_DISTRIBUTED || getType() == SINGLETON;

              Review comment:
                     This change was made in the middle of development, and it looks like it is no longer required. I will revert it.




            githubbot ASF GitHub Bot logged work - 21/Apr/21 18:32
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r617786322



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
              ##########
              @@ -142,7 +142,7 @@
                                   && DistributionFunction.satisfy(function, other.function));
               
                       if (other.getType() == RANDOM_DISTRIBUTED)
              -            return getType() == HASH_DISTRIBUTED;
              +            return getType() == HASH_DISTRIBUTED || getType() == SINGLETON;

              Review comment:
                     I do not quite understand. If `single` means that the plan node is allowed to execute on only one cluster node, then any input with `single` distribution should be prohibited for the MAP phase: if all inputs are single, the `IgniteSingleMinus` node should be used, and if there is a mix of single and random plan nodes, the random plan nodes should be converted to single with `IgniteExchange`. So `IgniteMapMinus.deriveDistribution` should either convert all inputs to `random` or skip this plan if there is at least one `single` node.
                 
                 Previously I thought that the `single` trait means that the data for this plan node is located on only one cluster node (while the plan node itself can still be executed on different cluster nodes; the same goes for `broadcast`: it describes only the data distribution and does not force the plan node to be executed on every cluster node). In that case the change with `single().satisfies(random())` looks fine. It also makes plans like this possible:
                 ```
                 IgniteReduceMinus(all=[false])
                   IgniteExchange(distribution=[single])
                     IgniteMapMinus(all=[false])
                       IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
                       IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
                 ```
                 Should we prohibit such plans?




            githubbot ASF GitHub Bot logged work - 22/Apr/21 12:47
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r618368523



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
              ##########
              @@ -142,7 +142,7 @@
                                   && DistributionFunction.satisfy(function, other.function));
               
                       if (other.getType() == RANDOM_DISTRIBUTED)
              -            return getType() == HASH_DISTRIBUTED;
              +            return getType() == HASH_DISTRIBUTED || getType() == SINGLETON;

              Review comment:
                     > single trait means that data for this plan node is located only on one cluster node
                 
                 This is correct, and the plan above is correct too. But the problem is caused by a limitation of the current splitter implementation. When an exchange connects two fragments with SINGLE and RANDOM distributions, and the data flows from the first to the second, the IgniteReceiver node becomes a source of data with a SINGLE distribution, but it is present on every node of the fragment with RANDOM distribution, thereby creating N copies of the original data.
                 The same problem occurs with a BROADCAST to RANDOM transition.
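
The duplication described here can be modelled in a few lines of Java. The sketch below is only an illustration of the effect, not of the splitter itself: `SingleToRandomSketch` and its methods are hypothetical, and the hash-based routing in `routeToOneNode` is just one assumed way of delivering each row to exactly one node.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; it does not use any Ignite classes. */
class SingleToRandomSketch {
    /** Every node of the target fragment has a receiver that gets the whole SINGLE input. */
    static List<List<Integer>> receiveOnEveryNode(List<Integer> src, int nodesCnt) {
        List<List<Integer>> perNode = new ArrayList<>();

        for (int n = 0; n < nodesCnt; n++)
            perNode.add(new ArrayList<>(src));

        return perNode;
    }

    /** Contrast (assumed routing): each row is delivered to exactly one node. */
    static List<List<Integer>> routeToOneNode(List<Integer> src, int nodesCnt) {
        List<List<Integer>> perNode = new ArrayList<>();

        for (int n = 0; n < nodesCnt; n++)
            perNode.add(new ArrayList<>());

        for (Integer row : src)
            perNode.get(Math.floorMod(row.hashCode(), nodesCnt)).add(row);

        return perNode;
    }

    /** Total number of rows the fragment as a whole will process. */
    static int totalRows(List<List<Integer>> perNode) {
        int cnt = 0;

        for (List<Integer> rows : perNode)
            cnt += rows.size();

        return cnt;
    }
}
```

For a three-row source and a fragment running on three nodes, the first method yields nine rows in total and the second yields three, which is why a MAP phase fed this way would count every row of the single input N times.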




            githubbot ASF GitHub Bot logged work - 22/Apr/21 12:49
            • Time Spent:
              10m
               
              korlov42 commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r618369945



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
              ##########
              @@ -142,7 +142,7 @@
                                   && DistributionFunction.satisfy(function, other.function));
               
                       if (other.getType() == RANDOM_DISTRIBUTED)
              -            return getType() == HASH_DISTRIBUTED;
              +            return getType() == HASH_DISTRIBUTED || getType() == SINGLETON;

              Review comment:
                     I have some ideas on how to cope with this problem, but for now we should not allow such plans.




            githubbot ASF GitHub Bot logged work - 22/Apr/21 13:19
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r618392554



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
              ##########
              @@ -142,7 +142,7 @@
                                   && DistributionFunction.satisfy(function, other.function));
               
                       if (other.getType() == RANDOM_DISTRIBUTED)
              -            return getType() == HASH_DISTRIBUTED;
              +            return getType() == HASH_DISTRIBUTED || getType() == SINGLETON;

              Review comment:
                     OK, I have reverted this change for now.

              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
              ##########
              @@ -112,6 +112,14 @@ public void end(int idx) throws Exception {
               
                       checkState();
               
              +        if (type == AggregateType.SINGLE && grouping.isEmpty()) {
              +            requested = 0;
              +
              +            downstream().end();

              Review comment:
                     Fixed.




            githubbot ASF GitHub Bot logged work - 22/Apr/21 13:19
            • Time Spent:
              10m
               
              alex-plekhanov commented on a change in pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009#discussion_r618393172



              ##########
              File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapMinus.java
              ##########
              @@ -0,0 +1,147 @@
              +        for (IgniteDistribution distribution : distributions) {

              Review comment:
                     Fixed.




            githubbot ASF GitHub Bot logged work - 23/Apr/21 06:40
            • Time Spent:
              10m
               
              alex-plekhanov closed pull request #9009:
              URL: https://github.com/apache/ignite/pull/9009


                 


            githubbot ASF GitHub Bot logged work - 23/Apr/21 06:40

            People

              alex_pl Aleksey Plekhanov
              jooger Iurii Gerzhedovich
              Votes:
              0
              Watchers:
              3

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Not Specified
                  Remaining:
                  0h
                  Logged:
                  5h