/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;

public class StreamsMembershipManager
implements RequestManager {
    static final Utils.TopicPartitionComparator TOPIC_PARTITION_COMPARATOR = new Utils.TopicPartitionComparator();
    private final Logger log;
    private final BackgroundEventHandler backgroundEventHandler;
    private final StreamsRebalanceData streamsRebalanceData;
    private final SubscriptionState subscriptionState;
    private MemberState state;
    private final String groupId;
    private final String memberId = Uuid.randomUuid().toString();
    private final Optional<String> groupInstanceId = Optional.empty();
    private int memberEpoch = 0;
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();
    private CompletableFuture<Void> staleMemberAssignmentRelease;
    private boolean reconciliationInProgress;
    private boolean rejoinedWhileReconciliationInProgress;
    private final List<MemberStateListener> stateUpdatesListeners = new ArrayList<MemberStateListener>();
    private LocalAssignment targetAssignment = LocalAssignment.NONE;
    private LocalAssignment currentAssignment = LocalAssignment.NONE;
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
    private final RebalanceMetricsManager metricsManager;
    private final Time time;
    private boolean isPollTimerExpired;

    public StreamsMembershipManager(String groupId, StreamsRebalanceData streamsRebalanceData, SubscriptionState subscriptionState, BackgroundEventHandler backgroundEventHandler, LogContext logContext, Time time, Metrics metrics) {
        this.log = logContext.logger(StreamsMembershipManager.class);
        this.state = MemberState.UNSUBSCRIBED;
        this.groupId = groupId;
        this.backgroundEventHandler = backgroundEventHandler;
        this.streamsRebalanceData = streamsRebalanceData;
        this.subscriptionState = subscriptionState;
        this.metricsManager = new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
        this.time = time;
    }

    public String groupId() {
        return this.groupId;
    }

    public String memberId() {
        return this.memberId;
    }

    public Optional<String> groupInstanceId() {
        return this.groupInstanceId;
    }

    public int memberEpoch() {
        return this.memberEpoch;
    }

    public MemberState state() {
        return this.state;
    }

    public boolean isLeavingGroup() {
        return this.state == MemberState.PREPARE_LEAVING || this.state == MemberState.LEAVING;
    }

    private boolean isNotInGroup() {
        return this.state == MemberState.UNSUBSCRIBED || this.state == MemberState.FENCED || this.state == MemberState.FATAL || this.state == MemberState.STALE;
    }

    public void registerStateListener(MemberStateListener listener) {
        Objects.requireNonNull(listener, "State updates listener cannot be null");
        for (MemberStateListener registeredListener : this.stateUpdatesListeners) {
            if (registeredListener != listener) continue;
            throw new IllegalArgumentException("Listener is already registered.");
        }
        this.stateUpdatesListeners.add(listener);
    }

    private void notifyEpochChange(Optional<Integer> epoch) {
        this.stateUpdatesListeners.forEach(stateListener -> stateListener.onMemberEpochUpdated(epoch, this.memberId));
    }

    void notifyAssignmentChange(Set<TopicPartition> partitions) {
        this.stateUpdatesListeners.forEach(stateListener -> stateListener.onGroupAssignmentUpdated(partitions));
    }

    private void transitionToJoining() {
        if (this.state == MemberState.FATAL) {
            this.log.warn("No action taken to join the group with the updated subscription because the member is in FATAL state");
            return;
        }
        if (this.reconciliationInProgress) {
            this.rejoinedWhileReconciliationInProgress = true;
        }
        this.resetEpoch();
        this.transitionTo(MemberState.JOINING);
        this.clearCurrentTaskAssignment();
    }

    private void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
        if (this.state == MemberState.FATAL) {
            this.log.warn("Member {} with epoch {} won't send leave group request because it is in FATAL state", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        if (this.state == MemberState.UNSUBSCRIBED) {
            this.log.warn("Member {} won't send leave group request because it is already out of the group.", (Object)this.memberId);
            return;
        }
        if (dueToExpiredPollTimer) {
            this.isPollTimerExpired = true;
            this.transitionTo(MemberState.PREPARE_LEAVING);
        }
        this.finalizeLeaving();
        this.transitionTo(MemberState.LEAVING);
    }

    private void finalizeLeaving() {
        this.updateMemberEpoch(-1);
        this.clearCurrentTaskAssignment();
    }

    private void transitionToStale() {
        this.transitionTo(MemberState.STALE);
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.requestOnAllTasksLostCallbackInvocation();
        this.staleMemberAssignmentRelease = onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("Task revocation callback invocation failed after member left group due to expired poll timer.", error);
            }
            this.clearTaskAndPartitionAssignment();
            this.log.debug("Member {} sent leave group heartbeat and released its assignment. It will remain in {} state until the poll timer is reset, and it will then rejoin the group", (Object)this.memberId, (Object)MemberState.STALE);
        });
    }

    public void transitionToFatal() {
        MemberState previousState = this.state;
        this.transitionTo(MemberState.FATAL);
        this.log.error("Member {} with epoch {} transitioned to fatal state", (Object)this.memberId, (Object)this.memberEpoch);
        this.notifyEpochChange(Optional.empty());
        if (previousState == MemberState.UNSUBSCRIBED) {
            this.log.debug("Member {} with epoch {} got fatal error from the broker but it already left the group, so onAllTasksLost callback won't be triggered.", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) {
            this.log.info("Member {} with epoch {} was leaving the group with state {} when it got a fatal error from the broker. It will discard the ongoing leave and remain in fatal state.", new Object[]{this.memberId, this.memberEpoch, previousState});
            this.maybeCompleteLeaveInProgress();
            return;
        }
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.requestOnAllTasksLostCallbackInvocation();
        onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("onAllTasksLost callback invocation failed while releasing assignment after member failed with fatal error.", error);
            }
            this.clearTaskAndPartitionAssignment();
        });
    }

    public void onHeartbeatRequestSkipped() {
        if (this.state == MemberState.LEAVING) {
            this.log.warn("Heartbeat to leave group cannot be sent (most probably due to coordinator not known/available). Member {} with epoch {} will transition to {}.", new Object[]{this.memberId, this.memberEpoch, MemberState.UNSUBSCRIBED});
            this.transitionTo(MemberState.UNSUBSCRIBED);
            this.maybeCompleteLeaveInProgress();
        }
    }

    private void transitionTo(MemberState nextState) {
        if (!this.state.equals((Object)nextState) && !nextState.getPreviousValidStates().contains((Object)this.state)) {
            throw new IllegalStateException(String.format("Invalid state transition from %s to %s", new Object[]{this.state, nextState}));
        }
        if (StreamsMembershipManager.isCompletingRebalance(this.state, nextState)) {
            this.metricsManager.recordRebalanceEnded(this.time.milliseconds());
        }
        if (StreamsMembershipManager.isStartingRebalance(this.state, nextState)) {
            this.metricsManager.recordRebalanceStarted(this.time.milliseconds());
        }
        this.log.info("Member {} with epoch {} transitioned from {} to {}.", new Object[]{this.memberId, this.memberEpoch, this.state, nextState});
        this.state = nextState;
    }

    private static boolean isCompletingRebalance(MemberState currentState, MemberState nextState) {
        return currentState == MemberState.RECONCILING && (nextState == MemberState.STABLE || nextState == MemberState.ACKNOWLEDGING);
    }

    private static boolean isStartingRebalance(MemberState currentState, MemberState nextState) {
        return currentState != MemberState.RECONCILING && nextState == MemberState.RECONCILING;
    }

    private void resetEpoch() {
        this.updateMemberEpoch(0);
    }

    private void updateMemberEpoch(int newEpoch) {
        boolean newEpochReceived = this.memberEpoch != newEpoch;
        this.memberEpoch = newEpoch;
        if (newEpochReceived) {
            if (this.memberEpoch > 0) {
                this.notifyEpochChange(Optional.of(this.memberEpoch));
            } else {
                this.notifyEpochChange(Optional.empty());
            }
        }
    }

    private void clearCurrentTaskAssignment() {
        this.currentAssignment = LocalAssignment.NONE;
    }

    private void clearTaskAndPartitionAssignment() {
        this.subscriptionState.assignFromSubscribed(Collections.emptySet());
        this.notifyAssignmentChange(Collections.emptySet());
        this.currentAssignment = LocalAssignment.NONE;
        this.targetAssignment = LocalAssignment.NONE;
    }

    public boolean shouldSkipHeartbeat() {
        return this.isNotInGroup();
    }

    public boolean shouldNotWaitForHeartbeatInterval() {
        return this.state == MemberState.ACKNOWLEDGING || this.state == MemberState.LEAVING || this.state == MemberState.JOINING;
    }

    public void onSubscriptionUpdated() {
        this.subscriptionUpdated.compareAndSet(false, true);
    }

    public void onConsumerPoll() {
        if (this.subscriptionUpdated.compareAndSet(true, false) && this.state == MemberState.UNSUBSCRIBED) {
            this.transitionToJoining();
        }
    }

    public void onHeartbeatRequestGenerated() {
        if (this.state == MemberState.ACKNOWLEDGING) {
            if (this.targetAssignmentReconciled()) {
                this.transitionTo(MemberState.STABLE);
            } else {
                this.log.debug("Member {} with epoch {} transitioned to {} after a heartbeat was sent to ack a previous reconciliation. New assignments are ready to be reconciled.", new Object[]{this.memberId, this.memberEpoch, MemberState.RECONCILING});
                this.transitionTo(MemberState.RECONCILING);
            }
        } else if (this.state == MemberState.LEAVING) {
            if (this.isPollTimerExpired) {
                this.log.debug("Member {} with epoch {} generated the heartbeat to leave due to expired poll timer. It will remain stale (no heartbeat) until it rejoins the group on the next consumer poll.", (Object)this.memberId, (Object)this.memberEpoch);
                this.transitionToStale();
            } else {
                this.log.debug("Member {} with epoch {} generated the heartbeat to leave the group.", (Object)this.memberId, (Object)this.memberEpoch);
                this.transitionTo(MemberState.UNSUBSCRIBED);
            }
        }
    }

    public void onHeartbeatSuccess(StreamsGroupHeartbeatResponse response) {
        StreamsGroupHeartbeatResponseData responseData = response.data();
        this.throwIfUnexpectedError(responseData);
        if (this.state == MemberState.LEAVING) {
            this.log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} is already leaving the group.", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        if (this.state == MemberState.UNSUBSCRIBED && responseData.memberEpoch() < 0 && this.maybeCompleteLeaveInProgress()) {
            this.log.debug("Member {} with epoch {} received a successful response to the heartbeat to leave the group and completed the leave operation. ", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        if (this.isNotInGroup()) {
            this.log.debug("Ignoring heartbeat response received from broker. Member {} is in {} state so it's not a member of the group. ", (Object)this.memberId, (Object)this.state);
            return;
        }
        if (responseData.memberEpoch() < 0) {
            this.log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} is in {} state and the member epoch is invalid: {}. ", new Object[]{this.memberId, this.memberEpoch, this.state, responseData.memberEpoch()});
            this.maybeCompleteLeaveInProgress();
            return;
        }
        this.updateMemberEpoch(responseData.memberEpoch());
        List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks = responseData.activeTasks();
        List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks = responseData.standbyTasks();
        List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks = responseData.warmupTasks();
        boolean isGroupReady = this.isGroupReady(responseData.status());
        if (activeTasks != null && standbyTasks != null && warmupTasks != null) {
            if (!this.state.canHandleNewAssignment()) {
                this.log.debug("Ignoring new assignment: active tasks {}, standby tasks {}, and warm-up tasks {} received from server because member is in {} state.", new Object[]{activeTasks, standbyTasks, warmupTasks, this.state});
                return;
            }
            this.processAssignmentReceived(StreamsMembershipManager.toTasksAssignment(activeTasks), StreamsMembershipManager.toTasksAssignment(standbyTasks), StreamsMembershipManager.toTasksAssignment(warmupTasks), isGroupReady);
        } else {
            if (responseData.activeTasks() != null || responseData.standbyTasks() != null || responseData.warmupTasks() != null) {
                throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: " + String.valueOf(responseData));
            }
            if (isGroupReady != this.targetAssignment.isGroupReady) {
                this.processAssignmentReceived(this.targetAssignment.activeTasks, this.targetAssignment.standbyTasks, this.targetAssignment.warmupTasks, isGroupReady);
            }
        }
    }

    private boolean isGroupReady(List<StreamsGroupHeartbeatResponseData.Status> statuses) {
        if (statuses != null) {
            for (StreamsGroupHeartbeatResponseData.Status status : statuses) {
                switch (StreamsGroupHeartbeatResponse.Status.fromCode(status.statusCode())) {
                    case MISSING_SOURCE_TOPICS: 
                    case MISSING_INTERNAL_TOPICS: 
                    case INCORRECTLY_PARTITIONED_TOPICS: 
                    case ASSIGNMENT_DELAYED: {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public void onRetriableHeartbeatFailure() {
        this.onHeartbeatFailure();
    }

    public void onFatalHeartbeatFailure() {
        this.metricsManager.maybeRecordRebalanceFailed();
        this.onHeartbeatFailure();
    }

    private void onHeartbeatFailure() {
        if (this.state == MemberState.UNSUBSCRIBED && this.maybeCompleteLeaveInProgress()) {
            this.log.warn("Member {} with epoch {} received a failed response to the heartbeat to leave the group and completed the leave operation. ", (Object)this.memberId, (Object)this.memberEpoch);
        }
    }

    public void onPollTimerExpired() {
        this.transitionToSendingLeaveGroup(true);
    }

    public void onFenced() {
        if (this.state == MemberState.PREPARE_LEAVING) {
            this.log.debug("Member {} with epoch {} got fenced but it is already preparing to leave the group, so it will stop sending heartbeat and won't attempt to send the leave request or rejoin.", (Object)this.memberId, (Object)this.memberEpoch);
            this.finalizeLeaving();
            this.transitionTo(MemberState.UNSUBSCRIBED);
            this.maybeCompleteLeaveInProgress();
            return;
        }
        if (this.state == MemberState.LEAVING) {
            this.log.debug("Member {} with epoch {} got fenced before sending leave group heartbeat. It will not send the leave request and won't attempt to rejoin.", (Object)this.memberId, (Object)this.memberEpoch);
            this.transitionTo(MemberState.UNSUBSCRIBED);
            this.maybeCompleteLeaveInProgress();
            return;
        }
        if (this.state == MemberState.UNSUBSCRIBED) {
            this.log.debug("Member {} with epoch {} got fenced but it already left the group, so it won't attempt to rejoin.", (Object)this.memberId, (Object)this.memberEpoch);
            return;
        }
        this.transitionTo(MemberState.FENCED);
        this.resetEpoch();
        this.log.debug("Member {} with epoch {} transitioned to {} state. It will release its assignment and rejoin the group.", new Object[]{this.memberId, this.memberEpoch, MemberState.FENCED});
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.requestOnAllTasksLostCallbackInvocation();
        onAllTasksLostCallbackExecuted.whenComplete((result, error) -> {
            if (error != null) {
                this.log.error("onAllTasksLost callback invocation failed while releasing assignment after member got fenced. Member will rejoin the group anyways.", error);
            }
            this.clearTaskAndPartitionAssignment();
            if (this.state == MemberState.FENCED) {
                this.transitionToJoining();
            } else {
                this.log.debug("Fenced member onAllTasksLost callback completed but the state has already changed to {}, so the member won't rejoin the group", (Object)this.state);
            }
        });
    }

    private void throwIfUnexpectedError(StreamsGroupHeartbeatResponseData responseData) {
        if (responseData.errorCode() != Errors.NONE.code()) {
            String errorMessage = String.format("Unexpected error in Heartbeat response. Expected no error, but received: %s with message: '%s'", new Object[]{Errors.forCode(responseData.errorCode()), responseData.errorMessage()});
            throw new IllegalArgumentException(errorMessage);
        }
    }

    public void maybeRejoinStaleMember() {
        this.isPollTimerExpired = false;
        if (this.state == MemberState.STALE) {
            this.log.debug("Expired poll timer has been reset so stale member {} will rejoin the group when it completes releasing its previous assignment.", (Object)this.memberId);
            this.staleMemberAssignmentRelease.whenComplete((__, error) -> this.transitionToJoining());
        }
    }

    private boolean maybeCompleteLeaveInProgress() {
        if (this.leaveGroupInProgress.isPresent()) {
            this.leaveGroupInProgress.get().complete(null);
            this.leaveGroupInProgress = Optional.empty();
            return true;
        }
        return false;
    }

    private static SortedSet<StreamsRebalanceData.TaskId> toTaskIdSet(Map<String, SortedSet<Integer>> tasks) {
        TreeSet<StreamsRebalanceData.TaskId> taskIdSet = new TreeSet<StreamsRebalanceData.TaskId>();
        for (Map.Entry<String, SortedSet<Integer>> task : tasks.entrySet()) {
            String subtopologyId = task.getKey();
            SortedSet<Integer> partitions = task.getValue();
            Iterator iterator = partitions.iterator();
            while (iterator.hasNext()) {
                int partition = (Integer)iterator.next();
                taskIdSet.add(new StreamsRebalanceData.TaskId(subtopologyId, partition));
            }
        }
        return taskIdSet;
    }

    private static Map<String, SortedSet<Integer>> toTasksAssignment(List<StreamsGroupHeartbeatResponseData.TaskIds> taskIds) {
        return taskIds.stream().collect(Collectors.toMap(StreamsGroupHeartbeatResponseData.TaskIds::subtopologyId, taskId -> new TreeSet<Integer>(taskId.partitions())));
    }

    public CompletableFuture<Void> leaveGroupOnClose() {
        return this.leaveGroup(true);
    }

    public CompletableFuture<Void> leaveGroup() {
        return this.leaveGroup(false);
    }

    private CompletableFuture<Void> leaveGroup(boolean isOnClose) {
        if (this.isNotInGroup()) {
            if (this.state == MemberState.FENCED) {
                this.clearTaskAndPartitionAssignment();
                this.transitionTo(MemberState.UNSUBSCRIBED);
            }
            this.subscriptionState.unsubscribe();
            this.notifyAssignmentChange(Collections.emptySet());
            return CompletableFuture.completedFuture(null);
        }
        if (this.state == MemberState.PREPARE_LEAVING || this.state == MemberState.LEAVING) {
            this.log.debug("Leave group operation already in progress for member {}", (Object)this.memberId);
            return this.leaveGroupInProgress.get();
        }
        this.transitionTo(MemberState.PREPARE_LEAVING);
        CompletableFuture<Void> onGroupLeft = new CompletableFuture<Void>();
        this.leaveGroupInProgress = Optional.of(onGroupLeft);
        if (isOnClose) {
            this.leaving();
        } else {
            CompletableFuture<Void> onAllActiveTasksReleasedCallbackExecuted = this.releaseActiveTasks();
            onAllActiveTasksReleasedCallbackExecuted.whenComplete((__, callbackError) -> this.leavingAfterReleasingActiveTasks((Throwable)callbackError));
        }
        return onGroupLeft;
    }

    private CompletableFuture<Void> releaseActiveTasks() {
        if (this.memberEpoch > 0) {
            return this.revokeActiveTasks(StreamsMembershipManager.toTaskIdSet(this.currentAssignment.activeTasks));
        }
        return this.releaseLostActiveTasks();
    }

    private void leavingAfterReleasingActiveTasks(Throwable callbackError) {
        if (callbackError != null) {
            this.log.error("Member {} callback to revoke task assignment failed. It will proceed to clear its assignment and send a leave group heartbeat", (Object)this.memberId, (Object)callbackError);
        } else {
            this.log.info("Member {} completed callback to revoke task assignment. It will proceed to clear its assignment and send a leave group heartbeat", (Object)this.memberId);
        }
        this.leaving();
    }

    private void leaving() {
        this.clearTaskAndPartitionAssignment();
        this.subscriptionState.unsubscribe();
        this.transitionToSendingLeaveGroup(false);
    }

    private void processAssignmentReceived(Map<String, SortedSet<Integer>> activeTasks, Map<String, SortedSet<Integer>> standbyTasks, Map<String, SortedSet<Integer>> warmupTasks, boolean isGroupReady) {
        this.replaceTargetAssignmentWithNewAssignment(activeTasks, standbyTasks, warmupTasks, isGroupReady);
        if (!this.targetAssignmentReconciled()) {
            this.transitionTo(MemberState.RECONCILING);
        } else {
            this.log.debug("Target assignment {} received from the broker is equals to the member current assignment {}. Nothing to reconcile.", (Object)this.targetAssignment, (Object)this.currentAssignment);
            if (this.state == MemberState.RECONCILING || this.state == MemberState.JOINING) {
                this.transitionTo(MemberState.STABLE);
            }
        }
    }

    private boolean targetAssignmentReconciled() {
        return this.currentAssignment.equals(this.targetAssignment);
    }

    private void replaceTargetAssignmentWithNewAssignment(Map<String, SortedSet<Integer>> activeTasks, Map<String, SortedSet<Integer>> standbyTasks, Map<String, SortedSet<Integer>> warmupTasks, boolean isGroupReady) {
        this.targetAssignment.updateWith(activeTasks, standbyTasks, warmupTasks, isGroupReady).ifPresent(updatedAssignment -> {
            this.log.debug("Target assignment updated from {} to {}. Member will reconcile it on the next poll.", (Object)this.targetAssignment, updatedAssignment);
            this.targetAssignment = updatedAssignment;
        });
    }

    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        if (this.state == MemberState.RECONCILING) {
            this.maybeReconcile();
        }
        return NetworkClientDelegate.PollResult.EMPTY;
    }

    private void maybeReconcile() {
        if (this.targetAssignmentReconciled()) {
            this.log.trace("Ignoring reconciliation attempt. Target assignment is equal to the current assignment.");
            return;
        }
        if (this.reconciliationInProgress) {
            this.log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment {} will be handled in the next reconciliation loop.", (Object)this.targetAssignment);
            return;
        }
        this.markReconciliationInProgress();
        SortedSet<StreamsRebalanceData.TaskId> assignedActiveTasks = StreamsMembershipManager.toTaskIdSet(this.targetAssignment.activeTasks);
        SortedSet<StreamsRebalanceData.TaskId> ownedActiveTasks = StreamsMembershipManager.toTaskIdSet(this.currentAssignment.activeTasks);
        TreeSet<StreamsRebalanceData.TaskId> activeTasksToRevoke = new TreeSet<StreamsRebalanceData.TaskId>(ownedActiveTasks);
        activeTasksToRevoke.removeAll(assignedActiveTasks);
        SortedSet<StreamsRebalanceData.TaskId> assignedStandbyTasks = StreamsMembershipManager.toTaskIdSet(this.targetAssignment.standbyTasks);
        SortedSet<StreamsRebalanceData.TaskId> ownedStandbyTasks = StreamsMembershipManager.toTaskIdSet(this.currentAssignment.standbyTasks);
        SortedSet<StreamsRebalanceData.TaskId> assignedWarmupTasks = StreamsMembershipManager.toTaskIdSet(this.targetAssignment.warmupTasks);
        SortedSet<StreamsRebalanceData.TaskId> ownedWarmupTasks = StreamsMembershipManager.toTaskIdSet(this.currentAssignment.warmupTasks);
        boolean isGroupReady = this.targetAssignment.isGroupReady;
        this.log.info("Assigned tasks with local epoch {} and group {}\n\tMember:                        {}\n\tAssigned active tasks:         {}\n\tOwned active tasks:            {}\n\tActive tasks to revoke:        {}\n\tAssigned standby tasks:        {}\n\tOwned standby tasks:           {}\n\tAssigned warm-up tasks:        {}\n\tOwned warm-up tasks:           {}\n", new Object[]{this.targetAssignment.localEpoch, isGroupReady ? "is ready" : "is not ready", this.memberId, assignedActiveTasks, ownedActiveTasks, activeTasksToRevoke, assignedStandbyTasks, ownedStandbyTasks, assignedWarmupTasks, ownedWarmupTasks});
        TreeSet<TopicPartition> ownedTopicPartitionsFromSubscriptionState = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        ownedTopicPartitionsFromSubscriptionState.addAll(this.subscriptionState.assignedPartitions());
        SortedSet<TopicPartition> ownedTopicPartitionsFromAssignedTasks = this.topicPartitionsForActiveTasks(this.currentAssignment.activeTasks);
        if (!ownedTopicPartitionsFromAssignedTasks.equals(ownedTopicPartitionsFromSubscriptionState)) {
            throw new IllegalStateException("Owned partitions from subscription state and owned partitions from assigned active tasks are not equal. Owned partitions from subscription state: " + String.valueOf(ownedTopicPartitionsFromSubscriptionState) + ", Owned partitions from assigned active tasks: " + String.valueOf(ownedTopicPartitionsFromAssignedTasks));
        }
        SortedSet<TopicPartition> assignedTopicPartitions = this.topicPartitionsForActiveTasks(this.targetAssignment.activeTasks);
        TreeSet<TopicPartition> partitionsToRevoke = new TreeSet<TopicPartition>((SortedSet<TopicPartition>)ownedTopicPartitionsFromSubscriptionState);
        partitionsToRevoke.removeAll(assignedTopicPartitions);
        CompletableFuture<Void> tasksRevoked = this.revokeActiveTasks(activeTasksToRevoke);
        CompletionStage tasksRevokedAndAssigned = tasksRevoked.thenCompose(__ -> {
            if (!this.maybeAbortReconciliation()) {
                return this.assignTasks(assignedActiveTasks, ownedActiveTasks, assignedStandbyTasks, assignedWarmupTasks, isGroupReady);
            }
            return CompletableFuture.completedFuture(null);
        });
        LocalAssignment currentTargetAssignment = this.targetAssignment;
        ((CompletableFuture)tasksRevokedAndAssigned).whenComplete((__, callbackError) -> {
            if (callbackError != null) {
                this.log.error("Reconciliation failed: callback invocation failed for tasks {}", (Object)currentTargetAssignment, callbackError);
                this.markReconciliationCompleted();
            } else if (this.reconciliationInProgress && !this.maybeAbortReconciliation()) {
                this.currentAssignment = currentTargetAssignment;
                this.transitionTo(MemberState.ACKNOWLEDGING);
                this.markReconciliationCompleted();
            }
        });
    }

    private CompletableFuture<Void> revokeActiveTasks(SortedSet<StreamsRebalanceData.TaskId> activeTasksToRevoke) {
        if (activeTasksToRevoke.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.log.info("Revoking previously assigned active tasks {}", (Object)activeTasksToRevoke.stream().map(StreamsRebalanceData.TaskId::toString).collect(Collectors.joining(", ")));
        SortedSet<TopicPartition> partitionsToRevoke = this.topicPartitionsForActiveTasks(activeTasksToRevoke);
        this.log.debug("Marking partitions pending for revocation: {}", partitionsToRevoke);
        this.subscriptionState.markPendingRevocation(partitionsToRevoke);
        CompletableFuture<Void> tasksRevoked = new CompletableFuture<Void>();
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.requestOnTasksRevokedCallbackInvocation(activeTasksToRevoke);
        onTasksRevokedCallbackExecuted.whenComplete((__, callbackError) -> {
            if (callbackError != null) {
                this.log.error("onTasksRevoked callback invocation failed for tasks {}", (Object)activeTasksToRevoke, callbackError);
                tasksRevoked.completeExceptionally((Throwable)callbackError);
            } else {
                tasksRevoked.complete(null);
            }
        });
        return tasksRevoked;
    }

    private CompletableFuture<Void> assignTasks(SortedSet<StreamsRebalanceData.TaskId> activeTasksToAssign, SortedSet<StreamsRebalanceData.TaskId> ownedActiveTasks, SortedSet<StreamsRebalanceData.TaskId> standbyTasksToAssign, SortedSet<StreamsRebalanceData.TaskId> warmupTasksToAssign, boolean isGroupReady) {
        this.log.info("Assigning active tasks {{}}, standby tasks {{}}, and warm-up tasks {{}} to the member.", new Object[]{activeTasksToAssign.stream().map(StreamsRebalanceData.TaskId::toString).collect(Collectors.joining(", ")), standbyTasksToAssign.stream().map(StreamsRebalanceData.TaskId::toString).collect(Collectors.joining(", ")), warmupTasksToAssign.stream().map(StreamsRebalanceData.TaskId::toString).collect(Collectors.joining(", "))});
        SortedSet<TopicPartition> partitionsToAssign = this.topicPartitionsForActiveTasks(activeTasksToAssign);
        SortedSet<TopicPartition> partitionsToAssignNotPreviouslyOwned = this.partitionsToAssignNotPreviouslyOwned(partitionsToAssign, this.topicPartitionsForActiveTasks(ownedActiveTasks));
        this.subscriptionState.assignFromSubscribedAwaitingCallback(partitionsToAssign, partitionsToAssignNotPreviouslyOwned);
        this.notifyAssignmentChange(partitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.requestOnTasksAssignedCallbackInvocation(new StreamsRebalanceData.Assignment(activeTasksToAssign, standbyTasksToAssign, warmupTasksToAssign, isGroupReady));
        onTasksAssignedCallbackExecuted.whenComplete((__, callbackError) -> {
            if (callbackError == null) {
                this.subscriptionState.enablePartitionsAwaitingCallback(partitionsToAssign);
            } else if (!partitionsToAssignNotPreviouslyOwned.isEmpty()) {
                this.log.warn("Leaving newly assigned partitions {} marked as non-fetchable and not requiring initializing positions after onTasksAssigned callback failed.", (Object)partitionsToAssignNotPreviouslyOwned, callbackError);
            }
        });
        return onTasksAssignedCallbackExecuted;
    }

    private CompletableFuture<Void> releaseLostActiveTasks() {
        SortedSet<StreamsRebalanceData.TaskId> activeTasksToRelease = StreamsMembershipManager.toTaskIdSet(this.currentAssignment.activeTasks);
        this.log.info("Revoking previously assigned and now lost active tasks {}", (Object)activeTasksToRelease.stream().map(StreamsRebalanceData.TaskId::toString).collect(Collectors.joining(", ")));
        SortedSet<TopicPartition> partitionsToRelease = this.topicPartitionsForActiveTasks(activeTasksToRelease);
        this.log.debug("Marking lost partitions pending for revocation: {}", partitionsToRelease);
        this.subscriptionState.markPendingRevocation(partitionsToRelease);
        return this.requestOnAllTasksLostCallbackInvocation();
    }

    private SortedSet<TopicPartition> partitionsToAssignNotPreviouslyOwned(SortedSet<TopicPartition> assignedTopicPartitions, SortedSet<TopicPartition> ownedTopicPartitions) {
        TreeSet<TopicPartition> assignedPartitionsNotPreviouslyOwned = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        assignedPartitionsNotPreviouslyOwned.addAll(assignedTopicPartitions);
        assignedPartitionsNotPreviouslyOwned.removeAll(ownedTopicPartitions);
        return assignedPartitionsNotPreviouslyOwned;
    }

    private SortedSet<TopicPartition> topicPartitionsForActiveTasks(Map<String, SortedSet<Integer>> activeTasks) {
        TreeSet<TopicPartition> topicPartitions = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        activeTasks.forEach((subtopologyId, partitionIds) -> Stream.concat(this.streamsRebalanceData.subtopologies().get(subtopologyId).sourceTopics().stream(), this.streamsRebalanceData.subtopologies().get(subtopologyId).repartitionSourceTopics().keySet().stream()).forEach(topic -> {
            Iterator iterator = partitionIds.iterator();
            while (iterator.hasNext()) {
                int partitionId = (Integer)iterator.next();
                topicPartitions.add(new TopicPartition((String)topic, partitionId));
            }
        }));
        return topicPartitions;
    }

    private SortedSet<TopicPartition> topicPartitionsForActiveTasks(SortedSet<StreamsRebalanceData.TaskId> activeTasks) {
        TreeSet<TopicPartition> topicPartitions = new TreeSet<TopicPartition>(TOPIC_PARTITION_COMPARATOR);
        activeTasks.forEach(task -> Stream.concat(this.streamsRebalanceData.subtopologies().get(task.subtopologyId()).sourceTopics().stream(), this.streamsRebalanceData.subtopologies().get(task.subtopologyId()).repartitionSourceTopics().keySet().stream()).forEach(topic -> topicPartitions.add(new TopicPartition((String)topic, task.partitionId()))));
        return topicPartitions;
    }

    private void markReconciliationCompleted() {
        this.reconciliationInProgress = false;
        this.rejoinedWhileReconciliationInProgress = false;
    }

    private boolean maybeAbortReconciliation() {
        boolean shouldAbort;
        boolean bl = shouldAbort = this.state != MemberState.RECONCILING || this.rejoinedWhileReconciliationInProgress;
        if (shouldAbort) {
            String reason = this.rejoinedWhileReconciliationInProgress ? "the member has re-joined the group" : "the member already transitioned out of the reconciling state into " + String.valueOf((Object)this.state);
            this.log.info("Interrupting reconciliation that is not relevant anymore because {}", (Object)reason);
            this.markReconciliationCompleted();
        }
        return shouldAbort;
    }

    private void markReconciliationInProgress() {
        this.reconciliationInProgress = true;
        this.rejoinedWhileReconciliationInProgress = false;
    }

    private CompletableFuture<Void> requestOnTasksAssignedCallbackInvocation(StreamsRebalanceData.Assignment assignment) {
        StreamsOnTasksAssignedCallbackNeededEvent onTasksAssignedCallbackNeededEvent = new StreamsOnTasksAssignedCallbackNeededEvent(assignment);
        this.backgroundEventHandler.add(onTasksAssignedCallbackNeededEvent);
        return onTasksAssignedCallbackNeededEvent.future();
    }

    private CompletableFuture<Void> requestOnAllTasksLostCallbackInvocation() {
        StreamsOnAllTasksLostCallbackNeededEvent onAllTasksLostCallbackNeededEvent = new StreamsOnAllTasksLostCallbackNeededEvent();
        this.backgroundEventHandler.add(onAllTasksLostCallbackNeededEvent);
        return onAllTasksLostCallbackNeededEvent.future();
    }

    public CompletableFuture<Void> requestOnTasksRevokedCallbackInvocation(Set<StreamsRebalanceData.TaskId> activeTasksToRevoke) {
        StreamsOnTasksRevokedCallbackNeededEvent onTasksRevokedCallbackNeededEvent = new StreamsOnTasksRevokedCallbackNeededEvent(activeTasksToRevoke);
        this.backgroundEventHandler.add(onTasksRevokedCallbackNeededEvent);
        return onTasksRevokedCallbackNeededEvent.future();
    }

    public void onTasksRevokedCallbackCompleted(StreamsOnTasksRevokedCallbackCompletedEvent event) {
        Optional<KafkaException> error = event.error();
        CompletableFuture<Void> future = event.future();
        if (error.isPresent()) {
            Exception e = error.get();
            this.log.warn("The onTasksRevoked callback completed with an error ({}); signaling to continue to the next phase of rebalance", (Object)e.getMessage());
            future.completeExceptionally(e);
        } else {
            this.log.debug("The onTasksRevoked callback completed successfully; signaling to continue to the next phase of rebalance");
            future.complete(null);
        }
    }

    public void onTasksAssignedCallbackCompleted(StreamsOnTasksAssignedCallbackCompletedEvent event) {
        Optional<KafkaException> error = event.error();
        CompletableFuture<Void> future = event.future();
        if (error.isPresent()) {
            Exception e = error.get();
            this.log.warn("The onTasksAssigned callback completed with an error ({}); signaling to continue to the next phase of rebalance", (Object)e.getMessage());
            future.completeExceptionally(e);
        } else {
            this.log.debug("The onTasksAssigned callback completed successfully; signaling to continue to the next phase of rebalance");
            future.complete(null);
        }
    }

    public void onAllTasksLostCallbackCompleted(StreamsOnAllTasksLostCallbackCompletedEvent event) {
        Optional<KafkaException> error = event.error();
        CompletableFuture<Void> future = event.future();
        if (error.isPresent()) {
            Exception e = error.get();
            this.log.warn("The onAllTasksLost callback completed with an error ({}); signaling to continue to the next phase of rebalance", (Object)e.getMessage());
            future.completeExceptionally(e);
        } else {
            this.log.debug("The onAllTasksLost callback completed successfully; signaling to continue to the next phase of rebalance");
            future.complete(null);
        }
    }

    List<MemberStateListener> stateListeners() {
        return Collections.unmodifiableList(this.stateUpdatesListeners);
    }

    private static class LocalAssignment {
        public static final long NONE_EPOCH = -1L;
        public static final LocalAssignment NONE = new LocalAssignment(-1L, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false);
        public final long localEpoch;
        public final Map<String, SortedSet<Integer>> activeTasks;
        public final Map<String, SortedSet<Integer>> standbyTasks;
        public final Map<String, SortedSet<Integer>> warmupTasks;
        public final boolean isGroupReady;

        public LocalAssignment(long localEpoch, Map<String, SortedSet<Integer>> activeTasks, Map<String, SortedSet<Integer>> standbyTasks, Map<String, SortedSet<Integer>> warmupTasks, boolean isGroupReady) {
            this.localEpoch = localEpoch;
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
            this.warmupTasks = warmupTasks;
            this.isGroupReady = isGroupReady;
            if (!(localEpoch != -1L || activeTasks.isEmpty() && standbyTasks.isEmpty() && warmupTasks.isEmpty())) {
                throw new IllegalArgumentException("Local epoch must be set if tasks are assigned.");
            }
        }

        Optional<LocalAssignment> updateWith(Map<String, SortedSet<Integer>> activeTasks, Map<String, SortedSet<Integer>> standbyTasks, Map<String, SortedSet<Integer>> warmupTasks, boolean isGroupReady) {
            if (this.localEpoch != -1L && activeTasks.equals(this.activeTasks) && standbyTasks.equals(this.standbyTasks) && warmupTasks.equals(this.warmupTasks) && isGroupReady == this.isGroupReady) {
                return Optional.empty();
            }
            long nextLocalEpoch = this.localEpoch + 1L;
            return Optional.of(new LocalAssignment(nextLocalEpoch, activeTasks, standbyTasks, warmupTasks, isGroupReady));
        }

        public String toString() {
            return "LocalAssignment{localEpoch=" + this.localEpoch + ", activeTasks=" + String.valueOf(this.activeTasks) + ", standbyTasks=" + String.valueOf(this.standbyTasks) + ", warmupTasks=" + String.valueOf(this.warmupTasks) + ", isGroupReady=" + this.isGroupReady + "}";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            LocalAssignment that = (LocalAssignment)o;
            return this.localEpoch == that.localEpoch && Objects.equals(this.activeTasks, that.activeTasks) && Objects.equals(this.standbyTasks, that.standbyTasks) && Objects.equals(this.warmupTasks, that.warmupTasks) && this.isGroupReady == that.isGroupReady;
        }

        public int hashCode() {
            return Objects.hash(this.localEpoch, this.activeTasks, this.standbyTasks, this.warmupTasks, this.isGroupReady);
        }
    }
}

