Description
In SystemProducers.send, we currently do this:
producers(envelope.getSystemStream.getSystem).send(source, bytesEnvelope.get)
If a developer calls collector.send() with a system that's not defined in their job's config, they get:
Exception in thread "main" java.util.NoSuchElementException: key not found: undefined-stream at scala.collection.MapLike$class.default(MapLike.scala:228) at scala.collection.AbstractMap.default(Map.scala:58) at scala.collection.MapLike$class.apply(MapLike.scala:141) at scala.collection.AbstractMap.apply(Map.scala:58) at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:65) at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170) at org.apache.samza.container.TaskInstance$$anonfun$send$2.apply(TaskInstance.scala:170) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.samza.container.TaskInstance.send(TaskInstance.scala:170) at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116) at org.apache.samza.container.RunLoop$$anonfun$send$2.apply(RunLoop.scala:116) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.samza.container.RunLoop.send(RunLoop.scala:116) at org.apache.samza.container.RunLoop.run(RunLoop.scala:59) at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:504) at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:81) at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
NOTE: this stack trace is from an 0.7.0 job, but it's basically the same on master
We should update the SystemProducers.send method to do a .getOrElse, and throw a SamzaException with a more clear warning.