  1. Kafka
  2. KAFKA-17066

New consumer updateFetchPositions should perform all operations in background thread


Details

    Description

      The updateFetchPositions function in the new consumer performs several actions based on the assigned partitions from the subscriptionState. As currently implemented, it fetches committed offsets for the partitions that require a position (retrieved from the subscription state in the app thread), and then resets positions for the partitions that still need one (retrieved from the subscription state again, but in the background thread).
      This is problematic because the assignment/subscriptionState may change in the background thread at any time (e.g. new partitions reconciled), so we could end up resetting positions to the partition offsets for a partition for which we never even attempted to retrieve committed offsets.

      Consider this sequence for a consumer that owns a partition tp0:

      • consumer owns tp0
      • app thread -> updateFetchPositions triggers initWithCommittedOffsetsIfNeeded to retrieve committed offsets for assigned partitions requiring a position (taking them from subscriptions.initializingPartitions()). This will fetch committed offsets for tp0 only.
      • background thread -> receives new partition tp1 and completes reconciliation (adds it to the subscription state as INITIALIZING, requires a position)
      • app thread -> updateFetchPositions resets positions for all partitions that still don't have a valid position after initWithCommittedOffsetsIfNeeded (taking them from subscriptionState.partitionsNeedingReset). This will mistakenly consider that it should reset tp1 to the partition offsets, when in reality it never even tried fetching the committed offsets for it because it wasn't assigned when initWithCommittedOffsetsIfNeeded happened. 
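
The interleaving above can be replayed with a toy model. This is only a sketch: ToySubscriptionState and SplitUpdateRace are hypothetical names, not Kafka classes, and the three steps run on one thread so the outcome is deterministic. It also assumes tp0 has a committed offset, which the issue does not state.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the consumer's subscription state.
// It is NOT the real Kafka SubscriptionState class.
class ToySubscriptionState {
    private final Set<String> initializing = new LinkedHashSet<>();

    synchronized void assign(String partition) {
        initializing.add(partition);
    }

    // Returns a copy of the partitions that still need a position.
    synchronized Set<String> initializingPartitions() {
        return new LinkedHashSet<>(initializing);
    }

    synchronized void markPositioned(String partition) {
        initializing.remove(partition);
    }
}

class SplitUpdateRace {
    // Replays the three steps in order and returns the partitions that were
    // reset without a committed-offset lookup ever being attempted for them.
    static Set<String> replay() {
        ToySubscriptionState state = new ToySubscriptionState();
        state.assign("tp0");

        // Step 1 (app thread): read the state and look up committed offsets.
        Set<String> lookupAttempted = state.initializingPartitions();
        // Assumption: tp0 has a committed offset, so it gets a position here.
        state.markPositioned("tp0");

        // Step 2 (background thread): reconciliation adds tp1.
        state.assign("tp1");

        // Step 3 (app thread): read the state AGAIN and reset what is left.
        Set<String> toReset = state.initializingPartitions();
        Set<String> resetWithoutLookup = new LinkedHashSet<>(toReset);
        resetWithoutLookup.removeAll(lookupAttempted);
        toReset.forEach(state::markPositioned);
        return resetWithoutLookup;
    }

    public static void main(String[] args) {
        // prints "Reset without committed-offset lookup: [tp1]"
        System.out.println("Reset without committed-offset lookup: " + replay());
    }
}
```

The problem comes from reading the state twice: step 3 sees a different partition set than step 1 did.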

      We should consider moving updateFetchPositions to the background thread as a single event, which would safely use the subscriptionState object and apply all the actions involved in updateFetchPositions to the same consistent set of partitions assigned at that moment.
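
One way to picture the proposal is a single event that the app thread hands to the background thread and then waits on. The sketch below is illustrative only: SingleEventUpdateSketch and its methods are hypothetical, a single-threaded executor stands in for the consumer's background thread, and a plain map lookup replaces the real asynchronous committed-offset fetch.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SingleEventUpdateSketch {
    // Stand-in for the background thread; only it touches `initializing`.
    private final ExecutorService background = Executors.newSingleThreadExecutor();
    private final Set<String> initializing = new LinkedHashSet<>();
    private final Map<String, Long> committed; // toy committed-offset store

    SingleEventUpdateSketch(Map<String, Long> committed) {
        this.committed = committed;
    }

    // Reconciliation is modelled as a task on the same background thread.
    CompletableFuture<Void> assign(String partition) {
        return CompletableFuture.runAsync(() -> initializing.add(partition), background);
    }

    // One event: snapshot the partitions once, try committed offsets for each,
    // and reset only partitions from that same snapshot.
    CompletableFuture<Map<String, String>> updateFetchPositions() {
        return CompletableFuture.supplyAsync(() -> {
            Set<String> snapshot = new LinkedHashSet<>(initializing);
            Map<String, String> outcome = new LinkedHashMap<>();
            for (String tp : snapshot) {
                outcome.put(tp, committed.containsKey(tp) ? "committed" : "reset");
                initializing.remove(tp);
            }
            return outcome;
        }, background);
    }

    void close() {
        background.shutdown();
    }

    public static void main(String[] args) {
        SingleEventUpdateSketch consumer =
                new SingleEventUpdateSketch(Map.of("tp0", 42L));
        consumer.assign("tp0").join();
        CompletableFuture<Map<String, String>> update = consumer.updateFetchPositions();
        consumer.assign("tp1"); // reconciled after the event was queued
        System.out.println(update.join());                          // {tp0=committed}
        System.out.println(consumer.updateFetchPositions().join()); // {tp1=reset}
        consumer.close();
    }
}
```

Because the executor runs tasks in submission order, tp1 is handled only by the next event. It can still end up reset, but only after a committed-offset lookup was attempted for it.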

            People

              lianetm Lianet Magrans