Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
- 1.8.0
Description
As discussed on the mailing list, there appears to be a bug where a job that uses a custom FilterableTableSource does not keep the filters that were pushed down into the table source. More specifically, the table source does receive filters via applyPredicate, and a new table source containing those filters is returned, but the final job graph appears to use the original table source, which does not contain any filters.
I attached a minimal example program to this ticket. The custom table source is as follows:
public class CustomTableSource implements BatchTableSource<Model>, FilterableTableSource<Model> {

    private static final Logger LOG = LoggerFactory.getLogger(CustomTableSource.class);

    private final Filter[] filters;

    private final FilterConverter converter = new FilterConverter();

    public CustomTableSource() {
        this(null);
    }

    private CustomTableSource(Filter[] filters) {
        this.filters = filters;
    }

    @Override
    public DataSet<Model> getDataSet(ExecutionEnvironment execEnv) {
        if (filters == null) {
            LOG.info("==== No filters defined ====");
        } else {
            LOG.info("==== Found filters ====");
            for (Filter filter : filters) {
                LOG.info("FILTER: {}", filter);
            }
        }

        return execEnv.fromCollection(allModels());
    }

    @Override
    public TableSource<Model> applyPredicate(List<Expression> predicates) {
        LOG.info("Applying predicates");

        List<Filter> acceptedFilters = new ArrayList<>();
        for (final Expression predicate : predicates) {
            converter.convert(predicate).ifPresent(acceptedFilters::add);
        }

        return new CustomTableSource(acceptedFilters.toArray(new Filter[0]));
    }

    @Override
    public boolean isFilterPushedDown() {
        return filters != null;
    }

    @Override
    public TypeInformation<Model> getReturnType() {
        return TypeInformation.of(Model.class);
    }

    @Override
    public TableSchema getTableSchema() {
        return TableSchema.fromTypeInfo(getReturnType());
    }

    private List<Model> allModels() {
        List<Model> models = new ArrayList<>();

        models.add(new Model(1, 2, 3, 4));
        models.add(new Model(10, 11, 12, 13));
        models.add(new Model(20, 21, 22, 23));

        return models;
    }
}
When run, it logs
15:24:54,888 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,901 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,910 INFO com.klaviyo.filterbug.CustomTableSource - Applying predicates
15:24:54,977 INFO com.klaviyo.filterbug.CustomTableSource - ==== No filters defined ====
which appears to indicate that although filters are getting pushed down, the final job does not use them.
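To make the copy-on-push-down behaviour described above concrete, here is a minimal, Flink-free sketch. SketchSource and PushDownSketch are hypothetical stand-ins, not Flink classes, and predicates are modelled as plain strings. It only illustrates that applyPredicate returns a new instance and leaves the receiver unfiltered, so whichever instance ends up in the job graph determines whether the filters are visible. It does not attempt to reproduce the planner or explain the cause of the bug.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a filterable table source; not a Flink class.
final class SketchSource {
    // null means "nothing has been pushed down into this instance".
    private final List<String> filters;

    SketchSource() {
        this(null);
    }

    private SketchSource(List<String> filters) {
        this.filters = filters;
    }

    // Same contract as in the ticket: return a NEW source carrying the
    // filters and leave the receiver untouched.
    SketchSource applyPredicate(List<String> predicates) {
        return new SketchSource(
                Collections.unmodifiableList(new ArrayList<>(predicates)));
    }

    boolean isFilterPushedDown() {
        return filters != null;
    }

    String describe() {
        return filters == null
                ? "No filters defined"
                : "Found filters: " + filters;
    }
}

final class PushDownSketch {
    public static void main(String[] args) {
        SketchSource original = new SketchSource();
        SketchSource pushed = original.applyPredicate(Arrays.asList("a > 1"));

        // The original instance is still unfiltered; only the copy has filters.
        System.out.println(original.describe());
        System.out.println(pushed.describe());
    }
}
```

Running main prints "No filters defined" for the original instance and "Found filters: [a > 1]" for the copy. This matches the symptom in the log above: if the original instance is the one whose getDataSet is eventually called, no filters are reported even though applyPredicate ran.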
Attachments
Issue Links
- causes: FLINK-16860 Failed to push filter into OrcTableSource when upgrading to 1.9.2 (Resolved)