Description
Hi everyone! I would like to understand why the Kafka Streams DSL offers the ability to define a sliding window with no grace period when this does not seem to work. Confluent's site states that a grace period is required, and that with the deprecated method it defaults to 24 hours.
With a basic sliding window and a count, if I set the grace period to 1 ms, I get the expected output. According to the sliding window documentation, the lower and upper bounds are inclusive.
If I set the grace period to 0 ms, I can see that the record is not skipped at KStreamSlidingWindowAggregate (l.126). However, when we try to create the window and push the event in KStreamSlidingWindowAggregate#createWindows, we call the method updateWindowAndForward (l.417). This method (l.468) checks that windowEnd > closeTime.
closeTime is defined as observedStreamTime - window.gracePeriodMs (Sliding window configuration)
windowEnd is defined as inputRecordTimestamp.
For a first event with a record timestamp, we can assume that observedStreamTime is equal to inputRecordTimestamp.
Therefore, closeTime is inputRecordTimestamp - 0 (gracePeriodMs), which evaluates to inputRecordTimestamp.
Going back to the check in the updateWindowAndForward method, we then have inputRecordTimestamp > inputRecordTimestamp, which is always false. The record is therefore skipped for its own window.
Since the lower and upper bounds are documented as inclusive, I would have expected the event to be written to the store and forwarded. The check would then be windowEnd >= closeTime.
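To make the arithmetic concrete, here is a minimal standalone sketch of the two comparisons. It is not the Kafka Streams source: the class and method names (GraceCheckSketch, closeTime, strictCheck, inclusiveCheck) are hypothetical, and it only assumes the definitions given above, with observedStreamTime equal to the timestamp of the first record.

```java
// Hypothetical illustration only; this is NOT code from KStreamSlidingWindowAggregate.
public class GraceCheckSketch {

    // closeTime as described above: observed stream time minus the grace period.
    static long closeTime(long observedStreamTime, long gracePeriodMs) {
        return observedStreamTime - gracePeriodMs;
    }

    // The strict comparison described above: windowEnd > closeTime.
    static boolean strictCheck(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        return windowEnd > closeTime(observedStreamTime, gracePeriodMs);
    }

    // The inclusive variant suggested above: windowEnd >= closeTime.
    static boolean inclusiveCheck(long windowEnd, long observedStreamTime, long gracePeriodMs) {
        return windowEnd >= closeTime(observedStreamTime, gracePeriodMs);
    }

    public static void main(String[] args) {
        // First record: windowEnd and observedStreamTime are both the record timestamp.
        long ts = 1_000L;

        System.out.println("grace=0 ms, strict:    " + strictCheck(ts, ts, 0L));    // false
        System.out.println("grace=1 ms, strict:    " + strictCheck(ts, ts, 1L));    // true
        System.out.println("grace=0 ms, inclusive: " + inclusiveCheck(ts, ts, 0L)); // true
    }
}
```

With a grace period of 0 ms the strict comparison rejects the record's own window, while a grace period of 1 ms or the inclusive comparison accepts it, which matches the behaviour I observe.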
Is this a bug, or is it intended?
Thanks in advance for your explanations!
Best regards!