FLINK-35661: MiniBatchGroupAggFunction can silently drop records under certain conditions


Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Component: Table SQL / Runtime

    Description

      The story / Symptoms

      One day we made a small change to a Flink job that uses Flink SQL, adding a couple of UDF-based aggregations (what exactly they compute is not important), and to our surprise the job started working incorrectly: it produced wrong results for some aggregation keys, and did not produce some keys at all.

      The symptoms were really weird. For instance, the read/write access rate to accState (the internal state used by Table SQL for GROUP BY aggregations) dropped sharply. On the attached screenshot you can see the read rate to this state compared with the same chart one day earlier: they should look alike, yet there is a big difference right after the change. The write rate showed a similar picture.

      Another interesting observation was that the GroupAggregate operator (the Table SQL operator responsible for GROUP BY aggregation) behaved strangely: the number of "records out" was disproportionately lower than the number of "records in". By itself this does not prove anything, but combined with the other observations about the job producing wrong results it looked suspicious.

      Digging deeper

      After reverting the change things went back to normal, so we concluded that adding the new UDF-based aggregations had caused the issue. We then realized that we had accidentally forgotten to implement the merge method in our UDF, which made the planner fall back to ONE_PHASE aggregation instead of TWO_PHASE. After fixing the mistake and implementing merge, things went back to normal.
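
      For reference, the distinction comes from whether the planner can do a local/global split: a TWO_PHASE-capable aggregate UDF must provide a merge method so that partial accumulators can be combined. A minimal sketch of such a UDF (the class name and accumulator layout are illustrative, not our actual function):

      import org.apache.flink.table.functions.AggregateFunction;

      // Minimal sketch of an aggregate UDF that is eligible for TWO_PHASE aggregation.
      public class SumAgg extends AggregateFunction<Long, SumAgg.Acc> {

          public static class Acc {
              public long sum = 0L;
          }

          @Override
          public Acc createAccumulator() {
              return new Acc();
          }

          @Override
          public Long getValue(Acc acc) {
              return acc.sum;
          }

          public void accumulate(Acc acc, Long value) {
              if (value != null) {
                  acc.sum += value;
              }
          }

          // Needed when the aggregation has to handle retraction messages.
          public void retract(Acc acc, Long value) {
              if (value != null) {
                  acc.sum -= value;
              }
          }

          // Without this method the planner cannot perform local/global (TWO_PHASE)
          // aggregation and silently falls back to ONE_PHASE.
          public void merge(Acc acc, Iterable<Acc> others) {
              for (Acc other : others) {
                  acc.sum += other.sum;
              }
          }
      }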

      Moreover, we realized that the UDF itself actually has nothing to do with the issue (other than triggering the ONE_PHASE fallback). We reverted all the changes and re-ran the original job with ONE_PHASE aggregation, and the issue reproduced in that mode as well.
      So, summarizing: when the job has mini-batch enabled, ONE_PHASE aggregation works incorrectly.
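
      For context, the mode in which this reproduces can be pinned down explicitly with table configuration (a sketch; the option keys are the standard Table API ones, the latency/size values are arbitrary):

      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.TableEnvironment;

      public class MiniBatchOnePhaseSetup {
          public static void main(String[] args) {
              TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

              // Mini-batch on: input records are buffered and flushed per bundle in finishBundle().
              tEnv.getConfig().set("table.exec.mini-batch.enabled", "true");
              tEnv.getConfig().set("table.exec.mini-batch.allow-latency", "1 s");
              tEnv.getConfig().set("table.exec.mini-batch.size", "1000");

              // AUTO normally picks TWO_PHASE; a UDAF without merge() falls back to ONE_PHASE,
              // which is the mode that uses MiniBatchGroupAggFunction.
              tEnv.getConfig().set("table.optimizer.agg-phase-strategy", "ONE_PHASE");
          }
      }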

      The bug

      It was clear that the issue had something to do with MiniBatchGroupAggFunction, because this is what distinguishes ONE_PHASE mode from TWO_PHASE mode when mini-batch is enabled.

       

      After reading the code, we found this interesting fragment:

          @Override
          public void finishBundle(Map<RowData, List<RowData>> buffer, Collector<RowData> out)
                  throws Exception {
              for (Map.Entry<RowData, List<RowData>> entry : buffer.entrySet()) {
                  RowData currentKey = entry.getKey();
                  List<RowData> inputRows = entry.getValue();
                  boolean firstRow = false;
                  // step 1: get the accumulator for the current key
                  // set current key to access state under the key
                  ctx.setCurrentKey(currentKey);
                  RowData acc = accState.value();
                  if (acc == null) {
                      // Don't create a new accumulator for a retraction message. This
                      // might happen if the retraction message is the first message for the
                      // key or after a state clean up.
                      Iterator<RowData> inputIter = inputRows.iterator();
                      while (inputIter.hasNext()) {
                          RowData current = inputIter.next();
                          if (isRetractMsg(current)) {
                              inputIter.remove(); // remove all the beginning retraction messages
                          } else {
                              break;
                          }
                      }
                      if (inputRows.isEmpty()) {
                          return; // !!!! <--- this is bad !!!!
                      }
                      acc = function.createAccumulators();
                      firstRow = true;
                  }
            // ... (rest of the per-key processing omitted)
        }
    }

      In this code we iterate over the whole bundle key by key and at some point do this:

      if (inputRows.isEmpty()) {
          return;
      }

      Obviously, what was meant here is continue (i.e. finish with the current key and move on to the next one), not a full stop that abandons the rest of the bundle.
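
      A sketch of what the intended code would look like (same surrounding logic, only the early exit changed):

      if (inputRows.isEmpty()) {
          // Only retraction messages arrived for this key and there is no accumulator yet:
          // skip this key, but keep processing the remaining keys of the bundle.
          continue;
      }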


      This line is reached when the bundle contains a key for which only retraction messages were buffered (and no accumulator exists in state): in that case the loop below leaves inputRows empty:

       

      while (inputIter.hasNext()) {
          RowData current = inputIter.next();
          if (isRetractMsg(current)) {
              inputIter.remove(); // remove all the beginning retraction messages
          } else {
              break;
          }
      }
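
      (For completeness: as far as I understand, isRetractMsg simply checks the changelog row kind, along the lines of the sketch below, which is not the exact Flink sources.)

      import org.apache.flink.table.data.RowData;
      import org.apache.flink.types.RowKind;

      // Sketch of the retraction check: UPDATE_BEFORE / DELETE are retract messages,
      // INSERT / UPDATE_AFTER are accumulate messages.
      final class RetractCheck {
          static boolean isRetractMsg(RowData row) {
              RowKind kind = row.getRowKind();
              return kind == RowKind.UPDATE_BEFORE || kind == RowKind.DELETE;
          }
      }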

       

      Summary / conditions

      To summarize, the bug is triggered when:

      1. Mini-batch is enabled
      2. ONE_PHASE aggregation is in use (whether chosen explicitly or via a planner fallback)
      3. A mini-batch bundle contains a key that has only retraction messages and no accumulator in state (e.g. the retraction is the first message for that key, or the state was cleaned up)

      When these conditions are met, MiniBatchGroupAggFunction returns early from finishBundle and silently drops the records buffered for all remaining keys of the bundle.
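
      A query shape that makes such bundles likely (a hedged sketch, not our production job; table and column names are made up): an aggregation on top of another aggregation, so the outer GroupAggregate receives UPDATE_BEFORE / DELETE rows from the inner one, and within a single mini-batch bundle an outer key can easily see only retractions. Combined with state TTL clean-up (or the retraction being the first message for a key), this reaches the acc == null branch shown above.

      // Using the tEnv from the configuration sketch above.
      tEnv.executeSql(
          "CREATE TEMPORARY VIEW per_user AS "
              + "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");

      // The outer GROUP BY consumes the changelog of per_user, including retractions.
      tEnv.executeSql(
          "SELECT cnt, COUNT(*) AS users_with_cnt FROM per_user GROUP BY cnt");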


      Attachments

        1. image-2024-06-20-11-05-53-253.png
          247 kB
          Ivan Burmistrov
        2. image-2024-06-20-10-50-20-867.png
          251 kB
          Ivan Burmistrov
        3. image-2024-06-20-10-46-51-347.png
          405 kB
          Ivan Burmistrov


          People

            Assignee: Unassigned
            Reporter: Ivan Burmistrov (isburmistrov)
            Votes: 0
            Watchers: 4
