Flink / FLINK-31977

If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.17.0
    • Fix Version/s: None
    • Component/s: Autoscaler
    • Labels: None

    Description

      The function below detects ineffective scale-ups.
      It first performs every computation needed for detection and only at the very end checks SCALING_EFFECTIVENESS_DETECTION_ENABLED (scaling.effectiveness.detection.enabled), returning true only if the option is enabled. When the option is false the result is always false, so the preceding computation is unnecessary.

      JobVertexScaler.java #175
      
      private boolean detectIneffectiveScaleUp(
              AbstractFlinkResource<?, ?> resource,
              JobVertexID vertex,
              Configuration conf,
              Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
              ScalingSummary lastSummary) {
      
          double lastProcRate = lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
          double lastExpectedProcRate =
                  lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
          var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
      
          // To judge the effectiveness of the scale up operation we compute how much of the expected
          // increase actually happened. For example if we expect a 100 increase in proc rate and only
          // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
          // below the threshold, we mark the scaling ineffective.
          double expectedIncrease = lastExpectedProcRate - lastProcRate;
          double actualIncrease = currentProcRate - lastProcRate;
      
          boolean withinEffectiveThreshold =
                  (actualIncrease / expectedIncrease)
                          >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
          if (withinEffectiveThreshold) {
              return false;
          }
      
          var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
      
          eventRecorder.triggerEvent(
                  resource,
                  EventRecorder.Type.Normal,
                  EventRecorder.Reason.IneffectiveScaling,
                  EventRecorder.Component.Operator,
                  message);
      
          if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
              LOG.info(message);
              return true;
          } else {
              return false;
          }
      } 
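
To make the arithmetic in the function above concrete, here is a minimal standalone sketch of the same ratio check with no Flink types. The two "last" rates are the values noted in the code comments above; the current processing rate (23500.0) and the threshold (0.1) are assumed values chosen for illustration, and the class and method names are hypothetical.

```java
// Illustrative only: mirrors the ratio check in detectIneffectiveScaleUp
// using plain doubles instead of Flink metric types.
final class ScaleUpEffectivenessExample {

    /** Fraction of the expected processing-rate increase that actually happened. */
    static double achievedFraction(
            double lastProcRate, double lastExpectedProcRate, double currentProcRate) {
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;
        return actualIncrease / expectedIncrease;
    }

    /** True when the achieved fraction is not at or above the threshold. */
    static boolean isIneffective(
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold) {
        double fraction = achievedFraction(lastProcRate, lastExpectedProcRate, currentProcRate);
        // Same comparison direction as the original: effective means fraction >= threshold.
        return !(fraction >= threshold);
    }

    public static void main(String[] args) {
        double lastProcRate = 22569.315633422066; // value from the issue's code comment
        double lastExpectedProcRate = 37340.0;    // value from the issue's code comment
        double currentProcRate = 23500.0;         // assumed for illustration
        double threshold = 0.1;                   // assumed for illustration

        double fraction = achievedFraction(lastProcRate, lastExpectedProcRate, currentProcRate);
        System.out.printf("achieved fraction: %.4f%n", fraction);
        System.out.println(
                "ineffective: "
                        + isIneffective(
                                lastProcRate, lastExpectedProcRate, currentProcRate, threshold));
    }
}
```

With these inputs the scale-up achieves roughly 6% of the expected increase, which is below the assumed threshold, so it would be judged ineffective.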

      In the call to the detectIneffectiveScaleUp function, I would suggest checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.

      JobVertexScaler.java #150
      
      if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
          if (scaledUp) {
              if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
                  return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
              } else {
                  // Detection disabled: return false, as detectIneffectiveScaleUp does today.
                  return false;
              }
          } else {
              return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
          }
      }
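
The effect of checking the option first can be sketched in isolation. The example below is not the operator's code: `GuardFirstExample`, `blockScaleUp`, and the `BooleanSupplier` standing in for detectIneffectiveScaleUp are hypothetical names used only to show that the detection logic never runs when the option is off. The disabled branch returns false here because the existing function returns false whenever the option is disabled.

```java
import java.util.function.BooleanSupplier;

// Illustrative only: a guard-first wrapper around a possibly expensive check.
final class GuardFirstExample {

    /**
     * Returns whether the scale-up should be blocked.
     *
     * @param detectionEnabled stand-in for SCALING_EFFECTIVENESS_DETECTION_ENABLED
     * @param detection stand-in for the detectIneffectiveScaleUp computation
     */
    static boolean blockScaleUp(boolean detectionEnabled, BooleanSupplier detection) {
        if (!detectionEnabled) {
            // Skip the computation entirely; nothing is blocked when detection is off.
            return false;
        }
        return detection.getAsBoolean();
    }

    public static void main(String[] args) {
        int[] runs = {0}; // counts how often the stand-in detection logic executes

        BooleanSupplier alwaysIneffective =
                () -> {
                    runs[0]++;
                    return true;
                };

        boolean blockedWhenDisabled = blockScaleUp(false, alwaysIneffective);
        boolean blockedWhenEnabled = blockScaleUp(true, alwaysIneffective);

        System.out.println("blocked (disabled): " + blockedWhenDisabled);
        System.out.println("blocked (enabled): " + blockedWhenEnabled);
        System.out.println("detection runs: " + runs[0]);
    }
}
```

Passing the computation as a supplier keeps the guard and the detection logic separate, so the same early return could equally be placed at the top of the detection function itself.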

            People

              Assignee: Unassigned
              Reporter: Tan Kim (tanee.kim)