Details
Description
When stages are retried due to shuffle failures, task attempt numbers are reused. This causes a correctness bug in the v2 data sources write path.
Data sources (both the original and v2) pass the task attempt to writers so that writers can use the attempt number to track and clean up data from failed or speculative attempts. In the v2 docs for DataWriterFactory, the attempt number's javadoc states that "Implementations can use this attempt number to distinguish writers of different task attempts."
When two attempts of a stage use the same (partition, attempt) pair, two tasks can create the same data and attempt to commit. The commit coordinator prevents both from committing and will abort the attempt that finishes last. When using the (partition, attempt) pair to track data, the aborted task may delete data associated with the (partition, attempt) pair. If that happens, the data for the task that committed is also deleted as well, which is a correctness bug.
For a concrete example, I have a data source that creates files in place named with part-<partition><attempt><uuid>.<format>. Because these files are written in place, both tasks create the same file and the one that is aborted deletes the file, leading to data corruption when the file is added to the table.