Description
While updating Kafka Streams to stop using the deprecated Consumer.poll(long), I found that this code unexpectedly throws an exception:
    consumer.subscribe(topics);

    // consumer.poll(0); <- I've removed this line, which shouldn't be necessary here.

    final Set<TopicPartition> partitions = new HashSet<>();
    for (final String topic : topics) {
        for (final PartitionInfo partition : consumer.partitionsFor(topic)) {
            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
    }

    for (final TopicPartition tp : partitions) {
        final long offset = consumer.position(tp);
        committedOffsets.put(tp, offset);
    }
Here is the exception:
    Exception in thread "main" java.lang.IllegalStateException: You can only check the position for partitions assigned to this consumer.
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1620)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1586)
        at org.apache.kafka.streams.tests.EosTestDriver.getCommittedOffsets(EosTestDriver.java:275)
        at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:148)
        at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:69)
As you can see in the commented code in my snippet, we used to block for assignment with a poll(0), which is now deprecated.
It seems reasonable to me for position() to do the same thing that poll() does, which is call `coordinator.poll(timeout.toMillis())` early in processing to ensure an up-to-date assignment.
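To make the proposal concrete, here is a minimal toy model of the two behaviors. It is not the Kafka client: `ToyConsumer`, `refreshAssignment()`, and the `refreshInPosition` flag are hypothetical names invented for illustration, partitions are plain strings, and the "assignment" is simply a copy of the subscription, whereas a real group coordinator may assign only a subset of partitions and may block while doing so.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: this is NOT KafkaConsumer. All names are hypothetical.
final class ToyConsumer {
    // When true, position() refreshes the assignment first (the behavior proposed here).
    private final boolean refreshInPosition;
    private final Set<String> subscription = new HashSet<>();
    private final Set<String> assignment = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();

    ToyConsumer(final boolean refreshInPosition) {
        this.refreshInPosition = refreshInPosition;
    }

    // Like subscribe(): records interest only; nothing is assigned yet.
    void subscribe(final Set<String> partitions) {
        subscription.clear();
        subscription.addAll(partitions);
    }

    // Stand-in for the coordinator poll that brings the assignment up to date.
    // Assumption: every subscribed partition is assigned to this consumer.
    private void refreshAssignment() {
        assignment.clear();
        assignment.addAll(subscription);
        for (final String partition : assignment) {
            positions.putIfAbsent(partition, 0L);
        }
    }

    // Like poll(): refreshes the assignment early in processing.
    void poll() {
        refreshAssignment();
    }

    // Like position(): fails for unassigned partitions unless it refreshes first.
    long position(final String partition) {
        if (refreshInPosition) {
            refreshAssignment();
        }
        if (!assignment.contains(partition)) {
            throw new IllegalStateException(
                "You can only check the position for partitions assigned to this consumer.");
        }
        return positions.get(partition);
    }
}
```

With `new ToyConsumer(false)`, calling `position()` right after `subscribe()` throws, as in the stack trace above, and succeeds only after `poll()`. With `new ToyConsumer(true)`, the same call succeeds without a preceding `poll()`.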
Issue Links
- blocks: KAFKA-5697 StreamThread.shutdown() need to interrupt the stream threads to break the loop (Resolved)