Details
Type: Bug
Status: Resolved
Priority: Minor
Resolution: Fixed
Version: 0.10.1
Description
The GroupByContainerCount grouper deletes the persisted task mapping when the partition count has changed, because there may be fewer tasks, which would make the old mapping invalid.
To delete the mapping, the TaskAssignmentManager registers itself with the coordinator stream producer and writes null for all the keys. Later, when the recalculated mapping is saved, it tries to register itself again, which causes this exception:
Exception in thread "main" org.apache.samza.SamzaException: SamzaTaskAssignmentManager is already registered with the queuing system producer
    at org.apache.samza.system.kafka.KafkaSystemProducer.register(KafkaSystemProducer.scala:65)
    at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.register(CoordinatorStreamSystemProducer.java:72)
    at org.apache.samza.coordinator.stream.AbstractCoordinatorStreamManager.registerCoordinatorStreamProducer(AbstractCoordinatorStreamManager.java:100)
    at org.apache.samza.container.grouper.task.TaskAssignmentManager.register(TaskAssignmentManager.java:58)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.saveTaskAssignments(GroupByContainerCount.java:179)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:93)
    at org.apache.samza.coordinator.JobCoordinator$.refreshJobModel(JobCoordinator.scala:255)
    at org.apache.samza.coordinator.JobCoordinator$.jobModelGenerator$1(JobCoordinator.scala:187)
    at org.apache.samza.coordinator.JobCoordinator$.initializeJobModel(JobCoordinator.scala:193)
    at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:120)
    at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:104)
    at org.apache.samza.job.yarn.SamzaAppMaster$.main(SamzaAppMaster.scala:74)
    at org.apache.samza.job.yarn.SamzaAppMaster.main(SamzaAppMaster.scala)
In a YARN environment, the AM restarts, and because the task mapping has already been deleted, the second attempt to save the mapping succeeds.
Since this issue only occurs when the partition count changes and is recoverable, I'm marking it as low priority.
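For illustration, here is a minimal, self-contained sketch of the failure mode and one possible way to guard against it. It does not use the Samza classes and is not necessarily how the issue was fixed: StrictProducer, GuardedAssignmentManager, ensureRegistered, deleteMapping and saveMapping are hypothetical names. The assumption is only that the producer rejects a second registration from the same source, as the exception above indicates.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a producer that rejects duplicate registrations,
// loosely mirroring the behaviour seen in the stack trace.
class StrictProducer {
    private final Set<String> sources = new HashSet<>();

    void register(String source) {
        if (!sources.add(source)) {
            throw new IllegalStateException(source + " is already registered");
        }
    }

    int registrationCount() {
        return sources.size();
    }
}

// Hypothetical manager that registers at most once, no matter how many
// operations (delete, save) need the producer.
class GuardedAssignmentManager {
    private static final String SOURCE = "SamzaTaskAssignmentManager";
    private final StrictProducer producer;
    private boolean registered = false;

    GuardedAssignmentManager(StrictProducer producer) {
        this.producer = producer;
    }

    // Idempotent: only the first call reaches the producer.
    private void ensureRegistered() {
        if (!registered) {
            producer.register(SOURCE);
            registered = true;
        }
    }

    void deleteMapping() {
        ensureRegistered();
        // A real implementation would write null for every key here.
    }

    void saveMapping() {
        ensureRegistered();
        // A real implementation would write the recalculated mapping here.
    }
}

class RegisterOnceDemo {
    public static void main(String[] args) {
        // Unguarded path: registering the same source twice fails.
        StrictProducer naive = new StrictProducer();
        naive.register("SamzaTaskAssignmentManager");
        try {
            naive.register("SamzaTaskAssignmentManager");
        } catch (IllegalStateException e) {
            System.out.println("Second registration failed: " + e.getMessage());
        }

        // Guarded path: delete followed by save registers only once.
        StrictProducer producer = new StrictProducer();
        GuardedAssignmentManager manager = new GuardedAssignmentManager(producer);
        manager.deleteMapping();
        manager.saveMapping();
        System.out.println("Registrations: " + producer.registrationCount());
    }
}
```

Tracking the registration state inside the manager keeps the delete and save paths independent of call order. An alternative with the same effect would be to register once up front, before either operation runs.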