Description
There are a few cases where the optimizer will attempt an optimization that can cause a copartitioning failure. The known cases involve join and cogroup, but merge and other operations could also be affected.
Take for example three input topics A, B and C with 2, 3 and 4 partitions respectively.
B' = B.map();
B'.join(A)
B'.join(C)
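Here the optimizer will push the repartition upstream, so that both joins share a single repartition topic for B'. That topic would have to be copartitioned with both A (2 partitions) and C (4 partitions), which causes the copartitioning to fail.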
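The partition-count conflict in this example can be sketched in plain Java. This is a simplified model and not Kafka Streams code: the class name CopartitionSketch and its methods are hypothetical, and the only rule it assumes is that a repartition topic must have the same partition count as every source topic it is joined with.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model of the copartitioning constraint.
// It does not use or mirror any Kafka Streams internals.
public class CopartitionSketch {

    // Partition counts of the three input topics from the example.
    static final Map<String, Integer> SOURCE_PARTITIONS = Map.of("A", 2, "B", 3, "C", 4);

    // Distinct partition counts that one repartition topic would have to match
    // if it is joined with all of the given source topics.
    static Set<Integer> requiredPartitionCounts(List<String> joinedSourceTopics) {
        Set<Integer> counts = new TreeSet<>();
        for (String topic : joinedSourceTopics) {
            counts.add(SOURCE_PARTITIONS.get(topic));
        }
        return counts;
    }

    // A single topic can only have one partition count, so the constraint is
    // satisfiable only when at most one distinct count is required.
    static boolean canSatisfy(List<String> joinedSourceTopics) {
        return requiredPartitionCounts(joinedSourceTopics).size() <= 1;
    }

    public static void main(String[] args) {
        // One repartition topic per join: each one only has to match one topic.
        System.out.println("B' repartition for join(A): " + canSatisfy(List.of("A")));
        System.out.println("B' repartition for join(C): " + canSatisfy(List.of("C")));

        // One shared repartition topic pushed upstream: it has to match A and C.
        System.out.println("shared B' repartition:      " + canSatisfy(List.of("A", "C")));
        System.out.println("required counts:            " + requiredPartitionCounts(List.of("A", "C")));
    }
}
```

In this model the two separate repartition topics are each satisfiable, while the shared one would need 2 and 4 partitions at once.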
This can be seen with the following test:
@Test
public void shouldInsertRepartitionsTopicForCogroupsUsedTwice()
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [one])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> foo-repartition-filter
      <-- KSTREAM-SOURCE-0000000000
    Processor: foo-repartition-filter (stores: [])
      --> foo-repartition-sink
      <-- KSTREAM-MAP-0000000001
    Sink: foo-repartition-sink (topic: foo-repartition)
      <-- foo-repartition-filter

   Sub-topology: 1
    Source: foo-repartition-source (topics: [foo-repartition])
      --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012
    Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> COGROUPKSTREAM-MERGE-0000000007
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])
      --> COGROUPKSTREAM-MERGE-0000000013
      <-- foo-repartition-source
    Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000006
    Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])
      --> none
      <-- COGROUPKSTREAM-AGGREGATE-0000000012