Details
- Type: Improvement
- Status: Open
- Priority: Minor
- Resolution: Unresolved
- 1.17.0
- None
- None
Description
The function below detects ineffective scale-ups.
It checks SCALING_EFFECTIVENESS_DETECTION_ENABLED (scaling.effectiveness.detection.enabled) only at the very end, after all the computations needed for detection have already run. When the option is false, that work is unnecessary.
JobVertexScaler.java #175
```java
private boolean detectIneffectiveScaleUp(
        AbstractFlinkResource<?, ?> resource,
        JobVertexID vertex,
        Configuration conf,
        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
        ScalingSummary lastSummary) {

    double lastProcRate =
            lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
    double lastExpectedProcRate =
            lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
    var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();

    // To judge the effectiveness of the scale up operation we compute how much of the expected
    // increase actually happened. For example if we expect a 100 increase in proc rate and only
    // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
    // below the threshold, we mark the scaling ineffective.
    double expectedIncrease = lastExpectedProcRate - lastProcRate;
    double actualIncrease = currentProcRate - lastProcRate;

    boolean withinEffectiveThreshold =
            (actualIncrease / expectedIncrease)
                    >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
    if (withinEffectiveThreshold) {
        return false;
    }

    var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);

    eventRecorder.triggerEvent(
            resource,
            EventRecorder.Type.Normal,
            EventRecorder.Reason.IneffectiveScaling,
            EventRecorder.Component.Operator,
            message);

    if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
        LOG.info(message);
        return true;
    } else {
        return false;
    }
}
```
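To make the ratio check in the function above concrete, here is a minimal standalone sketch of the same arithmetic. The class and method names (`EffectivenessRatioSketch`, `effectivenessRatio`, `isEffective`) are hypothetical and not part of the Flink code base. The two rates are taken from the inline comments in the quoted function, while the current processing rates (24000.0 and 30000.0) and the threshold of 0.1 are assumed purely for illustration.

```java
// Hypothetical standalone sketch; not the Flink implementation.
class EffectivenessRatioSketch {

    // Fraction of the expected processing-rate increase that actually happened.
    static double effectivenessRatio(
            double lastProcRate, double lastExpectedProcRate, double currentProcRate) {
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;
        return actualIncrease / expectedIncrease;
    }

    // A scale-up counts as effective when the ratio reaches the threshold.
    static boolean isEffective(
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold) {
        return effectivenessRatio(lastProcRate, lastExpectedProcRate, currentProcRate)
                >= threshold;
    }

    public static void main(String[] args) {
        double lastProcRate = 22569.315633422066; // from the comment in the quoted code
        double lastExpectedProcRate = 37340.0;    // from the comment in the quoted code
        double threshold = 0.1;                   // assumed value, for illustration only

        // Assumed current rates: one barely above the old rate, one well above it.
        for (double currentProcRate : new double[] {24000.0, 30000.0}) {
            System.out.println(
                    currentProcRate
                            + " -> ratio "
                            + effectivenessRatio(lastProcRate, lastExpectedProcRate, currentProcRate)
                            + ", effective: "
                            + isEffective(lastProcRate, lastExpectedProcRate, currentProcRate, threshold));
        }
    }
}
```

None of this arithmetic depends on SCALING_EFFECTIVENESS_DETECTION_ENABLED, which is why it can be skipped entirely when the option is off.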
I suggest checking SCALING_EFFECTIVENESS_DETECTION_ENABLED at the call site, before detectIneffectiveScaleUp is invoked, as follows.
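JobVertexScaler.java #150
```java
if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
    if (scaledUp) {
        // Check the flag first so the detection work is skipped when it is disabled.
        if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
            return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
        } else {
            // Detection disabled: do not block scaling, matching the final else branch
            // of detectIneffectiveScaleUp, which returns false.
            return false;
        }
    } else {
        return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
    }
}
```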
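The effect of moving the check can be illustrated with a small standalone sketch. All names here (`GuardFirstSketch`, `blockScaleUp`, `expensiveDetection`, `detectionCalls`) are hypothetical stand-ins, not Flink APIs. The sketch assumes that a disabled detection should never block scaling, which matches the `return false` in the final else branch of detectIneffectiveScaleUp.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the guard-first pattern; not the Flink implementation.
class GuardFirstSketch {

    // Counts how often the (stand-in) detection logic actually runs.
    static int detectionCalls = 0;

    // Stand-in for detectIneffectiveScaleUp: pretends the scale-up was ineffective.
    static boolean expensiveDetection() {
        detectionCalls++;
        return true;
    }

    // Consults the flag first and only runs the detection when it is enabled.
    static boolean blockScaleUp(boolean detectionEnabled, BooleanSupplier detection) {
        if (!detectionEnabled) {
            return false; // disabled: never block, and never run the detection
        }
        return detection.getAsBoolean();
    }

    public static void main(String[] args) {
        boolean blockedWhenDisabled = blockScaleUp(false, GuardFirstSketch::expensiveDetection);
        System.out.println("disabled: blocked=" + blockedWhenDisabled + ", calls=" + detectionCalls);

        boolean blockedWhenEnabled = blockScaleUp(true, GuardFirstSketch::expensiveDetection);
        System.out.println("enabled: blocked=" + blockedWhenEnabled + ", calls=" + detectionCalls);
    }
}
```

One side effect to keep in mind: in the function as quoted, the IneffectiveScaling event is triggered before the flag is checked, so guarding at the call site would also suppress that event when detection is disabled.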
Issue Links
- is related to FLINK-33993 Ineffective scaling detection events are misleading (Closed)