Details
- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
Description
TL;DR: KafkaConsumer cannot rejoin the group because AbstractCoordinator.generation (which is NO_GENERATION) and AbstractCoordinator.joinFuture (which is a succeeded RequestFuture) are inconsistent. See the explanation below.
There are 16 consumers in a single process (threads pool-4-thread-1 through pool-4-thread-16). All of them belong to a single consumer group, hercules.sink.elastic.legacy_logs_elk_c2. A rebalance occurred and the consumers got CommitFailedException, as expected:
2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
    at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
    at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
After that, most of them successfully rejoined the group with generation 10699:
2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-18]
...
2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
...
2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-24]
2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation -1
But one consumer (pool-4-thread-4) got the strange generation -1 (see the last log record above).
Further log records are in the attached log file.
Eventually, 15 consumers successfully rejoined, but the consumer on thread pool-4-thread-4 did not:
2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
    at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
    at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
It is important to note that KafkaConsumer.coordinator.joinFuture is not null and has succeeded, but ConsumerCoordinator cannot perform resetJoinGroupFuture() because an exception was thrown from onJoinComplete():
if (future.succeeded()) {
    // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
    ByteBuffer memberAssignment = future.value().duplicate();
    onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);

    // We reset the join group future only after the completion callback returns. This ensures
    // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
    resetJoinGroupFuture();
    needsJoinPrepare = true;
}
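To make the consequence of that ordering concrete, here is a minimal toy model of it. It is not Kafka code: `ToyCoordinator`, its fields, and `failedPolls` are hypothetical names invented for this sketch, and the real coordinator does far more. It only shows that when the callback throws before the future is reset, every later poll hits the same succeeded future and fails the same way.

```java
public class StuckJoinSketch {
    /** Hypothetical, heavily simplified stand-in for the coordinator state. */
    static final class ToyCoordinator {
        // Models generation.protocol after the generation became NO_GENERATION.
        String protocol = null;
        // Models a joinFuture that is non-null and has succeeded.
        boolean joinFutureSucceeded = true;
        int callbackInvocations = 0;

        // Models onJoinComplete(): rejects a null protocol, as in the stack trace.
        void onJoinComplete(String selectedProtocol) {
            callbackInvocations++;
            if (selectedProtocol == null) {
                throw new IllegalStateException(
                        "Coordinator selected invalid assignment protocol: " + selectedProtocol);
            }
        }

        // Models the quoted branch: the future is reset only after the callback returns.
        void joinGroupIfNeeded() {
            if (joinFutureSucceeded) {
                onJoinComplete(protocol);
                joinFutureSucceeded = false; // stands in for resetJoinGroupFuture()
            }
        }
    }

    /** Runs the given number of poll attempts and counts how many of them throw. */
    static int failedPolls(ToyCoordinator coordinator, int polls) {
        int failures = 0;
        for (int i = 0; i < polls; i++) {
            try {
                coordinator.joinGroupIfNeeded();
            } catch (IllegalStateException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        System.out.println("failed polls: " + failedPolls(coordinator, 3));
        System.out.println("future still succeeded: " + coordinator.joinFutureSucceeded);
    }
}
```

In this model the state never changes between attempts, which matches the repeated identical stack traces on pool-4-thread-4 in the log above.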
If I understand correctly, the generation was reset to NO_GENERATION in another thread by one of the CoordinatorResponseHandlers.
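If that reading is right, one possible defensive pattern is to read the generation once and check it before invoking the callback. The sketch below is only an illustration of that idea under stated assumptions, not the change that resolved this issue and not Kafka's API: `ToyCoordinator`, `rejoinNeeded`, the `Generation` class shown here, and the "range" protocol name are all hypothetical; only the generation number 10699 comes from the logs.

```java
import java.util.concurrent.atomic.AtomicReference;

public class GenerationGuardSketch {
    /** Hypothetical, simplified stand-in for the coordinator's generation record. */
    static final class Generation {
        final int generationId;
        final String protocol;

        Generation(int generationId, String protocol) {
            this.generationId = generationId;
            this.protocol = protocol;
        }
    }

    static final Generation NO_GENERATION = new Generation(-1, null);

    static final class ToyCoordinator {
        // "range" is an assumed protocol name used purely for illustration.
        final AtomicReference<Generation> generation =
                new AtomicReference<>(new Generation(10699, "range"));
        boolean joinFutureSucceeded = true;
        boolean rejoinNeeded = false;

        // Models a response handler on another thread resetting the generation.
        void resetGeneration() {
            generation.set(NO_GENERATION);
        }

        // Returns the protocol handed to the callback, or null if the join result was discarded.
        String joinGroupIfNeeded() {
            if (!joinFutureSucceeded) {
                return null;
            }
            Generation snapshot = generation.get(); // read once, then use only the snapshot
            if (snapshot == NO_GENERATION) {
                joinFutureSucceeded = false; // drop the stale join result
                rejoinNeeded = true;         // ask for a fresh join instead of throwing
                return null;
            }
            String used = snapshot.protocol; // stands in for calling onJoinComplete(...)
            joinFutureSucceeded = false;     // reset only after the callback has returned
            return used;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyCoordinator coordinator = new ToyCoordinator();
        Thread handler = new Thread(coordinator::resetGeneration);
        handler.start();
        handler.join(); // make the interleaving deterministic for the demo
        coordinator.joinGroupIfNeeded();
        System.out.println("rejoin needed: " + coordinator.rejoinNeeded);
    }
}
```

The design choice here is to treat a reset generation as "this join result is stale" rather than as an illegal state, so the consumer goes back through a normal rejoin instead of retrying a callback that can never succeed.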
Attachments
Issue Links
- incorporates
  - KAFKA-7263 Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null (Resolved)
  - KAFKA-8891 invalid assignment protocol java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null (Resolved)
- relates to
  - KAFKA-9140 Consumer gets stuck rejoining the group indefinitely (Resolved)
- links to