Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Won't Fix
Description
Currently, when a Trigger returns PURGE (or FIRE_AND_PURGE), the window state, trigger state and timer state are completely deleted. Some use cases might want only the window contents deleted, while keeping timers and trigger state until the window is purged by the garbage-collection timer.
I propose adding a new method, onFire(), to Trigger that would allow a Trigger to clean up some of its state and timers in reaction to a firing. The method would be called whenever a window fires and we emit data, giving the trigger a chance to reset or clean up its state. The window operator would no longer purge window state and timers itself; instead, it would call Trigger.onFire() and let the trigger decide whether to delete state and timers.
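A minimal sketch of the proposed hook, in Java to match Flink. All types here (Result, TriggerContext, SketchTrigger, CountEveryN, WindowSketch) are hypothetical, heavily simplified stand-ins and not Flink's actual Trigger API. The sketch only illustrates the idea: after a firing, the operator calls onFire() and then, on PURGE, clears the window contents, leaving trigger state and timers for the trigger to manage.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TriggerResult.
enum Result {
    CONTINUE, FIRE, PURGE, FIRE_AND_PURGE;

    boolean isFire() { return this == FIRE || this == FIRE_AND_PURGE; }
    boolean isPurge() { return this == PURGE || this == FIRE_AND_PURGE; }
}

// Hypothetical per-window trigger state and registered timers.
class TriggerContext {
    long count = 0;
    final Set<Long> timers = new HashSet<>();
}

interface SketchTrigger<T> {
    Result onElement(T element, TriggerContext ctx);

    // Proposed hook: called after the window fired and data was emitted.
    // The default keeps all trigger state and timers.
    default void onFire(TriggerContext ctx) {}
}

// Fires every n elements and resets only its own counter in onFire().
class CountEveryN<T> implements SketchTrigger<T> {
    private final long n;

    CountEveryN(long n) { this.n = n; }

    @Override
    public Result onElement(T element, TriggerContext ctx) {
        ctx.count++;
        return ctx.count >= n ? Result.FIRE_AND_PURGE : Result.CONTINUE;
    }

    @Override
    public void onFire(TriggerContext ctx) {
        ctx.count = 0; // reset trigger state, but leave timers registered
    }
}

// Hypothetical single-window operator.
class WindowSketch<T> {
    final List<T> contents = new ArrayList<>();
    final List<List<T>> emitted = new ArrayList<>();
    final TriggerContext ctx = new TriggerContext();
    private final SketchTrigger<T> trigger;

    WindowSketch(SketchTrigger<T> trigger) { this.trigger = trigger; }

    void add(T element) {
        contents.add(element);
        Result r = trigger.onElement(element, ctx);
        if (r.isFire()) {
            emitted.add(new ArrayList<>(contents));
            trigger.onFire(ctx); // the trigger decides what to clean up
        }
        if (r.isPurge()) {
            contents.clear(); // window contents only; trigger state and timers untouched
        }
    }
}
```

With `new WindowSketch<String>(new CountEveryN<>(2))` and a timer registered in `ctx.timers`, adding "a", "b", "c" emits `[a, b]` once, leaves `[c]` in the window, and keeps the timer.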
The new method is not strictly necessary in most cases, since a trigger could replicate the code that would go into onFire() in the other on*() methods and do the required cleanup whenever it returns FIRE. However, with the current Trigger interface, where onMerge() can return a TriggerResult, this can lead to inconsistencies, because a new incoming element might lead to this flow of actions:
- element comes in
- we merge windows and call onMerge()
- element is added to newly merged window and onElement() is called
- we fire the window if either of the two methods returned FIRE
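Notice that the state should not be cleaned up in onMerge(): onElement() still runs after onMerge() and before the window fires, so a trigger that reset its state in onMerge() would go on to record the new element in that state, even though the element is then emitted by the same firing.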
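The flow above can be modeled with a small, self-contained Java sketch. Everything in it (MergeFlowSketch, Window, CountTrigger, mergeAndAdd, windowOf) is hypothetical and not Flink's API. A count trigger with threshold 2 merges two windows holding one element each; cleaning up inside onMerge() leaves a stale count of 1 after the firing, while deferring cleanup to an onFire()-style hook leaves the count at 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the merge flow; not Flink's API.
class MergeFlowSketch {
    enum Result { CONTINUE, FIRE }

    static final class Window {
        final List<String> contents = new ArrayList<>();
        long count; // trigger state: number of elements seen
    }

    // Count trigger whose onMerge() returns a result, as in the current interface.
    // cleanupInOnMerge selects where the reset happens.
    static final class CountTrigger {
        final long threshold;
        final boolean cleanupInOnMerge;

        CountTrigger(long threshold, boolean cleanupInOnMerge) {
            this.threshold = threshold;
            this.cleanupInOnMerge = cleanupInOnMerge;
        }

        Result onMerge(Window merged, long mergedCount) {
            merged.count = mergedCount;
            if (merged.count >= threshold) {
                if (cleanupInOnMerge) {
                    merged.count = 0; // premature: onElement() has not run yet
                }
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }

        Result onElement(Window w) {
            w.count++;
            return w.count >= threshold ? Result.FIRE : Result.CONTINUE;
        }

        // onFire()-style hook: clean up only after the window actually fired.
        void onFire(Window w) {
            if (!cleanupInOnMerge) {
                w.count = 0;
            }
        }
    }

    // Element comes in, windows a and b are merged, onMerge() then onElement()
    // run, and the window fires if either returned FIRE.
    static Window mergeAndAdd(Window a, Window b, String element,
                              CountTrigger t, List<List<String>> out) {
        Window merged = new Window();
        merged.contents.addAll(a.contents);
        merged.contents.addAll(b.contents);
        Result onMerge = t.onMerge(merged, a.count + b.count);
        merged.contents.add(element);
        Result onElement = t.onElement(merged);
        if (onMerge == Result.FIRE || onElement == Result.FIRE) {
            out.add(new ArrayList<>(merged.contents));
            t.onFire(merged);
        }
        return merged;
    }

    static Window windowOf(String element) {
        Window w = new Window();
        w.contents.add(element);
        w.count = 1;
        return w;
    }
}
```

In both variants all three elements are emitted; only the onFire()-style variant leaves trigger state consistent with that.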
In this PR, https://github.com/apache/flink/pull/2572, I'm changing Trigger.onMerge() to return void, which means we would no longer strictly need the new method. It would then just remove some boilerplate and make the intent of trigger code clearer.
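Once onMerge() returns void, only onElement() and the timer callbacks can request a firing, so a trigger can do its cleanup right before returning FIRE. The sketch below shows that pattern and the boilerplate it implies: the same reset is repeated in every method that fires. TriggerState, Fire, VoidMergeTrigger and CountOrTimeoutTrigger are hypothetical, simplified types, not Flink's Trigger API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical per-window trigger state for this sketch.
class TriggerState {
    long count;
    final Set<Long> timers = new HashSet<>();
}

enum Fire { CONTINUE, FIRE }

// Hypothetical trigger shape once onMerge() returns void.
interface VoidMergeTrigger {
    Fire onElement(TriggerState s, long timestamp);
    Fire onTimer(TriggerState s, long time);
    void onMerge(TriggerState merged, TriggerState a, TriggerState b);
}

// Fires after n elements or when its timeout timer expires.
// Without onFire(), the reset is duplicated before every FIRE.
class CountOrTimeoutTrigger implements VoidMergeTrigger {
    private final long n;
    private final long timeout;

    CountOrTimeoutTrigger(long n, long timeout) {
        this.n = n;
        this.timeout = timeout;
    }

    @Override
    public Fire onElement(TriggerState s, long timestamp) {
        if (s.count == 0) {
            s.timers.add(timestamp + timeout); // first element: register timeout
        }
        s.count++;
        if (s.count >= n) {
            reset(s); // cleanup duplicated here...
            return Fire.FIRE;
        }
        return Fire.CONTINUE;
    }

    @Override
    public Fire onTimer(TriggerState s, long time) {
        if (!s.timers.remove(time)) {
            return Fire.CONTINUE; // stale or unknown timer
        }
        reset(s); // ...and here
        return Fire.FIRE;
    }

    @Override
    public void onMerge(TriggerState merged, TriggerState a, TriggerState b) {
        // Only combines state; it cannot fire, so no cleanup belongs here.
        merged.count = a.count + b.count;
        merged.timers.addAll(a.timers);
        merged.timers.addAll(b.timers);
    }

    private void reset(TriggerState s) {
        s.count = 0;
        s.timers.clear();
    }
}
```

An onFire() hook would let the two reset(s) calls collapse into one place, which is the boilerplate reduction described above.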