Description
We have seen cases where the intermediate topic created by the partitionBy() operator was created with a partition count calculated by the planner. However, when the input topic's partition count is expanded to handle more traffic, that usually has a trickle-down effect on the downstream topic partitions and task assignments (e.g. shuffling 128 partitions from the input topic into only 8 partitions in the intermediate topic usually does not distribute the workload widely enough). At a minimum, the planner needs to validate the intermediate topic's partition count and print a warning message if it is less than the max of the input/output topic partition counts.
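A minimal sketch of the proposed check, for illustration only. The class name IntermediatePartitionCheck, the method check(), and its parameters are hypothetical and do not correspond to existing planner code; the sketch assumes the planner already knows the partition counts of the input, output, and intermediate topics.

```java
import java.util.Collection;
import java.util.Optional;

/** Hypothetical helper; not part of the actual planner API. */
final class IntermediatePartitionCheck {
    private IntermediatePartitionCheck() {}

    /**
     * Returns a warning message if the intermediate topic has fewer partitions
     * than the max of the input/output topic partition counts, otherwise empty.
     */
    public static Optional<String> check(String intermediateTopic,
                                         int intermediatePartitions,
                                         Collection<Integer> inputPartitionCounts,
                                         Collection<Integer> outputPartitionCounts) {
        // Largest partition count among the inputs (0 if there are none).
        int maxInput = inputPartitionCounts.stream()
                .mapToInt(Integer::intValue).max().orElse(0);
        // Largest partition count among the outputs (0 if there are none).
        int maxOutput = outputPartitionCounts.stream()
                .mapToInt(Integer::intValue).max().orElse(0);
        int expected = Math.max(maxInput, maxOutput);

        if (intermediatePartitions < expected) {
            return Optional.of(String.format(
                    "Intermediate topic %s has %d partitions, which is less than "
                            + "the max input/output partition count (%d)",
                    intermediateTopic, intermediatePartitions, expected));
        }
        return Optional.empty();
    }
}
```

With the numbers from the example above (an input topic with 128 partitions and an intermediate topic with 8), check() returns a warning that the planner could log; with 128 intermediate partitions it returns an empty Optional.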