Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.18.1
Fix Version/s: None
Description
Take SUM as an example:
Suppose the state for a key has expired and the source then emits an update for that key. MiniBatchGlobalGroupAggFunction takes -U[1, 20] and +U[1, 20] as input but emits +I[1, -20] and -D[1, -20]. As a result, the sink deletes the row from the external database.
Here is why this happens:
- When the state has expired and -U[1, 20] arrives, MiniBatchGlobalGroupAggFunction creates new accumulators (including the sum accumulator) and sets firstRow to true:
```java
if (stateAcc == null) {
    stateAcc = globalAgg.createAccumulators();
    firstRow = true;
}
```
- The sum accumulator then retracts 20, so the sum becomes -20. The record count kept in the accumulator becomes -1, which is not zero.
- Because this is treated as the first row, MiniBatchGlobalGroupAggFunction turns the -U into +I[1, -20] and emits it downstream:
```java
if (!recordCounter.recordCountIsZero(acc)) {
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        // ignore
    } else {
        // update acc to state
        accState.update(acc);
        // this is the first, output new result
        // prepare INSERT message for new row
        resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.INSERT);
        out.collect(resultRow);
    }
}
```
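- When the next +U[1, 20] arrives in a later mini-batch, the sum accumulator adds 20 back, so the sum returns to 0 and the record count returns to 0 as well; RetractionRecordCounter#recordCountIsZero therefore returns true. Because state now exists for the key, firstRow is false, so the +U is turned into -D[1, -20] (carrying the previous aggregate value) and emitted downstream: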
```java
if (!recordCounter.recordCountIsZero(acc)) {
    // ignore
} else {
    // we retracted the last record for this key
    // if this is not first row sent out a DELETE message
    if (!firstRow) {
        // prepare DELETE message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
    // ...
}
```
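The walkthrough above can be reproduced with a small, self-contained simulation. This is a minimal sketch, not Flink code: the class ExpiredStateSumSketch, its processBatch method and the Kind enum are hypothetical; the local and global aggregation phases are collapsed into one step; each mini-batch is assumed to carry a single row; and the accumulators are modeled as a long[] holding the sum and the record count. Only the branches quoted in this issue are mirrored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the mini-batch global SUM aggregation (not Flink code).
public class ExpiredStateSumSketch {

    // Changelog kinds; only the retract/accumulate distinction matters here.
    enum Kind {
        INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE;

        boolean isRetract() {
            return this == UPDATE_BEFORE || this == DELETE;
        }
    }

    // Per-key accumulators: acc[0] = SUM(value), acc[1] = record count.
    private final Map<Integer, long[]> accState = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    // Processes one mini-batch that contains a single row for the given key.
    void processBatch(int key, long value, Kind kind) {
        long[] stateAcc = accState.get(key);
        boolean firstRow = false;
        if (stateAcc == null) {
            // State is missing (e.g. expired): start from empty accumulators.
            stateAcc = new long[] {0L, 0L};
            firstRow = true;
        }
        long prevAggValue = stateAcc[0];

        long[] acc = stateAcc.clone();
        long sign = kind.isRetract() ? -1L : 1L;
        acc[0] += sign * value; // SUM: accumulate or retract the value
        acc[1] += sign;         // record count: accumulate or retract one row
        long newAggValue = acc[0];

        if (acc[1] != 0) {
            accState.put(key, acc);
            if (firstRow) {
                // First row for this key: emit an INSERT with the new result.
                out.add("+I[" + key + ", " + newAggValue + "]");
            }
            // The non-first-row update emission is not modeled in this sketch.
        } else {
            if (!firstRow) {
                // Last record retracted: emit a DELETE with the previous result.
                out.add("-D[" + key + ", " + prevAggValue + "]");
            }
            // Drop the key's state (a simplification in this sketch).
            accState.remove(key);
        }
    }

    public static List<String> run() {
        ExpiredStateSumSketch agg = new ExpiredStateSumSketch();
        // State for key 1 has expired, so accState starts empty.
        agg.processBatch(1, 20L, Kind.UPDATE_BEFORE); // mini-batch 1: -U[1, 20]
        agg.processBatch(1, 20L, Kind.UPDATE_AFTER);  // mini-batch 2: +U[1, 20]
        return agg.out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [+I[1, -20], -D[1, -20]]
    }
}
```

Running main prints the same two rows described in this issue: the -U becomes +I[1, -20] because firstRow is true and the count is -1, and the +U becomes -D[1, -20] because the count returns to 0 while firstRow is false.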
So after a single update from the source, the sink receives +I[1, -20] followed by -D[1, -20], and the row is deleted.
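To see the effect on the external system, the emitted changelog can be applied to a stand-in for the sink table. This is a minimal sketch under stated assumptions: UpsertSinkSketch and Change are hypothetical, the table is a HashMap keyed by the primary key, it is assumed to hold [1, 20] before the update, and the sink is assumed to upsert on +I/+U, delete on -D and ignore -U (a common upsert-sink behavior, not something stated in this issue).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an upsert sink writing to an external table (not a Flink connector).
public class UpsertSinkSketch {

    // One changelog row: kind is "+I", "-U", "+U" or "-D"; key is the primary key.
    static final class Change {
        final String kind;
        final int key;
        final long value;

        Change(String kind, int key, long value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    // Applies the changes to a copy of the table and returns the resulting table.
    static Map<Integer, Long> apply(Map<Integer, Long> table, List<Change> changes) {
        Map<Integer, Long> result = new HashMap<>(table);
        for (Change c : changes) {
            switch (c.kind) {
                case "+I":
                case "+U":
                    result.put(c.key, c.value); // upsert by primary key
                    break;
                case "-D":
                    result.remove(c.key); // delete by primary key
                    break;
                case "-U":
                    break; // assumed to be ignored by this upsert sink
                default:
                    throw new IllegalArgumentException("unknown kind: " + c.kind);
            }
        }
        return result;
    }

    public static Map<Integer, Long> run() {
        // Assumed content of the external table before the source update.
        Map<Integer, Long> table = new HashMap<>();
        table.put(1, 20L);
        // Changelog emitted by the aggregation after the state expired.
        List<Change> emitted = List.of(new Change("+I", 1, -20L), new Change("-D", 1, -20L));
        return apply(table, emitted);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints {}: the row for key 1 is gone
    }
}
```

Because +I upserts and -D then removes the same key, the row ends up missing regardless of whether it existed in the table beforehand.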