Details
- Type: Sub-task
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Fix Version/s: None
- Labels: ghx-label-9
Description
I noticed that the code doesn't check whether the insert is clustered. With a clustered insert the sink only produces a single partition at a time, so multiplying the per-partition requirement by the number of partitions per instance overestimates the memory needed.
@Override
public void computeResourceProfile(TQueryOptions queryOptions) {
  HdfsTable table = (HdfsTable) targetTable_;
  // TODO: Estimate the memory requirements more accurately by partition type.
  HdfsFileFormat format = table.getMajorityFormat();
  PlanNode inputNode = fragment_.getPlanRoot();
  int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
  // Compute the per-instance number of partitions, taking the number of nodes
  // and the data partition of the fragment executing this sink into account.
  long numPartitionsPerInstance =
      fragment_.getPerInstanceNdv(queryOptions.getMt_dop(), partitionKeyExprs_);
  if (numPartitionsPerInstance == -1) {
    numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
  }
  long perPartitionMemReq = getPerPartitionMemReq(format);

  long perInstanceMemEstimate;
  // The estimate is based purely on the per-partition mem req if the input cardinality_
  // or the avg row size is unknown.
  if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
    perInstanceMemEstimate = numPartitionsPerInstance * perPartitionMemReq;
  } else {
    // The per-partition estimate may be higher than the memory required to buffer
    // the entire input data.
    long perInstanceInputCardinality =
        Math.max(1L, inputNode.getCardinality() / numInstances);
    long perInstanceInputBytes =
        (long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize());
    long perInstanceMemReq =
        PlanNode.checkedMultiply(numPartitionsPerInstance, perPartitionMemReq);
    perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq);
  }
  resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
}
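One way to account for this would be to clamp the per-instance partition count to one when the input is clustered, since the sink then has at most one partition open at a time. A minimal sketch of that adjustment follows; inputIsClustered_ is a hypothetical flag standing in for however the sink learns that its input arrives sorted by the partition key exprs, not necessarily an actual field of HdfsTableSink:

  if (numPartitionsPerInstance == -1) {
    numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
  }
  // Sketch only: inputIsClustered_ is an assumed flag. With clustered input
  // the sink writes partitions one after another, so only a single partition's
  // buffers need to be accounted for in the estimate.
  if (inputIsClustered_) {
    numPartitionsPerInstance = 1;
  }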