Details
- Type: Bug
- Status: Open
- Priority: Major
- Resolution: Unresolved
- 0.12.0
- None
- None
Description
On my Mac build, running TestStreamProcessor hangs due to the following issue:
01:35:13.680 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_OUT
01:35:13.680 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.678 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$TopicChangeListener [INFO] [TopicChangeListener on Controller 0]: New topics: [Set()], deleted topics: [Set()], new partition replica assignment [Map()]
01:35:13.683 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.683 [ZkClient-EventThread-28-127.0.0.1:53690] ZookeeperLeaderElector$LeaderChangeListener [INFO] New leader is 0
01:35:13.686 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.686 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/numbers
01:35:13.691 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.690 [ZkClient-EventThread-28-127.0.0.1:53690] PartitionStateMachine$PartitionModificationsListener [INFO] [AddPartitionsListener on 0]: Partition modification triggered {"version":1,"partitions":{"0":[0]}} for path /brokers/topics/output
01:35:13.693 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.693 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on Controller 0]: Broker change listener fired for path /brokers/ids with children 0
01:35:13.697 [DEBUG] [TestEventLogger] 2017-02-15 01:35:13.696 [ZkClient-EventThread-28-127.0.0.1:53690] ReplicaStateMachine$BrokerChangeListener [INFO] [BrokerChangeListener on Controller 0]: Newly added brokers: , deleted brokers: , all live brokers: 0
01:35:17.678 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.677 [Test worker] VerifiableProperties [INFO] Verifying properties
01:35:17.678 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property client.id is overridden to samza_admin-test_job-1
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property metadata.broker.list is overridden to :53693
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] VerifiableProperties [INFO] Property request.timeout.ms is overridden to 30000
01:35:17.679 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.678 [Test worker] ClientUtils$ [INFO] Fetching metadata from broker BrokerEndPoint(0,,53693) with correlation id 0 for 1 topic(s) Set(numbers)
01:35:17.680 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Connected to :53693 for producing
01:35:17.681 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.683 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.679 [Test worker] ClientUtils$ [WARN] Fetching topic metadata with correlation id 0 for topics [Set(numbers)] from broker [BrokerEndPoint(0,,53693)] failed
01:35:17.683 [DEBUG] [TestEventLogger] java.nio.channels.ClosedChannelException
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
01:35:17.683 [DEBUG] [TestEventLogger] at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
01:35:17.683 [DEBUG] [TestEventLogger] at org.apache.samza.util.ClientUtilTopicMetadataStore.getTopicInfo(ClientUtilTopicMetadataStore.scala:37)
01:35:17.683 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin.getTopicMetadata(KafkaSystemAdmin.scala:373)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2$$anonfun$6.apply(KafkaSystemAdmin.scala:158)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.TopicMetadataCache$.getTopicMetadata(TopicMetadataCache.scala:52)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:155)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$getSystemStreamPartitionCounts$2.apply(KafkaSystemAdmin.scala:154)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:153)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.kafka.KafkaSystemAdmin.getSystemStreamPartitionCounts(KafkaSystemAdmin.scala:147)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:67)
01:35:17.684 [DEBUG] [TestEventLogger] at org.apache.samza.system.StreamMetadataCache$$anonfun$3.apply(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
01:35:17.685 [DEBUG] [TestEventLogger] at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.system.StreamMetadataCache.getStreamMetadata(StreamMetadataCache.scala:62)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.getInputStreamPartitions(JobModelManager.scala:143)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.getMatchedInputStreamPartitions(JobModelManager.scala:154)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobModelManager.scala:193)
01:35:17.685 [DEBUG] [TestEventLogger] at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobModelManager.scala:125)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.standalone.StandaloneJobCoordinator.<init>(StandaloneJobCoordinator.java:108)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.standalone.StandaloneJobCoordinatorFactory.getJobCoordinator(StandaloneJobCoordinatorFactory.java:29)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:134)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.processor.StreamProcessor.<init>(StreamProcessor.java:111)
01:35:17.686 [DEBUG] [TestEventLogger] at org.apache.samza.test.processor.TestStreamProcessor.testStreamProcessor(TestStreamProcessor.java:72)
01:35:17.686 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.686 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.686 [DEBUG] [TestEventLogger] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.686 [DEBUG] [TestEventLogger] at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.686 [DEBUG] [TestEventLogger] at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
01:35:17.686 [DEBUG] [TestEventLogger] at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
01:35:17.687 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
01:35:17.688 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
01:35:17.688 [DEBUG] [TestEventLogger] at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
01:35:17.688 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
01:35:17.688 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.688 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.688 [DEBUG] [TestEventLogger] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.689 [DEBUG] [TestEventLogger] at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
01:35:17.689 [DEBUG] [TestEventLogger] at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
01:35:17.689 [DEBUG] [TestEventLogger] at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
01:35:17.689 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
01:35:17.690 [DEBUG] [TestEventLogger] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
01:35:17.690 [DEBUG] [TestEventLogger] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
01:35:17.690 [DEBUG] [TestEventLogger] at java.lang.reflect.Method.invoke(Method.java:483)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
01:35:17.690 [DEBUG] [TestEventLogger] at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
01:35:17.690 [DEBUG] [TestEventLogger] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
01:35:17.691 [DEBUG] [TestEventLogger] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
01:35:17.691 [DEBUG] [TestEventLogger] at java.lang.Thread.run(Thread.java:745)
01:35:17.691 [DEBUG] [TestEventLogger] 2017-02-15 01:35:17.681 [Test worker] SyncProducer [INFO] Disconnecting from :53693
01:35:17.691 [DEBUG] [TestEventLogger]
01:35:17.691 [DEBUG] [TestEventLogger] org.apache.samza.test.processor.TestStreamProcessor > testStreamProcessor STANDARD_ERROR
01:35:17.691 [DEBUG] [TestEventLogger] 113333 [Test worker] WARN org.apache.samza.system.kafka.KafkaSystemAdmin - Unable to fetch last offsets for streams [numbers] due to kafka.common.KafkaException: fetching topic metadata for topics [Set(numbers)] from broker [ArrayBuffer(BrokerEndPoint(0,,53693))] failed. Retrying.
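It turns out that job initialization fails while fetching topic metadata for the input topics. The fetch keeps retrying because the connection to the broker fails, so the test never gets past initialization; at that moment, the broker cannot be reached. The log above shows metadata.broker.list overridden to :53693, i.e. a broker address with an empty host.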
Note that this only happens on Mac, and only when running the test via Gradle:
./gradlew clean :samza-test:build -Dtest.single=TestStreamProcessor --debug
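To illustrate why the symptom is a hang rather than a test failure: the stack trace shows the metadata fetch running inside ExponentialSleepStrategy.run, and the last log line ends with "Retrying.", so an unreachable broker turns into a retry loop with no visible end. The sketch below is not Samza code; BoundedRetry and its parameters are hypothetical names used only to show what a retry loop with an attempt cap looks like, so that a permanently failing call surfaces as an exception instead of blocking forever.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not part of Samza or Kafka): retries an action with
 * exponential backoff, but gives up after a fixed number of attempts.
 */
final class BoundedRetry {
    private BoundedRetry() {}

    static <T> T retry(Callable<T> action, int maxAttempts,
                       long initialDelayMs, long maxDelayMs)
            throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not swallow interruption; let the caller handle it.
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    // Back off before the next attempt, doubling up to the cap.
                    Thread.sleep(delayMs);
                    delayMs = Math.min(delayMs * 2, maxDelayMs);
                }
            }
        }
        // Every attempt failed: report it instead of looping forever.
        throw new IllegalStateException(
                "Giving up after " + maxAttempts + " attempts", last);
    }
}
```

With a cap like this, a call such as BoundedRetry.retry(fetchMetadata, 5, 100L, 2000L), where fetchMetadata is a placeholder Callable for the metadata request, would throw after the fifth failed attempt, and the test would fail with the underlying ClosedChannelException as the cause.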
Issue Links
- is depended upon by: SAMZA-1190 Flaky unit tests (Open)