Description
When starting a stream job that consumes a topic that does not yet exist, the job dies with the following exception:
Exception in thread "main" java.lang.IllegalArgumentException: No tasks found. Likely due to no input partitions. Can't run a job with no tasks.
    at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:193)
    at org.apache.samza.container.grouper.task.GroupByContainerCount.balance(GroupByContainerCount.java:86)
    at org.apache.samza.coordinator.JobModelManager$.refreshJobModel(JobCoordinator.scala:278)
    at org.apache.samza.coordinator.JobModelManager$.jobModelGenerator$1(JobCoordinator.scala:211)
    at org.apache.samza.coordinator.JobModelManager$.initializeJobModel(JobCoordinator.scala:217)
    at org.apache.samza.coordinator.JobModelManager$.getJobCoordinator(JobCoordinator.scala:122)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:106)
    at org.apache.samza.coordinator.JobModelManager$.apply(JobCoordinator.scala:112)
    at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
    at org.apache.samza.job.JobRunner.run(JobRunner.scala:129)
    at org.apache.samza.job.JobRunner$.main(JobRunner.scala:66)
    at org.apache.samza.job.JobRunner.main(JobRunner.scala)
This seems to be caused by the fix for SAMZA-971, specifically by passing partitionsMetadataOnly = true to the StreamMetadataCache in JobModelManager#getInputStreamPartitions:
https://github.com/apache/samza/commit/920f803a2e3dab809f4d7bb518259b0f4164407f
Note that the input topic is created subsequently, so restarting the job will likely succeed. Being able to consume topics that have not yet been created is useful because it avoids imposing a startup order between the jobs and the processes that produce to the topics. Setting partitionsMetadataOnly back to false fixed the issue for us, but I'm not sure this is the best fix. For the record, we are using Kafka 0.10.0.0.
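As a possible stopgap on the launcher side (not a fix in Samza itself), the job start could be delayed until the input topic reports at least one partition. The following is only a rough sketch under assumptions: WaitForPartitions, awaitPartitions, and the partitionCount supplier are hypothetical names, not Samza or Kafka APIs, and the supplier is assumed to be backed by whatever metadata lookup is available in the deployment.

```java
import java.util.function.IntSupplier;

// Hypothetical helper, not part of Samza: blocks until the input topic has partitions.
final class WaitForPartitions {

    /**
     * Polls partitionCount until it reports at least one partition.
     * partitionCount is an assumed hook that returns the current number of
     * partitions of the input topic (0 while the topic does not exist).
     */
    static int awaitPartitions(IntSupplier partitionCount, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            int count = partitionCount.getAsInt();
            if (count > 0) {
                return count; // topic exists, safe to start the job
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis); // wait before polling again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for input partitions", e);
                }
            }
        }
        throw new IllegalStateException(
            "Input topic still has no partitions after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Simulated topic that only appears on the third poll.
        int[] polls = {0};
        int partitions = awaitPartitions(() -> ++polls[0] < 3 ? 0 : 4, 5, 10L);
        System.out.println("partitions=" + partitions + " after " + polls[0] + " polls");
    }
}
```

This only moves the wait outside the job; it does not restore the earlier behaviour where the job itself could start against a topic that did not exist yet.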