Description
@Test
public void testPollWithAllBootstrapServersDown() throws Exception {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        final long pollTimeout = 1000;
        final AtomicBoolean pollComplete = new AtomicBoolean();
        // Run the consumer on a separate thread so the test can check whether poll() returns.
        executor.submit(new Runnable() {
            @Override
            public void run() {
                Properties props = new Properties();
                // Bootstrap address at which no broker is expected to be running.
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
                try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(props)) {
                    consumer.subscribe(Arrays.asList(topic));
                    try {
                        consumer.poll(pollTimeout);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    } finally {
                        pollComplete.set(true);
                    }
                }
            }
        });
        // Wait twice the poll timeout; poll() should have returned by then.
        Thread.sleep(pollTimeout * 2);
        Assert.assertTrue("poll timeout not work when all servers down", pollComplete.get());
    } finally {
        executor.shutdown();
    }
}
Issue Links
- duplicates: KAFKA-5697 StreamThread.shutdown() need to interrupt the stream threads to break the loop (Resolved)
- is related to: KAFKA-5065 AbstractCoordinator.ensureCoordinatorReady() stuck in loop if absent any bootstrap servers (Resolved)