Flink / FLINK-34129

MiniBatchGlobalGroupAggFunction turns -U into +I and then +U into -D when state has expired


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.18.1
    • Fix Version/s: 2.0.0
    • Component/s: Table SQL / Runtime
    • Labels: None

    Description

      Take SUM as an example:
      Suppose the state for key 1 has expired and the source then sends an update for that key. MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input but emits +I[1, -20] and -D[1, -20], so the sink deletes the row from the external database.
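      The arithmetic behind these values can be illustrated with a minimal sketch of a retractable SUM accumulator that also keeps a COUNT(*)-style record counter. This is a simplified model, assuming that sum and count are all the state the aggregate needs; `ExpiredStateArithmetic` and `RetractSumAcc` are hypothetical names, not Flink classes, and `recordCountIsZero` only mirrors the idea behind RetractionRecordCounter#recordCountIsZero.

```java
/** Replays the accumulator arithmetic for key 1 after its state has expired. */
final class ExpiredStateArithmetic {

    /**
     * Hypothetical, simplified retractable SUM accumulator (not Flink's class).
     * It keeps the running sum plus a COUNT(*)-style counter of accumulated rows.
     */
    static final class RetractSumAcc {
        long sum;
        long count;

        /** Applies an insert-like row (+I or +U). */
        void accumulate(long value) {
            sum += value;
            count++;
        }

        /** Applies a retract-like row (-U or -D). */
        void retract(long value) {
            sum -= value;
            count--;
        }

        /** Mirrors the idea behind RetractionRecordCounter#recordCountIsZero. */
        boolean recordCountIsZero() {
            return count == 0;
        }
    }

    public static void main(String[] args) {
        // The state has expired, so we start from a fresh accumulator.
        RetractSumAcc acc = new RetractSumAcc();

        acc.retract(20); // -U[1, 20]
        System.out.println("after -U: sum=" + acc.sum + ", count=" + acc.count
                + ", countIsZero=" + acc.recordCountIsZero());

        acc.accumulate(20); // +U[1, 20]
        System.out.println("after +U: sum=" + acc.sum + ", count=" + acc.count
                + ", countIsZero=" + acc.recordCountIsZero());
    }
}
```

      Running it prints `sum=-20, count=-1, countIsZero=false` after the -U and `sum=0, count=0, countIsZero=true` after the +U. A negative but non-zero count is indistinguishable, by this check alone, from a genuine first row.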

      Here is why this happens:

      • When the state has expired and -U[1, 20] arrives, MiniBatchGlobalGroupAggFunction creates a new SUM accumulator and sets firstRow to true.
      if (stateAcc == null) { 
          stateAcc = globalAgg.createAccumulators(); 
          firstRow = true; 
      }   
      • The SUM accumulator then retracts 20, so the sum becomes -20.
      • Because this is treated as the first row, MiniBatchGlobalGroupAggFunction turns the -U into +I[1, -20] and emits it downstream.
      if (!recordCounter.recordCountIsZero(acc)) {
          // if this was not the first row and we have to emit retractions
          if (!firstRow) {
              // ... (omitted)
          } else {
              // update acc to state
              accState.update(acc);

              // this is the first row, output the new result
              // prepare INSERT message for new row
              resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
              out.collect(resultRow);
          }
      }
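      • When the next +U[1, 20] arrives, the accumulator adds 20 back, so the sum is 0 and the record count (-1 + 1) is also 0; RetractionRecordCounter#recordCountIsZero therefore returns true. Because firstRow is now false (the accumulator was read back from state), the function turns the +U into -D[1, -20] and emits it downstream.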
      if (!recordCounter.recordCountIsZero(acc)) {
          // ... (omitted)
      } else {
          // we retracted the last record for this key
          // if this is not the first row, send out a DELETE message
          if (!firstRow) {
              // prepare DELETE message for previous row
              resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
              out.collect(resultRow);
          }
      }
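      The two steps above can be replayed with a small simulation of the per-key emission logic. This is a hedged sketch, not Flink code: `GlobalAggSketch`, `RetractSumAcc` and `process` are hypothetical names, each `process` call stands in for one mini-batch holding a single record for key 1, a `null` state models expired state, and the not-first-row branch is simplified to always emit a -U/+U pair.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the global aggregation for key 1. */
final class GlobalAggSketch {

    /** Hypothetical retractable SUM accumulator with a COUNT(*)-style counter. */
    static final class RetractSumAcc {
        long sum;
        long count;
    }

    private RetractSumAcc state; // null models expired (or never written) state
    final List<String> out = new ArrayList<>();

    /** Processes one mini-batch that contains a single change for key 1. */
    void process(String rowKind, long value) {
        boolean firstRow = false;
        RetractSumAcc acc = state;
        if (acc == null) {
            // state expired: start from a fresh accumulator and treat it as the first row
            acc = new RetractSumAcc();
            firstRow = true;
        }
        long prevAggValue = acc.sum;
        if (rowKind.equals("-U") || rowKind.equals("-D")) {
            acc.sum -= value;
            acc.count--;
        } else {
            acc.sum += value;
            acc.count++;
        }
        long newAggValue = acc.sum;

        if (acc.count != 0) {
            state = acc; // update acc to state
            if (!firstRow) {
                // simplified: always emit an update pair
                out.add("-U[1, " + prevAggValue + "]");
                out.add("+U[1, " + newAggValue + "]");
            } else {
                out.add("+I[1, " + newAggValue + "]");
            }
        } else {
            if (!firstRow) {
                out.add("-D[1, " + prevAggValue + "]");
            }
            state = null; // clear state
        }
    }

    public static void main(String[] args) {
        GlobalAggSketch agg = new GlobalAggSketch();
        agg.process("-U", 20); // first half of the source update
        agg.process("+U", 20); // second half of the source update
        System.out.println(agg.out); // [+I[1, -20], -D[1, -20]]
    }
}
```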

       
      So after a single update from the source, the sink receives +I[1, -20] followed by -D[1, -20], and the row is deleted.
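      The effect on the external table can be sketched with a toy upsert sink modelled as a map keyed by the group key. This is an illustration under stated assumptions, not any real connector: `UpsertSinkSketch` and `apply` are hypothetical, +I/+U are treated as upserts, -D as a delete, -U is ignored, and the table is assumed to already hold [1, 20] from before the state expired.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical upsert sink: the external table is a map from group key to value. */
final class UpsertSinkSketch {
    private final Map<Integer, Long> table = new HashMap<>();

    /** Applies one changelog row to the table (assumed upsert semantics). */
    void apply(String rowKind, int key, long value) {
        switch (rowKind) {
            case "+I":
            case "+U":
                table.put(key, value); // upsert by key
                break;
            case "-D":
                table.remove(key); // delete by key, regardless of value
                break;
            default:
                // -U is ignored in this sketch (an assumption; real sinks differ)
                break;
        }
    }

    /** Returns a copy of the current table contents. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(table);
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        sink.apply("+I", 1, 20);  // assumed: row written before the state expired
        sink.apply("+I", 1, -20); // emitted for -U[1, 20] after state expiry
        sink.apply("-D", 1, -20); // emitted for +U[1, 20]
        System.out.println(sink.snapshot()); // {}
    }
}
```

      Because the delete is applied by key, the row for key 1 disappears even though the source only updated it.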


          People

            Assignee: Unassigned
            Reporter: Hongshun Wang (loserwang1024)
            Votes: 0
            Watchers: 6
