Description
We check the `active` set is empty during closeLostTasks(). However we don't currently properly clear the restoredPartitions set in some edge cases:
We only remove partitions from restoredPartitions when a) all tasks are done restoring, at which point we clear it entirely (in AssignedStreamTasks#updateRestored), or b) one task at a time, when that task is restoring and is closed.
Say some tasks were still restoring while others had completed and transitioned to running when a rebalance occurs. The still-restoring tasks are all revoked and closed immediately, and their partitions are removed from restoredPartitions. We also suspend & revoke some running tasks that have finished restoring, and remove them from running/runningByPartition.
Now we have only running tasks left, so in TaskManager#updateNewAndRestoringTasks we don’t ever even call {{AssignedStreamTasks#updateRestored}} and therefore we never get to clear {{restoredPartitions}}. We then close each of the currently running tasks and remove their partitions from everything, BUT we never got to remove or clear the partitions of the running tasks that we revoked previously.
It turns out we can't just rely on removing from {{restoredPartitions}} upon completion, since the partitions will just be added back to it during the next loop (blocked by KAFKA-9177). For now, we should just remove partitions from {{restoredPartitions}} when closing or suspending running tasks as well.
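To make the leak and the proposed cleanup concrete, here is a minimal, self-contained sketch. It is not the real AssignedStreamTasks code: the class RestoredPartitionsSketch and all of its method names are hypothetical, and it assumes (as a simplification) that a task's partitions are added to restoredPartitions at the moment it transitions to running.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping described above.
// It only mirrors the two collections relevant to this bug.
class RestoredPartitionsSketch {
    // taskId -> partitions owned by that running task
    final Map<String, Set<String>> running = new HashMap<>();
    final Set<String> restoredPartitions = new HashSet<>();

    // Assumption: a task that finishes restoring leaves its partitions
    // in restoredPartitions when it becomes a running task.
    void transitionToRunning(String taskId, String... partitions) {
        Set<String> owned = new HashSet<>();
        for (String partition : partitions) {
            owned.add(partition);
        }
        running.put(taskId, owned);
        restoredPartitions.addAll(owned);
    }

    // Closing a running task removes its partitions from everything.
    void closeRunningTask(String taskId) {
        Set<String> owned = running.remove(taskId);
        if (owned != null) {
            restoredPartitions.removeAll(owned);
        }
    }

    // Behavior described in this ticket: suspend & revoke only cleans up
    // the running bookkeeping, so restoredPartitions keeps stale entries.
    void suspendAndRevokeCurrent(String taskId) {
        running.remove(taskId);
    }

    // Proposed behavior: also drop the task's partitions from
    // restoredPartitions when a running task is suspended.
    void suspendAndRevokeProposed(String taskId) {
        Set<String> owned = running.remove(taskId);
        if (owned != null) {
            restoredPartitions.removeAll(owned);
        }
    }

    // Stand-in for the emptiness check performed when closing lost tasks.
    boolean isFullyCleared() {
        return running.isEmpty() && restoredPartitions.isEmpty();
    }
}
```

In this model, moving two tasks to running, revoking one with suspendAndRevokeCurrent and then closing the other leaves the revoked task's partition behind in restoredPartitions, so isFullyCleared() returns false. The same sequence with suspendAndRevokeProposed leaves both collections empty.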