/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.migration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.image.AclsDelta;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.ClientQuotaImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.ProducerIdsDelta;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.ScramCredentialData;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.migration.ConfigMigrationClient;
import org.apache.kafka.metadata.migration.KRaftMigrationOperationConsumer;
import org.apache.kafka.metadata.migration.MigrationClient;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.server.common.ProducerIdsBlock;

public class KRaftMigrationZkWriter {
    private static final String UPDATE_PRODUCER_ID = "UpdateProducerId";
    private static final String CREATE_TOPIC = "CreateTopic";
    private static final String UPDATE_TOPIC = "UpdateTopic";
    private static final String DELETE_TOPIC = "DeleteTopic";
    private static final String UPDATE_PARTITON = "UpdatePartition";
    private static final String DELETE_PARTITION = "DeletePartition";
    private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
    private static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig";
    private static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig";
    private static final String DELETE_TOPIC_CONFIG = "DeleteTopicConfig";
    private static final String UPDATE_CLIENT_QUOTA = "UpdateClientQuota";
    private static final String UPDATE_ACL = "UpdateAcl";
    private static final String DELETE_ACL = "DeleteAcl";
    private final MigrationClient migrationClient;

    public KRaftMigrationZkWriter(MigrationClient migrationClient) {
        this.migrationClient = migrationClient;
    }

    public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) {
        this.handleTopicsSnapshot(image.topics(), operationConsumer);
        this.handleConfigsSnapshot(image.configs(), operationConsumer);
        this.handleClientQuotasSnapshot(image.clientQuotas(), image.scram(), operationConsumer);
        this.handleProducerIdSnapshot(image.producerIds(), operationConsumer);
        this.handleAclsSnapshot(image.acls(), operationConsumer);
    }

    public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
        if (delta.topicsDelta() != null) {
            this.handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), delta.topicsDelta(), operationConsumer);
        }
        if (delta.configsDelta() != null) {
            this.handleConfigsDelta(image.configs(), delta.configsDelta(), operationConsumer);
        }
        if (delta.clientQuotasDelta() != null || delta.scramDelta() != null) {
            this.handleClientQuotasDelta(image, delta, operationConsumer);
        }
        if (delta.producerIdsDelta() != null) {
            this.handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer);
        }
        if (delta.aclsDelta() != null) {
            this.handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
        }
    }

    void handleTopicsSnapshot(final TopicsImage topicsImage, KRaftMigrationOperationConsumer operationConsumer) {
        final HashMap<Uuid, String> deletedTopics = new HashMap<Uuid, String>();
        final HashSet topicsInZk = new HashSet();
        final HashSet<Uuid> newTopics = new HashSet<Uuid>(topicsImage.topicsById().keySet());
        final HashSet changedTopics = new HashSet();
        final HashMap partitionsInZk = new HashMap();
        HashMap<String, Set> extraneousPartitionsInZk = new HashMap<String, Set>();
        final HashMap<Uuid, Map> changedPartitions = new HashMap<Uuid, Map>();
        HashMap<Uuid, Map> newPartitions = new HashMap<Uuid, Map>();
        this.migrationClient.topicClient().iterateTopics(EnumSet.of(TopicMigrationClient.TopicVisitorInterest.TOPICS, TopicMigrationClient.TopicVisitorInterest.PARTITIONS), new TopicMigrationClient.TopicVisitor(){

            @Override
            public void visitTopic(String topicName, Uuid topicId, Map<Integer, List<Integer>> assignments) {
                TopicImage topic = topicsImage.getTopic(topicId);
                if (topic == null) {
                    deletedTopics.put(topicId, topicName);
                } else {
                    if (!newTopics.remove(topicId)) {
                        return;
                    }
                    topicsInZk.add(topicId);
                }
            }

            @Override
            public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
                TopicImage topic = topicsImage.getTopic(topicIdPartition.topicId());
                if (topic == null) {
                    return;
                }
                partitionsInZk.computeIfAbsent(topic.id(), __ -> new HashSet()).add(topicIdPartition.partition());
                PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
                if (kraftPartition != null) {
                    if (!kraftPartition.equals(partitionRegistration)) {
                        changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap()).put(topicIdPartition.partition(), kraftPartition);
                    }
                    if (!kraftPartition.hasSameAssignment(partitionRegistration)) {
                        changedTopics.add(topic.id());
                    }
                }
            }
        });
        topicsInZk.forEach(topicId -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            Set topicPartitionsInZk = partitionsInZk.computeIfAbsent(topicId, __ -> new HashSet());
            if (!topicPartitionsInZk.equals(topic.partitions().keySet())) {
                HashMap<Integer, PartitionRegistration> newTopicPartitions = new HashMap<Integer, PartitionRegistration>(topic.partitions());
                topicPartitionsInZk.forEach(newTopicPartitions::remove);
                newPartitions.put((Uuid)topicId, newTopicPartitions);
                topicPartitionsInZk.removeAll(topic.partitions().keySet());
                if (!topicPartitionsInZk.isEmpty()) {
                    extraneousPartitionsInZk.put(topic.name(), topicPartitionsInZk);
                }
                changedTopics.add(topicId);
            }
        });
        newTopics.forEach(topicId -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(CREATE_TOPIC, "Create Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().createTopic(topic.name(), (Uuid)topicId, topic.partitions(), migrationState));
        });
        changedTopics.forEach(topicId -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(UPDATE_TOPIC, "Changed Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopic(topic.name(), (Uuid)topicId, topic.partitions(), migrationState));
        });
        deletedTopics.forEach((topicId, topicName) -> {
            operationConsumer.accept(DELETE_TOPIC, "Delete Topic " + topicName + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().deleteTopic((String)topicName, migrationState));
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            operationConsumer.accept(UPDATE_TOPIC_CONFIG, "Updating Configs for Topic " + topicName + ", ID " + topicId, migrationState -> this.migrationClient.configClient().deleteConfigs(resource, migrationState));
        });
        newPartitions.forEach((topicId, partitionMap) -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(UPDATE_PARTITON, "Creating additional partitions for Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopicPartitions(Collections.singletonMap(topic.name(), partitionMap), migrationState));
        });
        changedPartitions.forEach((topicId, partitionMap) -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(UPDATE_PARTITON, "Updating Partitions for Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopicPartitions(Collections.singletonMap(topic.name(), partitionMap), migrationState));
        });
        extraneousPartitionsInZk.forEach((topicName, partitions) -> operationConsumer.accept(DELETE_PARTITION, "Deleting extraneous Partitions " + partitions + " for Topic " + topicName, migrationState -> this.migrationClient.topicClient().deleteTopicPartitions(Collections.singletonMap(topicName, partitions), migrationState)));
    }

    void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsImage topicsImage, TopicsDelta topicsDelta, KRaftMigrationOperationConsumer operationConsumer) {
        topicsDelta.deletedTopicIds().forEach(topicId -> {
            String name = (String)deletedTopicNameResolver.apply((Uuid)topicId);
            operationConsumer.accept(DELETE_TOPIC, "Deleting topic " + name + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().deleteTopic(name, migrationState));
        });
        topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
            if (topicsDelta.createdTopicIds().contains(topicId)) {
                operationConsumer.accept(CREATE_TOPIC, "Create Topic " + topicDelta.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().createTopic(topicDelta.name(), (Uuid)topicId, topicDelta.partitionChanges(), migrationState));
            } else {
                if (topicDelta.hasPartitionsWithAssignmentChanges()) {
                    operationConsumer.accept(UPDATE_TOPIC, "Updating Topic " + topicDelta.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopic(topicDelta.name(), (Uuid)topicId, topicsImage.getTopic((Uuid)topicId).partitions(), migrationState));
                }
                HashMap<Integer, PartitionRegistration> newPartitions = new HashMap<Integer, PartitionRegistration>(topicDelta.newPartitions());
                HashMap<Integer, PartitionRegistration> changedPartitions = new HashMap<Integer, PartitionRegistration>(topicDelta.partitionChanges());
                if (!newPartitions.isEmpty()) {
                    operationConsumer.accept(UPDATE_PARTITON, "Create new partitions for Topic " + topicDelta.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().createTopicPartitions(Collections.singletonMap(topicDelta.name(), newPartitions), migrationState));
                    newPartitions.keySet().forEach(changedPartitions::remove);
                }
                if (!changedPartitions.isEmpty()) {
                    HashMap<Integer, PartitionRegistration> finalChangedPartitions = changedPartitions;
                    operationConsumer.accept(UPDATE_PARTITON, "Updating Partitions for Topic " + topicDelta.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopicPartitions(Collections.singletonMap(topicDelta.name(), finalChangedPartitions), migrationState));
                }
            }
        });
    }

    private String brokerOrTopicOpType(ConfigResource resource, String brokerOp, String topicOp) {
        if (resource.type().equals((Object)ConfigResource.Type.BROKER)) {
            return brokerOp;
        }
        return topicOp;
    }

    void handleConfigsSnapshot(ConfigurationsImage configsImage, KRaftMigrationOperationConsumer operationConsumer) {
        HashSet newResources = new HashSet();
        configsImage.resourceData().keySet().forEach(resource -> {
            if (!EnumSet.of(ConfigResource.Type.BROKER, ConfigResource.Type.TOPIC).contains(resource.type())) {
                throw new RuntimeException("Unknown config resource type " + resource.type());
            }
            newResources.add(resource);
        });
        HashSet resourcesToUpdate = new HashSet();
        BiConsumer<ConfigResource, Map> processConfigsForResource = (resource, configs) -> {
            newResources.remove(resource);
            Map<String, String> kraftProps = configsImage.configMapForResource((ConfigResource)resource);
            if (!kraftProps.equals(configs)) {
                resourcesToUpdate.add(resource);
            }
        };
        this.migrationClient.configClient().iterateBrokerConfigs((broker, configs) -> {
            ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, broker);
            processConfigsForResource.accept(brokerResource, (Map)configs);
        });
        this.migrationClient.configClient().iterateTopicConfigs((topic, configs) -> {
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
            processConfigsForResource.accept(topicResource, (Map)configs);
        });
        newResources.forEach(resource -> {
            Map<String, String> props = configsImage.configMapForResource((ConfigResource)resource);
            if (!props.isEmpty()) {
                String opType = this.brokerOrTopicOpType((ConfigResource)resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
                operationConsumer.accept(opType, "Create configs for " + resource.type().name() + " " + resource.name(), migrationState -> this.migrationClient.configClient().writeConfigs((ConfigResource)resource, props, migrationState));
            }
        });
        resourcesToUpdate.forEach(resource -> {
            Map<String, String> props = configsImage.configMapForResource((ConfigResource)resource);
            if (props.isEmpty()) {
                String opType = this.brokerOrTopicOpType((ConfigResource)resource, DELETE_BROKER_CONFIG, DELETE_TOPIC_CONFIG);
                operationConsumer.accept(opType, "Delete configs for " + resource.type().name() + " " + resource.name(), migrationState -> this.migrationClient.configClient().deleteConfigs((ConfigResource)resource, migrationState));
            } else {
                String opType = this.brokerOrTopicOpType((ConfigResource)resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
                operationConsumer.accept(opType, "Update configs for " + resource.type().name() + " " + resource.name(), migrationState -> this.migrationClient.configClient().writeConfigs((ConfigResource)resource, props, migrationState));
            }
        });
    }

    private Map<String, String> getScramCredentialStringsForUser(ScramImage image, String userName) {
        HashMap<String, String> userScramCredentialStrings = new HashMap<String, String>();
        if (image != null) {
            image.mechanisms().forEach((scramMechanism, scramMechanismMap) -> {
                ScramCredentialData scramCredentialData = (ScramCredentialData)scramMechanismMap.get(userName);
                if (scramCredentialData != null) {
                    userScramCredentialStrings.put(scramMechanism.mechanismName(), ScramCredentialUtils.credentialToString((ScramCredential)scramCredentialData.toCredential((ScramMechanism)scramMechanism)));
                }
            });
        }
        return userScramCredentialStrings;
    }

    void handleClientQuotasSnapshot(final ClientQuotasImage clientQuotasImage, final ScramImage scramImage, KRaftMigrationOperationConsumer opConsumer) {
        final HashSet<ClientQuotaEntity> changedNonUserEntities = new HashSet<ClientQuotaEntity>();
        final HashSet<String> changedUsers = new HashSet<String>();
        if (clientQuotasImage != null) {
            for (Map.Entry<Object, Object> entry : clientQuotasImage.entities().entrySet()) {
                ClientQuotaEntity entity2 = (ClientQuotaEntity)entry.getKey();
                if (entity2.entries().containsKey("user") && !entity2.entries().containsKey("client-id")) {
                    changedUsers.add((String)entity2.entries().get("user"));
                    continue;
                }
                changedNonUserEntities.add(entity2);
            }
        }
        if (scramImage != null) {
            for (Map.Entry<Object, Object> entry : scramImage.mechanisms().entrySet()) {
                for (Map.Entry userEntry : ((Map)entry.getValue()).entrySet()) {
                    changedUsers.add((String)userEntry.getKey());
                }
            }
        }
        this.migrationClient.configClient().iterateClientQuotas(new ConfigMigrationClient.ClientQuotaVisitor(){

            @Override
            public void visitClientQuota(List<ClientQuotaRecord.EntityData> entityDataList, Map<String, Double> quotas) {
                HashMap entityMap = new HashMap(2);
                entityDataList.forEach(entityData -> entityMap.put(entityData.entityType(), entityData.entityName()));
                ClientQuotaEntity entity = new ClientQuotaEntity(entityMap);
                if (!clientQuotasImage.entities().getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap().equals(quotas)) {
                    if (entity.entries().containsKey("user") && !entity.entries().containsKey("client-id")) {
                        changedUsers.add((String)entityMap.get("user"));
                    } else {
                        changedNonUserEntities.add(entity);
                    }
                }
            }

            @Override
            public void visitScramCredential(String userName, ScramMechanism scramMechanism, ScramCredential scramCredential) {
                ScramCredentialData data = (ScramCredentialData)scramImage.mechanisms().getOrDefault(scramMechanism, Collections.emptyMap()).get(userName);
                if (data == null || !data.toCredential(scramMechanism).equals(scramCredential)) {
                    changedUsers.add(userName);
                }
            }
        });
        changedNonUserEntities.forEach(entity -> {
            Map<String, Double> quotaMap = clientQuotasImage.entities().getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
            opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + entity, migrationState -> this.migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState));
        });
        changedUsers.forEach(userName -> {
            ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap("user", userName));
            Map<String, Double> quotaMap = clientQuotasImage.entities().getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
            Map<String, String> scramMap = this.getScramCredentialStringsForUser(scramImage, (String)userName);
            opConsumer.accept(UPDATE_CLIENT_QUOTA, "Update client quotas for " + userName, migrationState -> this.migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState));
        });
    }

    void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationConsumer operationConsumer) {
        if (image.isEmpty()) {
            return;
        }
        Optional<ProducerIdsBlock> zkProducerId = this.migrationClient.readProducerId();
        if (zkProducerId.isPresent()) {
            if (zkProducerId.get().nextBlockFirstId() != image.nextProducerId()) {
                operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState -> this.migrationClient.writeProducerId(image.nextProducerId(), migrationState));
            }
        } else {
            operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState -> this.migrationClient.writeProducerId(image.nextProducerId(), migrationState));
        }
    }

    void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
        Set<ConfigResource> updatedResources = configsDelta.changes().keySet();
        updatedResources.forEach(configResource -> {
            Map<String, String> props = configsImage.configMapForResource((ConfigResource)configResource);
            if (props.isEmpty()) {
                operationConsumer.accept("DeleteConfig", "Delete configs for " + configResource, migrationState -> this.migrationClient.configClient().deleteConfigs((ConfigResource)configResource, migrationState));
            } else {
                operationConsumer.accept("UpdateConfig", "Update configs for " + configResource, migrationState -> this.migrationClient.configClient().writeConfigs((ConfigResource)configResource, props, migrationState));
            }
        });
    }

    void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta, KRaftMigrationOperationConsumer operationConsumer) {
        if (metadataDelta.clientQuotasDelta() != null || metadataDelta.scramDelta() != null) {
            HashSet users = new HashSet();
            if (metadataDelta.scramDelta() != null) {
                metadataDelta.scramDelta().changes().forEach((scramMechanism, changes) -> changes.forEach((userName, changeOpt) -> users.add(userName)));
            }
            if (metadataDelta.clientQuotasDelta() != null) {
                metadataDelta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
                    if (clientQuotaEntity.entries().containsKey("user") && !clientQuotaEntity.entries().containsKey("client-id")) {
                        String userName = (String)clientQuotaEntity.entries().get("user");
                        users.add(userName);
                    } else {
                        Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
                        operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota " + clientQuotaEntity, migrationState -> this.migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, Collections.emptyMap(), migrationState));
                    }
                });
            }
            users.forEach(userName -> {
                Map<String, String> userScramMap = this.getScramCredentialStringsForUser(metadataImage.scram(), (String)userName);
                ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap("user", userName));
                if (metadataImage.clientQuotas() == null || metadataImage.clientQuotas().entities().get(clientQuotaEntity) == null) {
                    operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating scram credentials for " + clientQuotaEntity, migrationState -> this.migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), Collections.emptyMap(), userScramMap, migrationState));
                } else {
                    Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
                    operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota for " + clientQuotaEntity, migrationState -> this.migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
                }
            });
        }
    }

    void handleProducerIdDelta(ProducerIdsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
        operationConsumer.accept(UPDATE_PRODUCER_ID, "Setting next producer ID", migrationState -> this.migrationClient.writeProducerId(delta.nextProducerId(), migrationState));
    }

    private ResourcePattern resourcePatternFromAcl(StandardAcl acl) {
        return new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
    }

    void handleAclsSnapshot(AclsImage image, KRaftMigrationOperationConsumer operationConsumer) {
        HashMap allAclsInSnapshot = new HashMap();
        image.acls().values().forEach(standardAcl -> {
            ResourcePattern resourcePattern = this.resourcePatternFromAcl((StandardAcl)standardAcl);
            allAclsInSnapshot.computeIfAbsent(resourcePattern, __ -> new HashSet()).add(new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType()));
        });
        HashSet newResources = new HashSet(allAclsInSnapshot.keySet());
        HashSet resourcesToDelete = new HashSet();
        HashMap<ResourcePattern, Set> changedResources = new HashMap<ResourcePattern, Set>();
        this.migrationClient.aclClient().iterateAcls((resourcePattern, accessControlEntries) -> {
            newResources.remove(resourcePattern);
            if (!allAclsInSnapshot.containsKey(resourcePattern)) {
                resourcesToDelete.add(resourcePattern);
            } else {
                Set snapshotEntries = (Set)allAclsInSnapshot.get(resourcePattern);
                if (!snapshotEntries.equals(accessControlEntries)) {
                    changedResources.put((ResourcePattern)resourcePattern, snapshotEntries);
                }
            }
        });
        newResources.forEach(resourcePattern -> {
            Set accessControlEntries = (Set)allAclsInSnapshot.get(resourcePattern);
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
            operationConsumer.accept(UPDATE_ACL, name, migrationState -> this.migrationClient.aclClient().writeResourceAcls((ResourcePattern)resourcePattern, accessControlEntries, migrationState));
        });
        resourcesToDelete.forEach(deletedResource -> {
            String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot";
            operationConsumer.accept(DELETE_ACL, name, migrationState -> this.migrationClient.aclClient().deleteResource((ResourcePattern)deletedResource, migrationState));
        });
        changedResources.forEach((resourcePattern, accessControlEntries) -> {
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
            operationConsumer.accept(UPDATE_ACL, name, migrationState -> this.migrationClient.aclClient().writeResourceAcls((ResourcePattern)resourcePattern, (Collection<AccessControlEntry>)accessControlEntries, migrationState));
        });
    }

    void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
        Set resourcesWithChangedAcls = delta.changes().values().stream().filter(Optional::isPresent).map(Optional::get).map(this::resourcePatternFromAcl).collect(Collectors.toSet());
        Set<ResourcePattern> resourcesWithDeletedAcls = delta.deleted().stream().map(this::resourcePatternFromAcl).collect(Collectors.toSet());
        HashMap<ResourcePattern, List> aclsToWrite = new HashMap<ResourcePattern, List>();
        image.acls().forEach((uuid, standardAcl) -> {
            ResourcePattern resourcePattern = this.resourcePatternFromAcl((StandardAcl)standardAcl);
            boolean removed = resourcesWithDeletedAcls.remove(resourcePattern);
            if (resourcesWithChangedAcls.contains(resourcePattern) || removed) {
                aclsToWrite.computeIfAbsent(resourcePattern, __ -> new ArrayList()).add(new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType()));
            }
        });
        resourcesWithDeletedAcls.forEach(deletedResource -> {
            String name = "Deleting resource " + deletedResource + " which has no more ACLs";
            operationConsumer.accept(DELETE_ACL, name, migrationState -> this.migrationClient.aclClient().deleteResource((ResourcePattern)deletedResource, migrationState));
        });
        aclsToWrite.forEach((resourcePattern, accessControlEntries) -> {
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
            operationConsumer.accept(UPDATE_ACL, name, migrationState -> this.migrationClient.aclClient().writeResourceAcls((ResourcePattern)resourcePattern, (Collection<AccessControlEntry>)accessControlEntries, migrationState));
        });
    }
}

