Details
- Bug
- Status: Closed
- Major
- Resolution: Fixed
- 1.11.0, 1.12.0
- None
Description
Add the following test in the RetractableTopNFunctionTest.
    @Test
    public void testCornerCase2() throws Exception {
        AbstractTopNFunction func =
                createFunction(RankType.ROW_NUMBER, new ConstantRankRange(1, 2), false, false);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(func);
        testHarness.open();
        testHarness.processElement(insertRecord("a", 1L, 1));
        testHarness.processElement(insertRecord("a", 2L, 2));
        testHarness.processElement(insertRecord("a", 3L, 2));
        testHarness.processElement(insertRecord("a", 4L, 4));
        testHarness.processElement(insertRecord("a", 5L, 4));
        testHarness.processElement(deleteRecord("a", 4L, 4));
        testHarness.processElement(deleteRecord("a", 1L, 1));
        testHarness.processElement(deleteRecord("a", 2L, 2));
        testHarness.close();

        List<Object> expectedOutput = new ArrayList<>();
        expectedOutput.add(insertRecord("a", 1L, 1));
        expectedOutput.add(insertRecord("a", 2L, 2));
        expectedOutput.add(deleteRecord("a", 1L, 1));
        expectedOutput.add(insertRecord("a", 3L, 2));
        expectedOutput.add(deleteRecord("a", 2L, 2));
        expectedOutput.add(insertRecord("a", 5L, 4));
        assertorWithRowNumber.assertOutputEquals(
                "output wrong.", expectedOutput, testHarness.getOutput());
    }
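When the operator receives a delete message, it only deletes the record if the record's current rank is within the rank range. A record that is deleted while outside the range therefore stays in the operator. If we then keep deleting records inside the range, the operator will send that undeleted record to the sink.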
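To make the failure mode easier to follow, here is a minimal standalone sketch that replays the same insert/delete sequence against a simplified top-2 model. It is not Flink's RetractableTopNFunction: the class TopNRetractSketch, the Rec type, the "+I"/"-D" output strings, and the dropOutOfRangeDeletes switch are all hypothetical names introduced for illustration, and the buggy branch only models the behavior as described above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a retractable top-N operator (not Flink code).
class TopNRetractSketch {
    // Hypothetical record: an id plus the sort key used for ranking.
    static final class Rec {
        final long id;
        final int sortKey;
        Rec(long id, int sortKey) { this.id = id; this.sortKey = sortKey; }
    }

    private final int topN;
    // Assumption: when true, deletes of records ranked outside the range are ignored.
    private final boolean dropOutOfRangeDeletes;
    // Kept sorted by sortKey; ties keep arrival order.
    private final List<Rec> state = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    TopNRetractSketch(int topN, boolean dropOutOfRangeDeletes) {
        this.topN = topN;
        this.dropOutOfRangeDeletes = dropOutOfRangeDeletes;
    }

    void insert(long id, int sortKey) {
        int pos = 0;
        while (pos < state.size() && state.get(pos).sortKey <= sortKey) {
            pos++;
        }
        state.add(pos, new Rec(id, sortKey));
        if (pos < topN) {
            // The record previously at rank topN is pushed out of the range.
            if (state.size() > topN) {
                output.add("-D " + state.get(topN).id);
            }
            output.add("+I " + id);
        }
    }

    void delete(long id, int sortKey) {
        int pos = -1;
        for (int i = 0; i < state.size(); i++) {
            Rec r = state.get(i);
            if (r.id == id && r.sortKey == sortKey) { pos = i; break; }
        }
        if (pos < 0) {
            return; // unknown record, nothing to retract
        }
        boolean inRange = pos < topN;
        if (!inRange && dropOutOfRangeDeletes) {
            return; // modelled bug: the record stays in state
        }
        state.remove(pos);
        if (inRange) {
            output.add("-D " + id);
            // The record that moves up into rank topN becomes visible.
            if (state.size() >= topN) {
                output.add("+I " + state.get(topN - 1).id);
            }
        }
    }

    // Replays the sequence from testCornerCase2 with a rank range of 1..2.
    static List<String> replay(boolean dropOutOfRangeDeletes) {
        TopNRetractSketch op = new TopNRetractSketch(2, dropOutOfRangeDeletes);
        op.insert(1L, 1);
        op.insert(2L, 2);
        op.insert(3L, 2);
        op.insert(4L, 4);
        op.insert(5L, 4);
        op.delete(4L, 4);
        op.delete(1L, 1);
        op.delete(2L, 2);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println("drop out-of-range deletes: " + replay(true));
        System.out.println("always update state:       " + replay(false));
    }
}
```

In this model, ignoring the out-of-range delete makes the last emitted record "+I 4", even though record 4 was deleted earlier. Removing the record from state regardless of its rank yields "+I 5" instead, which matches the expectedOutput in the test above.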
Issue Links
- is related to FLINK-20121 Test the upsert-kafka Connector (Closed)