Details
- Improvement
- Status: Resolved
- Major
- Resolution: Fixed
- Impala 2.6.0, Impala 2.7.0, Impala 2.8.0, Impala 2.9.0, Impala 2.10.0, Impala 2.11.0
Description
Hash partition computation for exchange operators can benefit from codegen. Profile data shows that ~20% of the CPU time in the fragment thread is consumed by RawValue::GetHashValueFnv and ExprContext::GetValue.
// hash-partition batch's rows across channels
int num_channels = channels_.size();
for (int i = 0; i < batch->num_rows(); ++i) {
  TupleRow* row = batch->GetRow(i);
  uint32_t hash_val = HashUtil::FNV_SEED;
  for (int i = 0; i < partition_expr_ctxs_.size(); ++i) {
    ExprContext* ctx = partition_expr_ctxs_[i];
    void* partition_val = ctx->GetValue(row);
    // We can't use the crc hash function here because it does not result
    // in uncorrelated hashes with different seeds. Instead we must use
    // fnv hash.
    // TODO: fix crc hash/GetHashValue()
    hash_val =
        RawValue::GetHashValueFnv(partition_val, ctx->root()->type(), hash_val);
  }
  ExprContext::FreeLocalAllocations(partition_expr_ctxs_);
  RETURN_IF_ERROR(channels_[hash_val % num_channels]->AddRow(row));
}
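To illustrate where the per-row overhead comes from, here is a minimal standalone sketch. It is not Impala code: `Row`, `PartitionExpr`, `HashRowInterpreted`, `HashRowSpecialized`, and `PickChannel` are hypothetical names, and the hash is a plain 32-bit FNV-1a used as a stand-in for `HashUtil`. The interpreted version resolves each column's type at run time for every row, as the loop above does. The specialized version is roughly the shape of code a generator could emit once the partition expressions and their types are known.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Standard 32-bit FNV-1a parameters (assumption: a stand-in, not Impala's HashUtil).
constexpr uint32_t kFnvSeed = 2166136261u;
constexpr uint32_t kFnvPrime = 16777619u;

inline uint32_t FnvHash(const void* data, size_t len, uint32_t hash) {
  const uint8_t* p = static_cast<const uint8_t*>(data);
  for (size_t i = 0; i < len; ++i) hash = (hash ^ p[i]) * kFnvPrime;
  return hash;
}

// Hypothetical row holding the two join keys from the repro query.
struct Row {
  int64_t item_sk;
  int64_t ticket_number;
};

// Interpreted path: the column type is only known at run time.
enum class ColType { kInt32, kInt64 };
struct PartitionExpr {
  ColType type;
  size_t offset;  // byte offset of the column within Row
};

uint32_t HashRowInterpreted(const Row& row,
                            const std::vector<PartitionExpr>& exprs) {
  uint32_t h = kFnvSeed;
  const char* base = reinterpret_cast<const char*>(&row);
  for (const PartitionExpr& e : exprs) {
    size_t len = 0;
    switch (e.type) {  // per-row, per-column type dispatch
      case ColType::kInt32: len = 4; break;
      case ColType::kInt64: len = 8; break;
    }
    h = FnvHash(base + e.offset, len, h);
  }
  return h;
}

// Specialized path: no loop over expressions and no type switch,
// because both are fixed when this function is produced.
inline uint32_t HashRowSpecialized(const Row& row) {
  uint32_t h = kFnvSeed;
  h = FnvHash(&row.item_sk, sizeof(row.item_sk), h);
  h = FnvHash(&row.ticket_number, sizeof(row.ticket_number), h);
  return h;
}

// Maps a hash value to a channel index, as hash_val % num_channels does above.
inline size_t PickChannel(uint32_t hash, size_t num_channels) {
  assert(num_channels > 0);
  return hash % num_channels;
}
```

Both paths produce the same hash for the same row, so a sender could switch between them without changing which channel a row lands on. The specialized path simply removes the dispatch work that the profile attributes to GetValue and GetHashValueFnv.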
Function Stack | Effective Time % |
Total | 100% |
clone | 99% |
start_thread | 99% |
thread_proxy | 99% |
boost::detail::thread_data<boost::_bi::bind_t<>::run | 99% |
boost::_bi::bind_t<void, void (), ::operator() | 99% |
operator()<void (const std::basic_string< | 99% |
impala::Thread::SuperviseThread | 99% |
boost::function0<void>::operator() | 99% |
impala::QueryExecMgr::ExecFInstance | 99% |
impala::FragmentInstanceState::Exec | 99% |
impala::PlanFragmentExecutor::Exec | 99% |
impala::PlanFragmentExecutor::ExecInternal | 96% |
impala::DataStreamSender::Send | 91% |
impala::DataStreamSender::Channel::AddRow | 56% |
impala::RawValue::GetHashValueFnv | 11% |
impala::ExprContext::GetValue | 11% |
impala::ExprContext::FreeLocalAllocations | 6% |
impala::RowBatch::GetRow | 1% |
std::vector<impala::ExprContext*, std::allocator<impala::ExprContext*>>::size | 1% |
impala::Expr::type | 0% |
impala::ExprContext::GetValue | 0% |
impala::RuntimeState::CheckQueryState | 0% |
impala::HdfsScanNode::GetNext | 3% |
impala::RowBatch::Reset | 1% |
Status | 0% |
~ScopedTimer | 0% |
[Unknown stack frame(s)] | 4% |
Query used in repro
select /* +straight_join */ count(*)
from store_sales a
join /* +shuffle */ store_returns b
  on a.ss_item_sk = b.sr_item_sk
where a.ss_ticket_number = b.sr_ticket_number
  and ss_sold_date_sk between 2450816 and 2451500
  and sr_returned_date_sk between 2450816 and 2451500
group by a.ss_ticket_number
having count(*) > 9999999999
Explain plan
+------------------------------------------------------------------------------------------+
| Explain String |
+------------------------------------------------------------------------------------------+
| Estimated Per-Host Requirements: Memory=3.43GB VCores=3 |
| |
| PLAN-ROOT SINK |
| | |
| 08:EXCHANGE [UNPARTITIONED] |
| | |
| 07:AGGREGATE [FINALIZE] |
| | output: count:merge(*) |
| | group by: a.ss_ticket_number |
| | having: count(*) > 9999999999 |
| | |
| 06:EXCHANGE [HASH(a.ss_ticket_number)] |
| | |
| 03:AGGREGATE [STREAMING] |
| | output: count(*) |
| | group by: a.ss_ticket_number |
| | |
| 02:HASH JOIN [INNER JOIN, PARTITIONED] |
| | hash predicates: a.ss_item_sk = b.sr_item_sk, a.ss_ticket_number = b.sr_ticket_number |
| | runtime filters: RF000 <- b.sr_item_sk, RF001 <- b.sr_ticket_number |
| | |
| |--05:EXCHANGE [HASH(b.sr_item_sk,b.sr_ticket_number)] |
| | | |
| | 01:SCAN HDFS [tpcds_3000_parquet.store_returns b] |
| | partitions=681/2004 files=681 size=13.73GB |
| | |
| 04:EXCHANGE [HASH(a.ss_item_sk,a.ss_ticket_number)] |
| | |
| 00:SCAN HDFS [tpcds_3000_parquet.store_sales a] |
| partitions=683/1824 files=944 size=140.19GB |
| runtime filters: RF000 -> a.ss_item_sk, RF001 -> a.ss_ticket_number |
+------------------------------------------------------------------------------------------+
Issue Links
- is related to IMPALA-2281 Use a better hash function than FNV for exchanges (Resolved)