/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.ConstrainedPrioritySet;
import org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils;
import org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StandbyTaskAssignor} that places the standby replicas of a stateful task on clients whose
 * client tags differ from the tags of the client hosting the active task (and from the tags of the
 * clients already chosen for that task's standbys).
 *
 * <p>Only tag keys listed in {@code configs.rackAwareAssignmentTags} take part in the decision. Standby
 * replicas that cannot be placed under this constraint are handed to
 * {@link StandbyTaskAssignmentUtils#pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks} as a fallback.
 */
class ClientTagAwareStandbyTaskAssignor
implements StandbyTaskAssignor {
    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);

    ClientTagAwareStandbyTaskAssignor() {
    }

    /**
     * Assigns standby tasks for every stateful task that is active on one of the given clients.
     *
     * @param clients         client id to client state; the states are mutated via {@code assignStandby}
     * @param allTaskIds      not used by this implementation
     * @param statefulTaskIds the tasks for which standby replicas are assigned
     * @param configs         supplies {@code numStandbyReplicas} and {@code rackAwareAssignmentTags}
     * @return always {@code false}
     */
    @Override
    public boolean assign(Map<UUID, ClientState> clients, Set<TaskId> allTaskIds, Set<TaskId> statefulTaskIds, AssignorConfiguration.AssignmentConfigs configs) {
        final int numStandbyReplicas = configs.numStandbyReplicas;
        // Defensive copy of the configured tag keys.
        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
        final Map<TaskId, Integer> tasksToRemainingStandbys =
            StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys(numStandbyReplicas, statefulTaskIds);

        final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
        final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
        fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues);

        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad =
            StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask(clients);
        // Filled by the helper below; it is not read again inside this method.
        final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();

        for (final TaskId statefulTaskId : statefulTaskIds) {
            for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) {
                // Only the client that hosts the active task triggers the standby placement.
                if (entry.getValue().activeTasks().contains(statefulTaskId)) {
                    assignStandbyTasksToClientsWithDifferentTags(
                        numStandbyReplicas,
                        standbyTaskClientsByTaskLoad,
                        statefulTaskId,
                        entry.getKey(),
                        rackAwareAssignmentTags,
                        clients,
                        tasksToRemainingStandbys,
                        tagKeyToValues,
                        tagEntryToClients,
                        pendingStandbyTasksToClientId);
                }
            }
        }

        // Whatever is still listed here could not be fully placed under the tag constraint.
        if (!tasksToRemainingStandbys.isEmpty()) {
            assignPendingStandbyTasksToLeastLoadedClients(clients, numStandbyReplicas, standbyTaskClientsByTaskLoad, tasksToRemainingStandbys);
        }
        return false;
    }

    /**
     * Fallback pass: re-offers every client to the priority set and delegates each task that still has
     * remaining standbys to {@link StandbyTaskAssignmentUtils#pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks}.
     */
    private static void assignPendingStandbyTasksToLeastLoadedClients(Map<UUID, ClientState> clients, int numStandbyReplicas, ConstrainedPrioritySet standbyTaskClientsByTaskLoad, Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys) {
        standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
        // NOTE(review): the delegate receives the map being iterated; this relies on it not adding or
        // removing keys during the iteration - confirm against StandbyTaskAssignmentUtils.
        for (final Map.Entry<TaskId, Integer> pendingEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) {
            StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
                numStandbyReplicas,
                clients,
                pendingStandbyTaskToNumberRemainingStandbys,
                standbyTaskClientsByTaskLoad,
                pendingEntry.getKey(),
                log);
        }
    }

    /**
     * Checks whether a task may move between two clients.
     *
     * @return {@code true} only if every tag of {@code source} has an equal value on {@code destination};
     *         tags that exist only on {@code destination} are not inspected
     */
    @Override
    public boolean isAllowedTaskMovement(ClientState source, ClientState destination) {
        final Map<String, String> sourceClientTags = source.clientTags();
        final Map<String, String> destinationClientTags = destination.clientTags();
        for (final Map.Entry<String, String> sourceTag : sourceClientTags.entrySet()) {
            final String destinationValue = destinationClientTags.get(sourceTag.getKey());
            if (!sourceTag.getValue().equals(destinationValue)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Indexes the tags of all clients.
     *
     * @param clientStates      client id to client state (read only)
     * @param tagEntryToClients output: for each (key, value) pair, the ids of the clients carrying it
     * @param tagKeyToValues    output: for each tag key, the distinct values seen across all clients
     */
    static void fillClientsTagStatistics(Map<UUID, ClientState> clientStates, Map<TagEntry, Set<UUID>> tagEntryToClients, Map<String, Set<String>> tagKeyToValues) {
        for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
            final UUID clientId = clientStateEntry.getKey();
            clientStateEntry.getValue().clientTags().forEach((tagKey, tagValue) -> {
                tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
                tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId);
            });
        }
    }

    /**
     * Assigns as many standbys of {@code activeTaskId} as possible to clients that are not excluded by
     * the tag entries already used for this task (starting with the tags of {@code activeTaskClient}).
     *
     * <p>On return, {@code tasksToRemainingStandbys} no longer contains the task if all of its standbys
     * were placed; otherwise the entry holds the number still missing and the task is recorded in
     * {@code pendingStandbyTasksToClientId}. Nothing is assigned when the remaining count is not positive.
     *
     * @param numberOfStandbyClients        configured number of standby replicas (used for logging)
     * @param standbyTaskClientsByTaskLoad  priority set the candidate clients are polled from
     * @param activeTaskId                  the task whose standbys are being placed
     * @param activeTaskClient              id of the client hosting the active task
     * @param rackAwareAssignmentTags       tag keys that take part in the decision
     * @param clientStates                  client id to client state; chosen clients get {@code assignStandby}
     * @param tasksToRemainingStandbys      in/out: remaining standby count per task
     * @param tagKeyToValues                distinct values per tag key over all clients
     * @param tagEntryToClients             clients per (key, value) pair
     * @param pendingStandbyTasksToClientId output: tasks that still need standbys, mapped to the active client
     * @throws NullPointerException if {@code tasksToRemainingStandbys} has no entry for {@code activeTaskId}
     */
    static void assignStandbyTasksToClientsWithDifferentTags(int numberOfStandbyClients, ConstrainedPrioritySet standbyTaskClientsByTaskLoad, TaskId activeTaskId, UUID activeTaskClient, Set<String> rackAwareAssignmentTags, Map<UUID, ClientState> clientStates, Map<TaskId, Integer> tasksToRemainingStandbys, Map<String, Set<String>> tagKeyToValues, Map<TagEntry, Set<UUID>> tagEntryToClients, Map<TaskId, UUID> pendingStandbyTasksToClientId) {
        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());

        // Starts at 1: the client hosting the active task already counts as used.
        int countOfUsedClients = 1;
        int numRemainingStandbys = Objects.requireNonNull(
            tasksToRemainingStandbys.get(activeTaskId),
            () -> "No remaining standby count registered for active task " + activeTaskId);

        final Map<TagEntry, Set<UUID>> tagEntryToUsedClients = new HashMap<>();
        UUID lastUsedClient = activeTaskClient;

        // Pre-checked loop so that a task with nothing left to place never receives an extra standby.
        while (numRemainingStandbys > 0) {
            updateClientsOnAlreadyUsedTagEntries(
                lastUsedClient,
                countOfUsedClients,
                rackAwareAssignmentTags,
                clientStates,
                tagEntryToClients,
                tagKeyToValues,
                tagEntryToUsedClients);

            final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll(
                activeTaskId,
                uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, tagEntryToUsedClients));
            if (clientOnUnusedTagDimensions == null) {
                // No eligible client left; the remainder is reported as pending below.
                break;
            }

            final ClientState standbyClientState = clientStates.get(clientOnUnusedTagDimensions);
            countOfUsedClients++;
            numRemainingStandbys--;

            log.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. "
                          + "Standby task client tags are {}.",
                      numberOfStandbyClients - numRemainingStandbys,
                      numberOfStandbyClients,
                      activeTaskId,
                      clientStates.get(activeTaskClient).clientTags(),
                      standbyClientState.clientTags());

            standbyClientState.assignStandby(activeTaskId);
            lastUsedClient = clientOnUnusedTagDimensions;
        }

        if (numRemainingStandbys > 0) {
            pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
            tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
            log.warn("Rack aware standby task assignment was not able to assign {} of {} standby tasks for the active task [{}] "
                         + "with the rack aware assignment tags {}. "
                         + "This may happen when there aren't enough application instances on different tag dimensions "
                         + "compared to an active and corresponding standby task. "
                         + "Consider launching application instances on different tag dimensions than [{}]. "
                         + "Standby task assignment will fall back to assigning standby tasks to the least loaded clients.",
                     numRemainingStandbys,
                     numberOfStandbyClients,
                     activeTaskId,
                     rackAwareAssignmentTags,
                     clientStates.get(activeTaskClient).clientTags());
        } else {
            tasksToRemainingStandbys.remove(activeTaskId);
        }
    }

    /**
     * @return {@code true} if {@code client} is contained in any of the client sets of {@code tagEntryToUsedClients}
     */
    private static boolean isClientUsedOnAnyOfTheTagEntries(UUID client, Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
        return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
    }

    /**
     * Updates {@code tagEntryToUsedClients} with the tags of the client that was used last.
     *
     * <p>For each tag of {@code usedClient} whose key is part of {@code rackAwareAssignmentTags}:
     * <ul>
     *   <li>if the key has no more distinct values than {@code countOfUsedClients}, every entry of that key
     *       is dropped from {@code tagEntryToUsedClients}, so the key stops excluding clients;</li>
     *   <li>otherwise all clients sharing the used client's value for that key are marked as used.</li>
     * </ul>
     * Tags whose key is not configured are skipped with a warning.
     */
    private static void updateClientsOnAlreadyUsedTagEntries(UUID usedClient, int countOfUsedClients, Set<String> rackAwareAssignmentTags, Map<UUID, ClientState> clientStates, Map<TagEntry, Set<UUID>> tagEntryToClients, Map<String, Set<String>> tagKeyToValues, Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
        final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags();
        for (final Map.Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
            final String tagKey = usedClientTagEntry.getKey();
            if (!rackAwareAssignmentTags.contains(tagKey)) {
                log.warn("Client tag with key [{}] will be ignored when computing rack aware standby task assignment because it is not part of the configured rack awareness [{}].", tagKey, rackAwareAssignmentTags);
                continue;
            }

            final Set<String> allTagValues = tagKeyToValues.get(tagKey);
            if (allTagValues.size() <= countOfUsedClients) {
                // Not enough distinct values left for this key to keep separating clients: lift the exclusion.
                allTagValues.forEach(tagValue -> tagEntryToUsedClients.remove(new TagEntry(tagKey, tagValue)));
            } else {
                final TagEntry usedTagEntry = new TagEntry(tagKey, usedClientTagEntry.getValue());
                tagEntryToUsedClients.put(usedTagEntry, tagEntryToClients.get(usedTagEntry));
            }
        }
    }

    /**
     * Immutable (tag key, tag value) pair with value-based equality, usable as a hash map key.
     */
    static final class TagEntry {
        private final String tagKey;
        private final String tagValue;

        TagEntry(String tagKey, String tagValue) {
            this.tagKey = tagKey;
            this.tagValue = tagValue;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            final TagEntry that = (TagEntry) o;
            return Objects.equals(this.tagKey, that.tagKey) && Objects.equals(this.tagValue, that.tagValue);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.tagKey, this.tagValue);
        }
    }
}

