Description
In this case,oldProtocols will always be the protocols,because knownStaticMember is updated before.So, I think oldProtocol should be assigned before updateMember.
private def updateStaticMemberAndRebalance(group: GroupMetadata, newMemberId: String, groupInstanceId: Option[String], protocols: List[(String, Array[Byte])], responseCallback: JoinCallback): Unit = { val oldMemberId = group.getStaticMemberId(groupInstanceId) info(s"Static member $groupInstanceId of group ${group.groupId} with unknown member id rejoins, assigning new member id $newMemberId, while " + s"old member id $oldMemberId will be removed.") val currentLeader = group.leaderOrNull val member = group.replaceGroupInstance(oldMemberId, newMemberId, groupInstanceId) // Heartbeat of old member id will expire without effect since the group no longer contains that member id. // New heartbeat shall be scheduled with new member id. completeAndScheduleNextHeartbeatExpiration(group, member) val knownStaticMember = group.get(newMemberId) group.updateMember(knownStaticMember, protocols, responseCallback) val oldProtocols = knownStaticMember.supportedProtocols group.currentState match { case Stable => // check if group's selectedProtocol of next generation will change, if not, simply store group to persist the // updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent val selectedProtocolOfNextGeneration = group.selectProtocol if (group.protocolName.contains(selectedProtocolOfNextGeneration)) { info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.") val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap groupManager.storeGroup(group, groupAssignment, error => { if (error != Errors.NONE) { warn(s"Failed to persist metadata for group ${group.groupId}: ${error.message}") // Failed to persist member.id of the given static member, revert the update of the static member in the group. group.updateMember(knownStaticMember, oldProtocols, null) val oldMember = group.replaceGroupInstance(newMemberId, oldMemberId, groupInstanceId) completeAndScheduleNextHeartbeatExpiration(group, oldMember) responseCallback(JoinGroupResult( List.empty, memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = currentLeader, error = error )) } else { group.maybeInvokeJoinCallback(member, JoinGroupResult( members = List.empty, memberId = newMemberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, // We want to avoid current leader performing trivial assignment while the group // is in stable stage, because the new assignment in leader's next sync call // won't be broadcast by a stable group. This could be guaranteed by // always returning the old leader id so that the current leader won't assume itself // as a leader based on the returned message, since the new member.id won't match // returned leader id, therefore no assignment will be performed. leaderId = currentLeader, error = Errors.NONE)) } }) } else { maybePrepareRebalance(group, s"Group's selectedProtocol will change because static member ${member.memberId} with instance id $groupInstanceId joined with change of protocol") } case CompletingRebalance => // if the group is in after-sync stage, upon getting a new join-group of a known static member // we should still trigger a new rebalance, since the old member may already be sent to the leader // for assignment, and hence when the assignment gets back there would be a mismatch of the old member id // with the new replaced member id. As a result the new member id would not get any assignment. prepareRebalance(group, s"Updating metadata for static member ${member.memberId} with instance id $groupInstanceId") case Empty | Dead => throw new IllegalStateException(s"Group ${group.groupId} was not supposed to be " + s"in the state ${group.currentState} when the unknown static member $groupInstanceId rejoins.") case PreparingRebalance => } }