Description
We supported backward iterator for SessionStore in KAFKA-9929. But we cannot return the correct order when fetch/backwardFetch the key range when there are multiple records in the same session window.
For example:
We have a session window inactivity gap with 10 ms, and the records:
key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
key: "D" value: "DD", timestamp: 100 --> with SessionWindow(100, 100)
So, when fetch("A" /*key from*/, "D" /*key to*/), we expected to have [A, B, C, D], but we'll have [C, B A, D ]
And the reason is here:
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) { return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); // <-- the final param is "isFarwarded", which should be true for "fetch" case, and false for "backwardFetch" case }
We pass "false" in the "is forward" parameter for `fetch` method, and "true" for "backwardFetch" method, which obviously is wrong.
Attachments
Issue Links
- links to