Description
What we saw:
The follower fetched from offset 116617 and appended the fetched records successfully. However, the leader's log start offset in the fetch response was 116753, which was higher than the fetched offset 116617. When the replica fetcher thread tried to increment its log start offset to the leader's log start offset, it failed with OffsetOutOfRangeException:
[2018-06-23 00:45:37,409] ERROR Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Error processing data for partition X-N offset 116617
Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 116753 of partition X-N since it is larger than the high watermark 116619
In the leader's log, we see that the log start offset was incremented at almost the same time (within 100 ms or so):
[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N to 116753
In the leader's logic, ReplicaManager first calls readFromLocalLog(), which reads from the local log and returns a LogReadResult containing the fetched data together with the leader's log start offset and HW. It then calls ReplicaManager#updateFollowerLogReadResults(), which may move the leader's log start offset and which updates the leader's log start offset and HW in the fetch response. If deleteRecords() happens in between, the log start offset may move beyond the fetched offset. The leader may also move the log start offset because it deletes old log segments. Basically, the issue is that the log start offset can move between the time records are read from the log and the time LogReadResult is updated with the new log start offset. As a result, the fetch response may contain fetched data while the leader's log start offset in the response is set beyond the fetched offset, which indicates that the fetched data no longer exists on the leader.
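The ordering problem can be sketched in a few lines of Java. This is only an illustrative model, not Kafka code: LeaderLog, FetchResponse and handleFollowerFetch are hypothetical names, the starting log start offset (116000) and HW (116800) are made-up values, and the concurrent deleteRecords() or segment deletion is modeled as a callback that runs between the read and the response update.

```java
public class LeaderFetchRaceSketch {
    /** Hypothetical, simplified leader-side log state; not Kafka's real Log class. */
    static final class LeaderLog {
        long logStartOffset;
        long highWatermark;

        LeaderLog(long logStartOffset, long highWatermark) {
            this.logStartOffset = logStartOffset;
            this.highWatermark = highWatermark;
        }
    }

    /** Hypothetical fetch response carrying only the fields discussed in this report. */
    static final class FetchResponse {
        final long fetchedOffset;
        final long leaderLogStartOffset;
        final long leaderHighWatermark;

        FetchResponse(long fetchedOffset, long leaderLogStartOffset, long leaderHighWatermark) {
            this.fetchedOffset = fetchedOffset;
            this.leaderLogStartOffset = leaderLogStartOffset;
            this.leaderHighWatermark = leaderHighWatermark;
        }
    }

    static FetchResponse handleFollowerFetch(LeaderLog log, long fetchOffset, Runnable concurrentChange) {
        // Step 1: read records starting at fetchOffset (the records themselves are omitted).
        long fetchedOffset = fetchOffset;
        // Window in which deleteRecords() or segment deletion can advance the log start offset.
        concurrentChange.run();
        // Step 2: fill in log start offset and HW from the log's *current* state.
        return new FetchResponse(fetchedOffset, log.logStartOffset, log.highWatermark);
    }

    public static void main(String[] args) {
        LeaderLog log = new LeaderLog(116000L, 116800L); // assumed starting state
        FetchResponse response =
            handleFollowerFetch(log, 116617L, () -> log.logStartOffset = 116753L);
        System.out.println("fetched offset = " + response.fetchedOffset
            + ", leader log start offset = " + response.leaderLogStartOffset);
    }
}
```

With these values the response carries data fetched at offset 116617 alongside a leader log start offset of 116753, which is the combination seen in the logs above.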
When a follower receives such a fetch response, it first appends the records, then moves its HW no further than its LEO, which may be less than the leader's log start offset in the fetch response, and then calls `replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which throws OffsetOutOfRangeException, causing the fetcher thread to stop.
Note that this can happen only if the follower is not in the ISR; otherwise the leader will not move its log start offset beyond the follower's HW.
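The follower-side sequence can be reproduced with a small stand-alone Java model. It is a sketch under assumptions, not the real ReplicaFetcherThread: FollowerReplica, OffsetOutOfRange and processFetchResponse are hypothetical stand-ins, the two appended records are inferred from the HW of 116619 in the error message (assuming the HW was bounded by the LEO), and the follower's initial log start offset (116000) and the leader's HW (116800) are made-up values.

```java
public class FollowerFetchSketch {
    /** Stand-in for Kafka's OffsetOutOfRangeException so the sketch has no dependencies. */
    static final class OffsetOutOfRange extends RuntimeException {
        OffsetOutOfRange(String message) {
            super(message);
        }
    }

    /** Hypothetical, simplified follower replica state. */
    static final class FollowerReplica {
        long logStartOffset;
        long highWatermark;
        long logEndOffset;

        FollowerReplica(long logStartOffset, long highWatermark, long logEndOffset) {
            this.logStartOffset = logStartOffset;
            this.highWatermark = highWatermark;
            this.logEndOffset = logEndOffset;
        }

        void maybeIncrementLogStartOffset(long newLogStartOffset) {
            // Mirrors the check reported in the error message above.
            if (newLogStartOffset > highWatermark) {
                throw new OffsetOutOfRange("Cannot increment the log start offset to "
                    + newLogStartOffset + " since it is larger than the high watermark "
                    + highWatermark);
            }
            if (newLogStartOffset > logStartOffset) {
                logStartOffset = newLogStartOffset;
            }
        }
    }

    static void processFetchResponse(FollowerReplica replica, int appendedRecords,
                                     long leaderHighWatermark, long leaderLogStartOffset) {
        // 1. Append the fetched records, advancing the LEO.
        replica.logEndOffset += appendedRecords;
        // 2. Move the HW, but no further than the follower's own LEO.
        replica.highWatermark = Math.min(replica.logEndOffset, leaderHighWatermark);
        // 3. Adopt the leader's log start offset; this step fails if it exceeds the HW.
        replica.maybeIncrementLogStartOffset(leaderLogStartOffset);
    }

    public static void main(String[] args) {
        FollowerReplica replica = new FollowerReplica(116000L, 116617L, 116617L);
        try {
            processFetchResponse(replica, 2, 116800L, 116753L);
        } catch (OffsetOutOfRange e) {
            System.out.println("Fetcher would stop here: " + e.getMessage());
        }
    }
}
```

In this model the append and the HW update both succeed (HW ends at 116619), and only the final log start offset update throws.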
Suggested fix:
1) Since ReplicaFetcher bounds the follower's HW to the follower's LEO, we should also bound the follower's log start offset to its LEO. In this situation, the follower's log start offset will be updated to its LEO.
2) In addition to #1, we could try to make sure that the leader builds the fetch response based on the state of the log as of the time the data was read from the replica (while still moving the leader's HW based on the follower's fetch). That could potentially be another JIRA, since the fix could be more involved.
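A minimal Java sketch of suggestion 1, assuming the bound is applied just before the follower updates its log start offset. The helper name boundedLogStartOffset is hypothetical and is not an existing Kafka method; the numbers are taken from the logs above, with the LEO of 116619 inferred from the reported HW.

```java
public class BoundedLogStartOffsetSketch {
    /**
     * Hypothetical helper: never let the follower's log start offset
     * move past the follower's own log end offset.
     */
    static long boundedLogStartOffset(long leaderLogStartOffset, long followerLogEndOffset) {
        return Math.min(leaderLogStartOffset, followerLogEndOffset);
    }

    public static void main(String[] args) {
        long followerLogEndOffset = 116619L;   // LEO after the append (inferred from the logged HW)
        long followerHighWatermark = 116619L;  // HW already bounded by the LEO
        long leaderLogStartOffset = 116753L;   // value carried in the fetch response

        long newLogStartOffset = boundedLogStartOffset(leaderLogStartOffset, followerLogEndOffset);

        // The bounded value no longer exceeds the HW, so the range check would pass.
        System.out.println("new log start offset = " + newLogStartOffset
            + ", within HW: " + (newLogStartOffset <= followerHighWatermark));
    }
}
```

Because the HW is itself bounded by the LEO, a log start offset bounded the same way cannot exceed the HW in this scenario, so the check that raised OffsetOutOfRangeException would not fire.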