Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
- 0.14.0, 1.0.0, 1.1.0
Description
High-level summary
HiveRelMdSelectivity.computeInnerJoinSelectivity relies on the per-column number of distinct values (NDV) to estimate join selectivity.
The way statistics are aggregated for partitioned tables produces different NDVs than for un-partitioned tables, which in turn results in different plans between partitioned and un-partitioned schemas.
The table below summarizes the NDVs that computeInnerJoinSelectivity uses to estimate the selectivity of joins.
Column | Partitioned count distincts | Un-Partitioned count distincts |
---|---|---|
sr_customer_sk | 71,245 | 1,415,625 |
sr_item_sk | 38,846 | 62,562 |
sr_ticket_number | 71,245 | 34,931,085 |
ss_customer_sk | 88,476 | 1,415,625 |
ss_item_sk | 38,846 | 62,562 |
ss_ticket_number | 100,756 | 56,256,175 |
The discrepancy arises because the NDV calculation for a partitioned table assumes that the NDV range is contained within each partition, so the NDV is calculated as "select max(NUM_DISTINCTS) from PART_COL_STATS".
This is problematic for columns like ticket number, which naturally increases with the partitioned date column ss_sold_date_sk.
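The effect can be reproduced on toy data. The sketch below is only an illustration with made-up partitions (the helper names and sizes are hypothetical, not taken from Hive): it compares the max-of-partition NDV with the true NDV for a column whose values repeat in every partition and for a column that keeps increasing across partitions.

```python
def ndv_by_max(partitions):
    """NDV aggregated as described above: the largest per-partition distinct count."""
    return max(len(set(values)) for values in partitions)


def true_ndv(partitions):
    """Exact number of distinct values across all partitions."""
    return len(set().union(*partitions))


NUM_PARTITIONS = 10          # hypothetical sizes, chosen only for the demo
ROWS_PER_PARTITION = 1000

# Column whose domain (500 values) shows up in full in every partition,
# loosely like an item key.
repeating = [
    [(p + i) % 500 for i in range(ROWS_PER_PARTITION)]
    for p in range(NUM_PARTITIONS)
]

# Column that keeps growing with the partition key, loosely like a ticket number.
increasing = [
    list(range(p * ROWS_PER_PARTITION, (p + 1) * ROWS_PER_PARTITION))
    for p in range(NUM_PARTITIONS)
]

for name, parts in (("repeating", repeating), ("increasing", increasing)):
    print(name, "max-of-partitions:", ndv_by_max(parts), "true:", true_ndv(parts))
```

For the repeating column the two numbers agree, while for the increasing column the max-of-partitions value is smaller than the true NDV by a factor equal to the number of partitions.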
Suggestions
- Use HyperLogLog as suggested by Gopal; there is an HLL implementation for HBase coprocessors which we can use as a reference here.
- Using the global stats from TAB_COL_STATS and the per-partition stats from PART_COL_STATS, extrapolate the NDV for the qualified partitions as in:
  Max( (NUM_DISTINCTS from TAB_COL_STATS) x (Number of qualified partitions) / (Number of partitions), (max(NUM_DISTINCTS) from PART_COL_STATS) )
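A minimal sketch of the extrapolation formula, to make the arithmetic concrete. The function name and the partition counts (90 qualified out of 1,800) are assumptions made up for the example; the NDV inputs are the ss_ticket_number and ss_item_sk values from the tables in this report.

```python
def extrapolated_ndv(table_ndv, qualified_partitions, total_partitions, max_partition_ndv):
    """Scale the table-level NDV by the fraction of qualified partitions,
    but never go below the largest per-partition NDV."""
    if total_partitions <= 0:
        raise ValueError("total_partitions must be positive")
    if not 0 <= qualified_partitions <= total_partitions:
        raise ValueError("qualified_partitions must be between 0 and total_partitions")
    scaled = table_ndv * qualified_partitions / total_partitions
    return max(scaled, max_partition_ndv)


# Hypothetical partition counts: 90 of 1,800 partitions qualify.
ticket_ndv = extrapolated_ndv(56_256_175, 90, 1_800, 100_756)
item_ndv = extrapolated_ndv(62_562, 90, 1_800, 38_846)

print("ss_ticket_number:", ticket_ndv)  # scaled table NDV wins
print("ss_item_sk:", item_ndv)          # per-partition max wins
```

With these inputs the ticket number estimate comes from the scaled table-level NDV, while the item key estimate falls back to the per-partition maximum, which is the behaviour the Max(...) in the formula is meant to provide.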
More details
While doing TPC-DS partitioned vs. un-partitioned runs I noticed that many of the plans are different. I then dumped the CBO logical plan and found that the join estimates are drastically different.
Unpartitioned schema :
2015-02-10 11:33:27,624 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2956
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2954
    HiveProjectRel($f0=[$4], $f1=[$8]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2952
      HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$3], ss_quantity=[$4], sr_item_sk=[$5], sr_customer_sk=[$6], sr_ticket_number=[$7], sr_return_quantity=[$8], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2982
        HiveJoinRel(condition=[=($9, $0)], joinType=[inner]): rowcount = 40.05611776795562, cumulative cost = {6.056835407771381E8 rows, 0.0 cpu, 0.0 io}, id = 2980
          HiveJoinRel(condition=[AND(AND(=($2, $6), =($1, $5)), =($3, $7))], joinType=[inner]): rowcount = 28880.460910696, cumulative cost = {6.05654559E8 rows, 0.0 cpu, 0.0 io}, id = 2964
            HiveProjectRel(ss_sold_date_sk=[$0], ss_item_sk=[$2], ss_customer_sk=[$3], ss_ticket_number=[$9], ss_quantity=[$10]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2920
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2822
            HiveProjectRel(sr_item_sk=[$2], sr_customer_sk=[$3], sr_ticket_number=[$9], sr_return_quantity=[$10]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2923
              HiveTableScanRel(table=[[tpcds_bin_orc_200.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2823
          HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2948
            HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2946
              HiveTableScanRel(table=[[tpcds_bin_orc_200.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2821
Partitioned schema :
2015-02-10 11:32:16,880 DEBUG [main]: parse.SemanticAnalyzer (SemanticAnalyzer.java:apply(12624)) - Plan After Join Reordering:
HiveProjectRel(store_sales_quantitycount=[$0], store_sales_quantityave=[$1], store_sales_quantitystdev=[$2], store_sales_quantitycov=[/($2, $1)], as_store_returns_quantitycount=[$3], as_store_returns_quantityave=[$4], as_store_returns_quantitystdev=[$5], store_returns_quantitycov=[/($5, $4)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2791
  HiveAggregateRel(group=[{}], agg#0=[count($0)], agg#1=[avg($0)], agg#2=[stddev_samp($0)], agg#3=[count($1)], agg#4=[avg($1)], agg#5=[stddev_samp($1)]): rowcount = 1.0, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2789
    HiveProjectRel($f0=[$3], $f1=[$8]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2787
      HiveProjectRel(ss_item_sk=[$4], ss_customer_sk=[$5], ss_ticket_number=[$6], ss_quantity=[$7], ss_sold_date_sk=[$8], sr_item_sk=[$0], sr_customer_sk=[$1], sr_ticket_number=[$2], sr_return_quantity=[$3], d_date_sk=[$9], d_quarter_name=[$10]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2817
        HiveJoinRel(condition=[AND(AND(=($5, $1), =($4, $0)), =($6, $2))], joinType=[inner]): rowcount = 100840.08570910375, cumulative cost = {6.064175958973647E8 rows, 0.0 cpu, 0.0 io}, id = 2815
          HiveProjectRel(sr_item_sk=[$1], sr_customer_sk=[$2], sr_ticket_number=[$8], sr_return_quantity=[$9]): rowcount = 5.5578005E7, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2758
            HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_returns]]): rowcount = 5.5578005E7, cumulative cost = {0}, id = 2658
          HiveJoinRel(condition=[=($5, $4)], joinType=[inner]): rowcount = 762935.5811373093, cumulative cost = {5.500766553162274E8 rows, 0.0 cpu, 0.0 io}, id = 2801
            HiveProjectRel(ss_item_sk=[$1], ss_customer_sk=[$2], ss_ticket_number=[$8], ss_quantity=[$9], ss_sold_date_sk=[$22]): rowcount = 5.50076554E8, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2755
              HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.store_sales]]): rowcount = 5.50076554E8, cumulative cost = {0}, id = 2657
            HiveProjectRel(d_date_sk=[$0], d_quarter_name=[$15]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2783
              HiveFilterRel(condition=[=($15, '2000Q1')]): rowcount = 101.31622746185853, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io}, id = 2781
                HiveTableScanRel(table=[[tpcds_bin_partitioned_orc_200_orig.date_dim]]): rowcount = 73049.0, cumulative cost = {0}, id = 2656
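As a back-of-the-envelope check, the row counts of the three-key join in the two plan dumps above can be approximately reproduced from the NDVs in the summary table. The sketch assumes the estimate takes, per join key, the larger NDV of the two sides, and then dampens the keys with an exponential back-off (largest NDV in full, the next by its square root, the next by its fourth root). That form is inferred from the numbers in this report, not quoted from the HiveRelMdSelectivity source, and the function name is hypothetical.

```python
def backoff_join_rows(left_rows, right_rows, key_ndvs):
    """Estimated inner-join row count under the assumed back-off rule:
    rows = left * right / (ndv_0 * ndv_1^(1/2) * ndv_2^(1/4) * ...),
    with the per-key NDVs sorted in descending order."""
    denominator = 1.0
    for position, ndv in enumerate(sorted(key_ndvs, reverse=True)):
        denominator *= ndv ** (1.0 / 2 ** position)
    return left_rows * right_rows / denominator


# Un-partitioned plan: store_sales joined directly with store_returns.
unpartitioned_rows = backoff_join_rows(
    5.50076554e8,
    5.5578005e7,
    [
        max(56_256_175, 34_931_085),  # ticket number
        max(1_415_625, 1_415_625),    # customer
        max(62_562, 62_562),          # item
    ],
)

# Partitioned plan: (store_sales joined with date_dim) joined with store_returns.
partitioned_rows = backoff_join_rows(
    762935.5811373093,
    5.5578005e7,
    [
        max(100_756, 71_245),  # ticket number
        max(88_476, 71_245),   # customer
        max(38_846, 38_846),   # item
    ],
)

print("un-partitioned estimate:", unpartitioned_rows, "logged: 28880.460910696")
print("partitioned estimate:", partitioned_rows, "logged: 100840.08570910375")
```

Both estimates land within about 0.1% of the logged row counts, which suggests that the gap between the two plans is driven by the NDV inputs rather than by the row counts of the inputs.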
The difference in join estimates was puzzling, given that the stats for both tables are “identical” in TAB_COL_STATS.
Column statistics from TAB_COL_STATS are shown below; notice how they are identical in both cases.
DB_NAME | COLUMN_NAME | COLUMN_TYPE | NUM_NULLS | LONG_HIGH_VALUE | LONG_LOW_VALUE | MAX_COL_LEN | NUM_DISTINCTS |
---|---|---|---|---|---|---|---|
tpcds_bin_orc_200 | d_date_sk | int | 0 | 2,488,070 | 2,415,022 | NULL | 65,332 |
tpcds_bin_partitioned_orc_200 | d_date_sk | int | 0 | 2,488,070 | 2,415,022 | NULL | 65,332 |
tpcds_bin_orc_200 | d_quarter_name | string | 0 | NULL | NULL | 6 | 721 |
tpcds_bin_partitioned_orc_200 | d_quarter_name | string | 0 | NULL | NULL | 6 | 721 |
tpcds_bin_orc_200 | sr_customer_sk | int | 1,009,571 | 1,600,000 | 1 | NULL | 1,415,625 |
tpcds_bin_partitioned_orc_200 | sr_customer_sk | int | 1,009,571 | 1,600,000 | 1 | NULL | 1,415,625 |
tpcds_bin_orc_200 | sr_item_sk | int | 0 | 48,000 | 1 | NULL | 62,562 |
tpcds_bin_partitioned_orc_200 | sr_item_sk | int | 0 | 48,000 | 1 | NULL | 62,562 |
tpcds_bin_orc_200 | sr_ticket_number | int | 0 | 48,000,000 | 1 | NULL | 34,931,085 |
tpcds_bin_partitioned_orc_200 | sr_ticket_number | int | 0 | 48,000,000 | 1 | NULL | 34,931,085 |
tpcds_bin_orc_200 | ss_customer_sk | int | 12,960,424 | 1,600,000 | 1 | NULL | 1,415,625 |
tpcds_bin_partitioned_orc_200 | ss_customer_sk | int | 12,960,424 | 1,600,000 | 1 | NULL | 1,415,625 |
tpcds_bin_orc_200 | ss_item_sk | int | 0 | 48,000 | 1 | NULL | 62,562 |
tpcds_bin_partitioned_orc_200 | ss_item_sk | int | 0 | 48,000 | 1 | NULL | 62,562 |
tpcds_bin_orc_200 | ss_sold_date_sk | int | 0 | 2,452,642 | 2,450,816 | NULL | 2,226 |
tpcds_bin_partitioned_orc_200 | ss_sold_date_sk | int | 0 | 2,452,642 | 2,450,816 | NULL | 2,226 |
tpcds_bin_orc_200 | ss_ticket_number | int | 0 | 48,000,000 | 1 | NULL | 56,256,175 |
tpcds_bin_partitioned_orc_200 | ss_ticket_number | int | 0 | 48,000,000 | 1 | NULL | 56,256,175 |
For partitioned tables we get the statistics using get_aggr_stats_for, which eventually issues the query below:
select COLUMN_NAME, COLUMN_TYPE, … max(NUM_DISTINCTS), … from PART_COL_STATS where DB_NAME = and TABLE_NAME = and COLUMN_NAME in and PARTITION_NAME in (1 … N) group by COLUMN_NAME , COLUMN_TYPE;
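To see what this aggregation returns, the shape of the query can be run against an in-memory SQLite table. This is only a sketch: the table below keeps just the columns named in the query, the bind parameters stand in for the elided values, and the database name, partition names and NUM_DISTINCTS values are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for PART_COL_STATS (the real metastore table has more columns).
conn.execute(
    """create table PART_COL_STATS (
           DB_NAME text, TABLE_NAME text, PARTITION_NAME text,
           COLUMN_NAME text, COLUMN_TYPE text, NUM_DISTINCTS integer)"""
)

# Hypothetical per-partition stats for three date partitions.
sample_rows = [
    ("demo_db", "store_sales", "ss_sold_date_sk=1", "ss_ticket_number", "int", 1000),
    ("demo_db", "store_sales", "ss_sold_date_sk=2", "ss_ticket_number", "int", 1100),
    ("demo_db", "store_sales", "ss_sold_date_sk=3", "ss_ticket_number", "int", 900),
    ("demo_db", "store_sales", "ss_sold_date_sk=1", "ss_item_sk", "int", 400),
    ("demo_db", "store_sales", "ss_sold_date_sk=2", "ss_item_sk", "int", 450),
    ("demo_db", "store_sales", "ss_sold_date_sk=3", "ss_item_sk", "int", 420),
]
conn.executemany("insert into PART_COL_STATS values (?, ?, ?, ?, ?, ?)", sample_rows)

aggregate_query = """
    select COLUMN_NAME, COLUMN_TYPE, max(NUM_DISTINCTS)
    from PART_COL_STATS
    where DB_NAME = ? and TABLE_NAME = ?
      and COLUMN_NAME in (?, ?)
      and PARTITION_NAME in (?, ?, ?)
    group by COLUMN_NAME, COLUMN_TYPE
"""
params = (
    "demo_db", "store_sales",
    "ss_ticket_number", "ss_item_sk",
    "ss_sold_date_sk=1", "ss_sold_date_sk=2", "ss_sold_date_sk=3",
)
aggregated = {
    column: ndv for column, _type, ndv in conn.execute(aggregate_query, params)
}
print(aggregated)
```

If the ticket numbers in the three sample partitions do not overlap, the true NDV across them would be 3,000, yet the aggregate reports 1,100, the largest single-partition value.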
…
Attachments
Issue Links
- is duplicated by
  - HIVE-9717 The max/min function used by AggrStats for decimal type is not what we expected (Closed)
- is related to
  - HIVE-9905 Investigate ways to improve NDV calculations during stats aggregation [hbase-metastore branch] (Resolved)
  - HIVE-9717 The max/min function used by AggrStats for decimal type is not what we expected (Closed)
  - HIVE-9689 Store histograms and distinct value estimator's bit vectors in metastore (Open)
- relates to
  - HIVE-10612 HIVE-10578 broke TestSQLStdHiveAccessControllerHS2 tests (Closed)
- links to