Details

    Description

      org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
          java.lang.AssertionError: 
          Expected: <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), KeyValue(2986681642075, 1), KeyValue(2986681642035, 1), KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), KeyValue(2986681642115, 1), KeyValue(2986681642075, 1), KeyValue(2986681642075, 2), KeyValue(2986681642095, 2), KeyValue(2986681642115, 1), KeyValue(2986681642135, 1), KeyValue(2986681642095, 2), KeyValue(2986681642115, 2), KeyValue(2986681642155, 1), KeyValue(2986681642135, 1), KeyValue(2986681642115, 2), KeyValue(2986681642135, 2), KeyValue(2986681642155, 1), KeyValue(2986681642175, 1), KeyValue(2986681642135, 2), KeyValue(2986681642155, 2), KeyValue(2986681642175, 1), KeyValue(2986681642195, 1), KeyValue(2986681642135, 3), KeyValue(2986681642155, 2), KeyValue(2986681642175, 2), KeyValue(2986681642195, 1), KeyValue(2986681642155, 3), KeyValue(2986681642175, 2), KeyValue(2986681642195, 2), KeyValue(2986681642155, 3), KeyValue(2986681642175, 3), KeyValue(2986681642195, 2), KeyValue(2986681642155, 4), KeyValue(2986681642175, 3), KeyValue(2986681642195, 3)]>
               but: was <[KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), KeyValue(2986681642075, 1), KeyValue(2986681642035, 1), KeyValue(2986681642095, 1), KeyValue(2986681642055, 1), KeyValue(2986681642115, 1), KeyValue(2986681642075, 1), KeyValue(2986681642075, 2), KeyValue(2986681642095, 2), KeyValue(2986681642115, 1), KeyValue(2986681642135, 1), KeyValue(2986681642095, 2), KeyValue(2986681642115, 2), KeyValue(2986681642155, 1), KeyValue(2986681642135, 1), KeyValue(2986681642115, 2), KeyValue(2986681642135, 2), KeyValue(2986681642155, 1), KeyValue(2986681642175, 1), KeyValue(2986681642135, 2), KeyValue(2986681642155, 2), KeyValue(2986681642175, 1), KeyValue(2986681642195, 1), KeyValue(2986681642135, 3), KeyValue(2986681642155, 2), KeyValue(2986681642175, 2), KeyValue(2986681642195, 1), KeyValue(2986681642155, 3), KeyValue(2986681642175, 2), KeyValue(2986681642195, 2), KeyValue(2986681642155, 3)]>
              at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
              at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
              at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:190)
      

          Activity

            githubbot ASF GitHub Bot added a comment -

            GitHub user mjsax opened a pull request:

            https://github.com/apache/kafka/pull/2931

            KAFKA-5140: Flaky ResetIntegrationTest

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/mjsax/kafka kafka-5140-flaky-reset-integration-test

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/kafka/pull/2931.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #2931


            commit 62b9bd6e72f28c78f6cd0f7f5d72ad38e97065e6
            Author: Matthias J. Sax <matthias@confluent.io>
            Date: 2017-04-28T03:57:26Z

            KAFKA-5140: Flaky ResetIntegrationTest


            junrao Jun Rao added a comment -

            Saw the following transient failures in a recent PR builder run: https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/8779/consoleFull

            org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
            java.lang.AssertionError: Condition not met within timeout 30000. Expecting 10 records from topic outputTopic while only received 0: []
            at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
            at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:160)
            at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:128)
            at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(AbstractResetIntegrationTest.java:289)
            at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:63)

            org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED

            org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic FAILED
            java.lang.AssertionError:
            Expected: <[outputTopic, outputTopic2_rerun, inputTopic, outputTopic2, __consumer_offsets]>
            but: was <[outputTopic, outputTopic2_rerun, userTopic, outputTopic2, inputTopic, __consumer_offsets]>
            at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
            at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
            at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.assertInternalTopicsGotDeleted(AbstractResetIntegrationTest.java:470)
            at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(AbstractResetIntegrationTest.java:197)
            at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic(ResetIntegrationTest.java:68)

            guozhang Guozhang Wang added a comment -

            Looking into this issue.

            githubbot ASF GitHub Bot added a comment -

            GitHub user guozhangwang opened a pull request:

            https://github.com/apache/kafka/pull/4095

            KAFKA-5140: Fix reset integration test

            MockTime was incorrectly shared across multiple test methods within the class, as a class rule. Instead, we now set it in each test case; this change also removes the Scala MockTime dependency.
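            To illustrate why sharing a mock clock across test methods can be a problem, here is a minimal sketch. FakeClock and runTest are hypothetical names made up for this example; they are not Kafka's MockTime or the code in this pull request. The sketch only shows the general hazard: time advanced by one test is still visible to the next test when the clock instance is shared.

```java
import java.util.concurrent.atomic.AtomicLong;

public class MockTimeSharingSketch {

    /** Hypothetical stand-in for a mock clock; NOT Kafka's MockTime. */
    static final class FakeClock {
        private final AtomicLong nowMs;

        FakeClock(long startMs) {
            this.nowMs = new AtomicLong(startMs);
        }

        long milliseconds() {
            return nowMs.get();
        }

        /** Advances the fake time instead of actually sleeping. */
        void sleep(long ms) {
            nowMs.addAndGet(ms);
        }
    }

    /**
     * Simulates one test method: records the time it sees at start,
     * then advances the clock by one second.
     */
    static long runTest(FakeClock clock) {
        long observedStart = clock.milliseconds();
        clock.sleep(1_000L);
        return observedStart;
    }

    public static void main(String[] args) {
        // Shared instance: the second "test" starts with time left over from the first.
        FakeClock shared = new FakeClock(0L);
        long first = runTest(shared);
        long second = runTest(shared);
        System.out.println("shared: " + first + ", " + second);

        // One instance per test: each "test" starts from the same known time.
        long a = runTest(new FakeClock(0L));
        long b = runTest(new FakeClock(0L));
        System.out.println("fresh: " + a + ", " + b);
    }
}
```

            With a shared instance the second run starts at 1000 ms rather than 0, so any assertion that depends on timestamps can vary with test execution order. Creating the clock per test case removes that coupling.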

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/guozhangwang/kafka KMinor-reset-integration-test

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/kafka/pull/4095.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #4095


            commit 1aac7bec94de5b2d26d72bade9e654192f18d576
            Author: Guozhang Wang <wangguoz@gmail.com>
            Date: 2017-10-12T21:18:46Z

            dummy

            commit 83f176c26dd092f924e2d273a4fa763ce5d7cdcf
            Author: Guozhang Wang <wangguoz@gmail.com>
            Date: 2017-10-19T04:44:18Z

            fix issues


            guozhang Guozhang Wang added a comment -

            The root cause of this flaky test is https://issues.apache.org/jira/browse/KAFKA-6098

            More specifically, here is the (augmented) log trail that exposes the issue:

            1. After the reset tool is called, assertInternalTopicsGotDeleted(INTERMEDIATE_USER_TOPIC) passes, but the ZK path has not yet been deleted due to the above issue. The streams app then resumes executing, and in the first rebalance the StreamPartitionAssignor tries to create the deleted topic again:

            org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic STANDARD_OUT
                [2017-10-19 18:28:12,027] INFO [GroupCoordinator 0]: Preparing to rebalance group cleanup-integration-test1 with old generation 4 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
                [2017-10-19 18:28:12,028] INFO [GroupCoordinator 0]: Stabilized group cleanup-integration-test1 generation 5 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
                [2017-10-19 18:28:12,029] WARN stream-thread [cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10] Constructed client metadata {e86cdf4e-781a-408a-8414-1115d9558914=ClientMetadata{hostInfo=null, consumers=[cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10-consumer-bd575ab5-c159-4c8a-9130-1ac896c23595], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}} from the member subscriptions. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:241)
                [2017-10-19 18:28:12,029] WARN stream-thread [cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10] Starting to validate internal topics in partition assignor. (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:236)
            

            2. The following entry then appears on the broker side:

                [2017-10-19 18:28:12,280] INFO [Admin Manager on Broker 0]: Error processing create topic request for topic cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition with arguments (numPartitions=1, replicationFactor=1, replicasAssignments={}, configs={cleanup.policy=delete}) (kafka.server.AdminManager:80)
                org.apache.kafka.common.errors.TopicExistsException: Topic 'cleanup-integration-test1-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition' already exists.
            

            3. StreamsKafkaClient treats the error code as OK and moves on to the validation phase, which then blocks forever:

                        // wait until each one of the topic metadata has been propagated to at least one broker
                        while (!allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
                            try {
                                Thread.sleep(50L);
                            } catch (InterruptedException e) {
                                // ignore
                            }
                        }
            

            4. After 2 seconds (the session timeout value), the stream consumer is kicked out of the group because it is still blocked in the above phase:

            org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic STANDARD_OUT
                [2017-10-19 18:28:14,030] INFO [GroupCoordinator 0]: Member cleanup-integration-test1-e86cdf4e-781a-408a-8414-1115d9558914-StreamThread-10-consumer-bd575ab5-c159-4c8a-9130-1ac896c23595 in group cleanup-integration-test1 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator:72)
                [2017-10-19 18:28:14,032] INFO [GroupCoordinator 0]: Preparing to rebalance group cleanup-integration-test1 with old generation 5 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
                [2017-10-19 18:28:14,032] INFO [GroupCoordinator 0]: Group cleanup-integration-test1 with generation 6 is now empty (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator:72)
            

            The test then fails once its 30-second wait for output records times out.
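            The unbounded loop in step 3 is what turns a stale topic into a hang. As a rough sketch only, the same polling could be bounded by a deadline so that the caller gets an error instead of blocking indefinitely. The class BoundedWaitSketch, the method waitUntil, and its parameters are hypothetical and are not part of Kafka or of the fix in pull request 4095.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class BoundedWaitSketch {

    /**
     * Polls the condition every pollMs milliseconds until it holds,
     * or throws TimeoutException once timeoutMs has elapsed.
     * Hypothetical helper for illustration; not a Kafka API.
     */
    static void waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeoutMs + " ms");
            }
            // Interruption is propagated to the caller rather than swallowed.
            Thread.sleep(pollMs);
        }
    }

    public static void main(String[] args) throws Exception {
        // A condition that never becomes true, standing in for topic metadata
        // that is never propagated because the topic is stuck in deletion.
        try {
            waitUntil(() -> false, 200L, 50L);
            System.out.println("condition met");
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

            Two choices in this sketch differ from the quoted loop: the wait has an upper bound, and InterruptedException is rethrown so that a shutdown request can still stop the thread.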

            githubbot ASF GitHub Bot added a comment -

            Github user asfgit closed the pull request at:

            https://github.com/apache/kafka/pull/4095

            guozhang Guozhang Wang added a comment -

            Issue resolved by pull request 4095
            https://github.com/apache/kafka/pull/4095


            People

              guozhang Guozhang Wang
              mjsax Matthias J. Sax