Details
- Bug
- Status: Resolved
- Major
- Resolution: Fixed
- 2.6.0, 3.2.0, 3.7.0
- None
- None
Description
The ErrantRecordReporter introduced in KIP-610 (2.6.0) allows sink connectors to push records to the connector DLQ topic. The implementation of this reporter interacts with the ProcessingContext within the per-task RetryWithToleranceOperator. The ProcessingContext stores mutable state about the current operation, such as what error has occurred or what record is being operated on.
The ProcessingContext and RetryWithToleranceOperator are also used by the converter and transformation pipeline of the connector for similar reasons. When ErrantRecordReporter#report is called from within SinkTask#put, there is no contention over the mutable state, because the thread that calls SinkTask#put is also responsible for converting and transforming the record. However, if ErrantRecordReporter#report is called by an extra thread within the SinkTask, there is thread contention on the single mutable ProcessingContext.
This was noticed in https://issues.apache.org/jira/browse/KAFKA-10602, and the synchronized keyword was added to all methods of RetryWithToleranceOperator (RWTO) that interact with the ProcessingContext. However, this solution still allows calls to the RWTO methods from different threads to interleave and produce unintended race conditions. Consider the following interleaving:
1. Thread 1 converts and transforms record A successfully.
2. Thread 1 calls SinkTask#put(A) and delivers the message to the task.
3. Thread 1 schedules another thread (thread 2) to call ErrantRecordReporter#report(A) after some delay.
4. Thread 1 returns from SinkTask#put and polls record B from the consumer.
5. Thread 1 calls RWTO#execute for a converter or transformation operation on record B, for example converting headers.
6. The operation succeeds, and the ProcessingContext is left with error == null, or equivalently failed() == false.
7. Thread 2's delay expires, and it calls ErrantRecordReporter#report.
8. Thread 2 uses the WorkerErrantRecordReporter implementation, which calls RWTO#executeFailed and returns.
9. The operation leaves ProcessingContext with error != null, or equivalently failed() == true.
10. Thread 1 then resumes execution and calls RWTO#failed, which evaluates to true.
11. Thread 1 then drops Record B, even though the header conversion succeeded without error.
12. Record B is never delivered to the Sink Task, and never delivered to the error reporter for processing, despite having produced no error during processing.
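The interleaving above can be reproduced with a simplified model. The sketch below is not the Kafka Connect code: SharedOperator is a hypothetical stand-in for RetryWithToleranceOperator that keeps only the shared error field, and the two latches are an assumption used to force steps 5 through 10 to occur in the listed order. Every method is synchronized, yet the check for record B still observes the error reported for record A.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for RetryWithToleranceOperator (not the real class).
class SharedOperator {
    private Throwable error; // models the single mutable ProcessingContext state

    // Models RWTO#execute for an operation that succeeds: the error is cleared.
    synchronized void executeSucceeded() { error = null; }

    // Models RWTO#executeFailed as called on behalf of the errant record reporter.
    synchronized void executeFailed(Throwable t) { error = t; }

    // Models RWTO#failed.
    synchronized boolean failed() { return error != null; }
}

class InterleavingSketch {
    // Returns true if record B is wrongly observed as failed.
    static boolean recordBLooksFailed() {
        SharedOperator rwto = new SharedOperator();
        CountDownLatch bConverted = new CountDownLatch(1);
        CountDownLatch aReported = new CountDownLatch(1);

        // Thread 2: reports record A only after record B has been converted.
        Thread reporter = new Thread(() -> {
            try {
                bConverted.await();
                rwto.executeFailed(new RuntimeException("record A rejected")); // steps 7-9
                aReported.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reporter.start();

        try {
            rwto.executeSucceeded();        // steps 5-6: header conversion of B succeeds
            bConverted.countDown();
            aReported.await();              // thread 1 is paused while thread 2 reports A
            boolean failed = rwto.failed(); // step 10: thread 1 checks the shared state
            reporter.join();
            return failed;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("record B treated as failed: " + recordBLooksFailed());
    }
}
```

In this model the check in step 10 returns true even though the operation on record B succeeded, which corresponds to record B being dropped in step 11.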
Synchronizing each method individually, so that results and errors are obtained through separate calls, is insufficient. Either the data sharing should be avoided, or a different locking mechanism should be used.
A similar flaw exists in source connectors for asynchronous errors reported by the producer, and was introduced in KIP-779 (3.2.0).
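One way to picture the "avoid data sharing" option is to give every operation its own context object instead of a single per-task one. The following is a minimal sketch under that assumption and is not necessarily how the issue was resolved in Kafka Connect; OperationContext and IsolatedOperator are hypothetical names introduced only for illustration.

```java
import java.util.function.Supplier;

// Hypothetical per-operation context: created by the caller and never shared across threads.
final class OperationContext {
    private Throwable error;

    void error(Throwable t) { error = t; }

    boolean failed() { return error != null; }
}

// Hypothetical operator that holds no mutable state of its own.
class IsolatedOperator {
    // Runs an operation and records any failure in the caller-owned context.
    <T> T execute(OperationContext ctx, Supplier<T> operation) {
        try {
            return operation.get();
        } catch (RuntimeException e) {
            ctx.error(e);
            return null;
        }
    }

    // Records an externally reported failure in the caller-owned context.
    void executeFailed(OperationContext ctx, Throwable t) {
        ctx.error(t);
    }
}

class IsolationSketch {
    // Returns true if record B is observed as failed; with isolated contexts it should not be.
    static boolean recordBLooksFailed() {
        IsolatedOperator rwto = new IsolatedOperator();

        // Thread 1: convert record B using a context owned by this operation only.
        OperationContext ctxB = new OperationContext();
        rwto.execute(ctxB, () -> "converted headers of B");

        // Thread 2: report record A against its own, separate context.
        Thread reporter = new Thread(() -> {
            OperationContext ctxA = new OperationContext();
            rwto.executeFailed(ctxA, new RuntimeException("record A rejected"));
        });
        reporter.start();
        try {
            reporter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }

        // The late report for A cannot affect the outcome recorded for B.
        return ctxB.failed();
    }

    public static void main(String[] args) {
        System.out.println("record B treated as failed: " + recordBLooksFailed());
    }
}
```

Because each context is confined to the thread that created it, no locking is needed in this sketch. The alternative named in the text, a different locking mechanism, would instead hold one lock across the whole execute-then-check sequence.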
Issue Links
- is related to: KAFKA-10602 DLQ Reporter throws NPE when reporting from different thread (Resolved)
- links to