Kafka / KAFKA-9299

Over-eager optimization


Details

    • Type: Task
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: streams

    Description

      There are a few cases where the optimizer attempts an optimization that can cause a copartitioning failure. The known cases involve join and cogroup, but merge and other operations could also be affected.

      Take, for example, three input topics A, B, and C with 2, 3, and 4 partitions, respectively:

      B' = B.map();
      B'.join(A);
      B'.join(C);

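      The optimizer pushes the repartition upstream, so both joins read from a single repartition topic for B'. That one topic would need 2 partitions to be copartitioned with A and 4 partitions to be copartitioned with C, so the copartitioning check fails.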
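To make the partition-count conflict concrete, here is a minimal, self-contained toy model in plain Java with no Kafka dependency. It is only a sketch under stated assumptions: the class CopartitionModel, its resolve method, and the repartition topic names are hypothetical illustrations, not Kafka Streams APIs, and the single rule it models (every repartition topic in a copartition group takes the partition count of that group's source topics) is a simplification of what Kafka Streams actually enforces.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of the copartitioning rule (NOT Kafka Streams code):
// all topics in one copartition group must end up with the same partition count.
// Source topics have fixed counts; repartition topics inherit them.
public class CopartitionModel {

    // Resolves a partition count for every repartition topic, or throws if one
    // repartition topic would need two different counts.
    public static Map<String, Integer> resolve(Map<String, Integer> sourcePartitions,
                                               List<List<String>> copartitionGroups) {
        Map<String, Integer> repartitionCounts = new TreeMap<>();
        for (List<String> group : copartitionGroups) {
            // The source topics within one group must already agree.
            Integer required = null;
            for (String topic : group) {
                Integer n = sourcePartitions.get(topic);
                if (n == null) {
                    continue; // not a source topic, so treat it as a repartition topic
                }
                if (required != null && !required.equals(n)) {
                    throw new IllegalStateException("source topics in " + group + " disagree");
                }
                required = n;
            }
            if (required == null) {
                continue; // simplification: groups without source topics are skipped
            }
            // Every repartition topic in the group must take the required count.
            for (String topic : group) {
                if (sourcePartitions.containsKey(topic)) {
                    continue;
                }
                Integer previous = repartitionCounts.putIfAbsent(topic, required);
                if (previous != null && !previous.equals(required)) {
                    throw new IllegalStateException(topic + " needs both " + previous
                            + " and " + required + " partitions");
                }
            }
        }
        return repartitionCounts;
    }

    public static void main(String[] args) {
        // Partition counts from the example. B only feeds map(), so in this model
        // its own count does not constrain the repartition topics.
        Map<String, Integer> sources = Map.of("A", 2, "B", 3, "C", 4);

        // Without the optimization: one (hypothetically named) repartition topic per join.
        System.out.println(resolve(sources, List.of(
                List.of("B'-repartition-join-A", "A"),
                List.of("B'-repartition-join-C", "C"))));

        // With the optimization: both joins share a single upstream repartition topic.
        try {
            resolve(sources, List.of(
                    List.of("B'-repartition", "A"),
                    List.of("B'-repartition", "C")));
        } catch (IllegalStateException e) {
            System.out.println("copartitioning fails: " + e.getMessage());
        }
    }
}
```

Running main first prints {B'-repartition-join-A=2, B'-repartition-join-C=4} for the unoptimized layout, then "copartitioning fails: B'-repartition needs both 2 and 4 partitions" for the merged layout, mirroring the conflict described above.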

      This can be seen with the following test:

      @Test
      public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
          // stringConsumed, STRING_AGGREGATOR and STRING_INITIALIZER are fixtures
          // defined in the enclosing test class.
          final StreamsBuilder builder = new StreamsBuilder();
          final Properties properties = new Properties();
          properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

          final KStream<String, String> stream1 = builder.stream("one", stringConsumed);
          // map() changes the key, so grouping afterwards requires a repartition topic
          final KGroupedStream<String, String> groupedOne = stream1
              .map((k, v) -> new KeyValue<>(v, k))
              .groupByKey(Grouped.as("foo"));
          final CogroupedKStream<String, String> one = groupedOne.cogroup(STRING_AGGREGATOR);
          // the same cogrouped stream is aggregated twice
          one.aggregate(STRING_INITIALIZER);
          one.aggregate(STRING_INITIALIZER);

          final String topologyDescription = builder.build(properties).describe().toString();
          System.err.println(topologyDescription);
      }

      Printed topology, in which both aggregations read from the single foo-repartition topic:

      Topologies:
         Sub-topology: 0
          Source: KSTREAM-SOURCE-0000000000 (topics: [one])
            --> KSTREAM-MAP-0000000001
          Processor: KSTREAM-MAP-0000000001 (stores: [])
            --> foo-repartition-filter
            <-- KSTREAM-SOURCE-0000000000
          Processor: foo-repartition-filter (stores: [])
            --> foo-repartition-sink
            <-- KSTREAM-MAP-0000000001
          Sink: foo-repartition-sink (topic: foo-repartition)
            <-- foo-repartition-filter

         Sub-topology: 1
          Source: foo-repartition-source (topics: [foo-repartition])
            --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012
          Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
            --> COGROUPKSTREAM-MERGE-0000000007
            <-- foo-repartition-source
          Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])
            --> COGROUPKSTREAM-MERGE-0000000013
            <-- foo-repartition-source
          Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])
            --> none
            <-- COGROUPKSTREAM-AGGREGATE-0000000006
          Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])
            --> none
            <-- COGROUPKSTREAM-AGGREGATE-0000000012

    People

      • Bill Bejeck (bbejeck)
      • Walker Carlson (wcarlson@confluent.io)