/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownTopologyException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopologyMetadata {
    private Logger log;
    public static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
    private final StreamsConfig config;
    private final StreamsConfigUtils.ProcessingMode processingMode;
    private final TopologyVersion version;
    private final TaskExecutionMetadata taskExecutionMetadata;
    private final Set<String> pausedTopologies;
    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders;
    private ProcessorTopology globalTopology;
    private final Map<String, StateStore> globalStateStores = new HashMap<String, StateStore>();
    private final Set<String> allInputTopics = new HashSet<String>();
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<String, Long>();

    public TopologyMetadata(InternalTopologyBuilder builder, StreamsConfig config) {
        this.version = new TopologyVersion();
        this.processingMode = StreamsConfigUtils.processingMode(config);
        this.config = config;
        this.log = LoggerFactory.getLogger(this.getClass());
        this.pausedTopologies = ConcurrentHashMap.newKeySet();
        this.builders = new ConcurrentSkipListMap<String, InternalTopologyBuilder>();
        if (builder.hasNamedTopology()) {
            this.builders.put(builder.topologyName(), builder);
        } else {
            this.builders.put(UNNAMED_TOPOLOGY, builder);
        }
        this.taskExecutionMetadata = new TaskExecutionMetadata(this.builders.keySet(), this.pausedTopologies, this.processingMode);
    }

    public TopologyMetadata(ConcurrentNavigableMap<String, InternalTopologyBuilder> builders, StreamsConfig config) {
        this.version = new TopologyVersion();
        this.processingMode = StreamsConfigUtils.processingMode(config);
        this.config = config;
        this.log = LoggerFactory.getLogger(this.getClass());
        this.pausedTopologies = ConcurrentHashMap.newKeySet();
        this.builders = builders;
        if (builders.isEmpty()) {
            this.log.info("Created an empty KafkaStreams app with no topology");
        }
        this.taskExecutionMetadata = new TaskExecutionMetadata(builders.keySet(), this.pausedTopologies, this.processingMode);
    }

    public void setLog(LogContext logContext) {
        this.log = logContext.logger(this.getClass());
    }

    public StreamsConfigUtils.ProcessingMode processingMode() {
        return this.processingMode;
    }

    public long topologyVersion() {
        return this.version.topologyVersion.get();
    }

    private void lock() {
        this.version.topologyLock.lock();
    }

    private void unlock() {
        this.version.topologyLock.unlock();
    }

    public Collection<String> sourceTopicsForTopology(String name) {
        return ((InternalTopologyBuilder)this.builders.get(name)).fullSourceTopicNames();
    }

    public boolean needsUpdate(String threadName) {
        return this.threadVersions.get(threadName) < this.topologyVersion();
    }

    public void registerThread(String threadName) {
        this.threadVersions.put(threadName, 0L);
    }

    public void unregisterThread(String threadName) {
        this.threadVersions.remove(threadName);
        this.maybeNotifyTopologyVersionListeners();
    }

    public TaskExecutionMetadata taskExecutionMetadata() {
        return this.taskExecutionMetadata;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void executeTopologyUpdatesAndBumpThreadVersion(Consumer<Set<String>> handleTopologyAdditions, Consumer<Set<String>> handleTopologyRemovals) {
        try {
            this.version.topologyLock.lock();
            long latestTopologyVersion = this.topologyVersion();
            handleTopologyAdditions.accept(this.namedTopologiesView());
            handleTopologyRemovals.accept(this.namedTopologiesView());
            this.threadVersions.put(Thread.currentThread().getName(), latestTopologyVersion);
        }
        finally {
            this.version.topologyLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void maybeNotifyTopologyVersionListeners() {
        try {
            this.lock();
            long minThreadVersion = this.getMinimumThreadVersion();
            ListIterator<TopologyVersionListener> iterator = this.version.activeTopologyUpdateListeners.listIterator();
            while (iterator.hasNext()) {
                TopologyVersionListener topologyVersionListener = (TopologyVersionListener)iterator.next();
                long topologyVersionWaitersVersion = topologyVersionListener.topologyVersion;
                if (minThreadVersion < topologyVersionWaitersVersion) continue;
                topologyVersionListener.future.complete(null);
                iterator.remove();
                this.log.info("All threads are now on topology version {}", (Object)topologyVersionListener.topologyVersion);
            }
        }
        finally {
            this.unlock();
        }
    }

    private long getMinimumThreadVersion() {
        Optional<Long> minVersion = this.threadVersions.values().stream().min(Long::compare);
        return minVersion.orElse(Long.MAX_VALUE);
    }

    public void wakeupThreads() {
        try {
            this.lock();
            this.version.topologyCV.signalAll();
        }
        finally {
            this.unlock();
        }
    }

    public void maybeWaitForNonEmptyTopology(Supplier<StreamThread.State> threadState) {
        if (this.isEmpty() && threadState.get().isAlive()) {
            try {
                this.lock();
                while (this.isEmpty() && threadState.get().isAlive()) {
                    try {
                        this.log.debug("Detected that the topology is currently empty, waiting for something to process");
                        this.version.topologyCV.await();
                    }
                    catch (InterruptedException e) {
                        this.log.error("StreamThread was interrupted while waiting on empty topology", (Throwable)e);
                    }
                }
            }
            finally {
                this.unlock();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerAndBuildNewTopology(KafkaFutureImpl<Void> future, InternalTopologyBuilder newTopologyBuilder) {
        try {
            this.lock();
            this.buildAndVerifyTopology(newTopologyBuilder);
            this.log.info("New NamedTopology {} passed validation and will be added, old topology version is {}", (Object)newTopologyBuilder.topologyName(), (Object)this.version.topologyVersion.get());
            this.version.topologyVersion.incrementAndGet();
            this.version.activeTopologyUpdateListeners.add(new TopologyVersionListener(this.topologyVersion(), future));
            this.builders.put(newTopologyBuilder.topologyName(), newTopologyBuilder);
            this.wakeupThreads();
            this.log.info("Added NamedTopology {} and updated topology version to {}", (Object)newTopologyBuilder.topologyName(), (Object)this.version.topologyVersion.get());
        }
        catch (Throwable throwable) {
            this.log.error("Failed to add NamedTopology {}, please retry the operation.", (Object)newTopologyBuilder.topologyName());
            future.completeExceptionally(throwable);
        }
        finally {
            this.unlock();
        }
    }

    public void pauseTopology(String topologyName) {
        this.pausedTopologies.add(topologyName);
    }

    public boolean isPaused(String topologyName) {
        if (topologyName == null) {
            return this.pausedTopologies.contains(UNNAMED_TOPOLOGY);
        }
        return this.pausedTopologies.contains(topologyName);
    }

    public void resumeTopology(String topologyName) {
        this.pausedTopologies.remove(topologyName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public KafkaFuture<Void> unregisterTopology(KafkaFutureImpl<Void> removeTopologyFuture, String topologyName) {
        try {
            this.lock();
            this.log.info("Beginning removal of NamedTopology {}, old topology version is {}", (Object)topologyName, (Object)this.version.topologyVersion.get());
            this.version.topologyVersion.incrementAndGet();
            this.version.activeTopologyUpdateListeners.add(new TopologyVersionListener(this.topologyVersion(), removeTopologyFuture));
            InternalTopologyBuilder removedBuilder = (InternalTopologyBuilder)this.builders.remove(topologyName);
            removedBuilder.fullSourceTopicNames().forEach(this.allInputTopics::remove);
            removedBuilder.allSourcePatternStrings().forEach(this.allInputTopics::remove);
            this.log.info("Finished removing NamedTopology {}, topology version was updated to {}", (Object)topologyName, (Object)this.version.topologyVersion.get());
        }
        catch (Throwable throwable) {
            this.log.error("Failed to remove NamedTopology {}, please retry.", (Object)topologyName);
            removeTopologyFuture.completeExceptionally(throwable);
        }
        finally {
            this.unlock();
        }
        return removeTopologyFuture;
    }

    public TopologyConfig.TaskConfig getTaskConfigFor(TaskId taskId) {
        InternalTopologyBuilder builder = this.lookupBuilderForTask(taskId);
        return builder.topologyConfigs().getTaskConfig();
    }

    public void buildAndRewriteTopology() {
        this.applyToEachBuilder(this::buildAndVerifyTopology);
    }

    private void buildAndVerifyTopology(InternalTopologyBuilder builder) {
        builder.rewriteTopology(this.config);
        builder.buildTopology();
        HashSet<String> allInputTopicsCopy = new HashSet<String>(this.allInputTopics);
        int numInputTopics = allInputTopicsCopy.size();
        List<String> inputTopics = builder.fullSourceTopicNames();
        List<String> inputPatterns = builder.allSourcePatternStrings();
        HashSet<String> newInputTopics = new HashSet<String>(inputTopics);
        newInputTopics.addAll(inputPatterns);
        int numNewInputTopics = newInputTopics.size();
        allInputTopicsCopy.addAll(newInputTopics);
        if (allInputTopicsCopy.size() != numInputTopics + numNewInputTopics) {
            inputTopics.retainAll(allInputTopicsCopy);
            inputPatterns.retainAll(allInputTopicsCopy);
            this.log.error("Tried to add the NamedTopology {} but it had overlap with other input topics {} or patterns {}", new Object[]{builder.topologyName(), inputTopics, inputPatterns});
            throw new TopologyException("Named Topologies may not subscribe to the same input topics or patterns");
        }
        ProcessorTopology globalTopology = builder.buildGlobalStateTopology();
        if (globalTopology != null) {
            if (builder.topologyName() != null) {
                throw new IllegalStateException("Global state stores are not supported with Named Topologies");
            }
            if (this.globalTopology != null) {
                throw new TopologyException("Topology builder had global state, but global topology has already been set");
            }
            this.globalTopology = globalTopology;
            this.globalStateStores.putAll(builder.globalStateStores());
        }
        this.allInputTopics.addAll(newInputTopics);
    }

    public int getNumStreamThreads(StreamsConfig config) {
        int configuredNumStreamThreads = config.getInt("num.stream.threads");
        if (this.hasNamedTopologies()) {
            if (this.hasNoLocalTopology()) {
                this.log.error("Detected a named topology with no input topics, a named topology may not be empty.");
                throw new TopologyException("Topology has no stream threads and no global threads, must subscribe to at least one source topic or pattern.");
            }
        } else if (this.hasNoLocalTopology() && !this.hasGlobalTopology()) {
            this.log.error("Topology with no input topics will create no stream threads and no global thread.");
            throw new TopologyException("Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.");
        }
        if (configuredNumStreamThreads != 0 && this.hasNoLocalTopology()) {
            this.log.info("Overriding number of StreamThreads to zero for global-only topology");
            return 0;
        }
        return configuredNumStreamThreads;
    }

    public boolean hasNamedTopologies() {
        return !this.builders.containsKey(UNNAMED_TOPOLOGY);
    }

    public Set<String> namedTopologiesView() {
        return this.hasNamedTopologies() ? Collections.unmodifiableSet(this.builders.keySet()) : Collections.emptySet();
    }

    public boolean hasGlobalTopology() {
        return this.evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasGlobalStores);
    }

    public boolean hasNoLocalTopology() {
        return this.evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasNoLocalTopology);
    }

    public boolean hasPersistentStores() {
        if (this.hasNamedTopologies()) {
            return true;
        }
        return this.evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasPersistentStores);
    }

    public boolean hasStore(String name) {
        return this.evaluateConditionIsTrueForAnyBuilders(b -> b.hasStore(name));
    }

    public boolean hasOffsetResetOverrides() {
        return this.hasNamedTopologies() || this.evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::hasOffsetResetOverrides);
    }

    public OffsetResetStrategy offsetResetStrategy(String topic) {
        for (InternalTopologyBuilder builder : this.builders.values()) {
            if (!builder.containsTopic(topic)) continue;
            return builder.offsetResetStrategy(topic);
        }
        this.log.warn("Unable to look up offset reset strategy for topic {} as this topic does not appear in the sources of any of the current topologies: {}\n This may be due to natural race condition when removing a topology but it should not persist or appear frequently.", (Object)topic, this.namedTopologiesView());
        return null;
    }

    public Collection<String> fullSourceTopicNamesForTopology(String topologyName) {
        Objects.requireNonNull(topologyName, "topology name must not be null");
        return this.lookupBuilderForNamedTopology(topologyName).fullSourceTopicNames();
    }

    public Collection<String> allFullSourceTopicNames() {
        ArrayList<String> sourceTopics = new ArrayList<String>();
        this.applyToEachBuilder(b -> sourceTopics.addAll(b.fullSourceTopicNames()));
        return sourceTopics;
    }

    Pattern sourceTopicPattern() {
        StringBuilder patternBuilder = new StringBuilder();
        this.applyToEachBuilder(b -> {
            String patternString = b.sourceTopicPatternString();
            if (patternString.length() > 0) {
                patternBuilder.append(patternString).append("|");
            }
        });
        if (patternBuilder.length() > 0) {
            patternBuilder.setLength(patternBuilder.length() - 1);
            return Pattern.compile(patternBuilder.toString());
        }
        return EMPTY_ZERO_LENGTH_PATTERN;
    }

    public boolean usesPatternSubscription() {
        return this.evaluateConditionIsTrueForAnyBuilders(InternalTopologyBuilder::usesPatternSubscription);
    }

    public boolean isEmpty() {
        return this.builders.isEmpty();
    }

    public String topologyDescriptionString() {
        if (this.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder();
        this.applyToEachBuilder(b -> sb.append(b.describe().toString()));
        return sb.toString();
    }

    public ProcessorTopology buildSubtopology(TaskId task) {
        InternalTopologyBuilder builder = this.lookupBuilderForTask(task);
        return builder.buildSubtopology(task.subtopology());
    }

    public ProcessorTopology globalTaskTopology() {
        if (this.hasNamedTopologies()) {
            throw new IllegalStateException("Global state stores are not supported with Named Topologies");
        }
        return this.globalTopology;
    }

    public Map<String, StateStore> globalStateStores() {
        return this.globalStateStores;
    }

    public Map<String, List<String>> stateStoreNameToSourceTopicsForTopology(String topologyName) {
        return this.lookupBuilderForNamedTopology(topologyName).stateStoreNameToFullSourceTopicNames();
    }

    public Map<String, List<String>> stateStoreNameToSourceTopics() {
        HashMap<String, List<String>> stateStoreNameToSourceTopics = new HashMap<String, List<String>>();
        this.applyToEachBuilder(b -> stateStoreNameToSourceTopics.putAll(b.stateStoreNameToFullSourceTopicNames()));
        return stateStoreNameToSourceTopics;
    }

    public String getStoreForChangelogTopic(String topicName) {
        for (InternalTopologyBuilder builder : this.builders.values()) {
            String store = builder.getStoreForChangelogTopic(topicName);
            if (store == null) continue;
            return store;
        }
        this.log.warn("Unable to locate any store for topic {}", (Object)topicName);
        return "";
    }

    public Collection<String> sourceTopicsForStore(String storeName, String topologyName) {
        return this.lookupBuilderForNamedTopology(topologyName).sourceTopicsForStore(storeName);
    }

    public static String getTopologyNameOrElseUnnamed(String topologyName) {
        return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
    }

    public Map<Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyTopicsInfoMapExcluding(Set<String> topologiesToExclude) {
        HashMap<Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyTopicsInfo = new HashMap<Subtopology, InternalTopologyBuilder.TopicsInfo>();
        this.applyToEachBuilder(b -> {
            if (!topologiesToExclude.contains(b.topologyName())) {
                subtopologyTopicsInfo.putAll(b.subtopologyToTopicsInfo());
            }
        });
        return subtopologyTopicsInfo;
    }

    public Map<String, Map<Subtopology, InternalTopologyBuilder.TopicsInfo>> topologyToSubtopologyTopicsInfoMap() {
        HashMap<String, Map<Subtopology, InternalTopologyBuilder.TopicsInfo>> topologyToSubtopologyTopicsInfoMap = new HashMap<String, Map<Subtopology, InternalTopologyBuilder.TopicsInfo>>();
        this.applyToEachBuilder(b -> topologyToSubtopologyTopicsInfoMap.put(b.topologyName(), b.subtopologyToTopicsInfo()));
        return topologyToSubtopologyTopicsInfoMap;
    }

    public Map<String, List<String>> nodeToSourceTopics(TaskId task) {
        return this.lookupBuilderForTask(task).nodeToSourceTopics();
    }

    void addSubscribedTopicsFromMetadata(Set<String> topics, String logPrefix) {
        this.applyToEachBuilder(b -> b.addSubscribedTopicsFromMetadata(topics, logPrefix));
    }

    void addSubscribedTopicsFromAssignment(List<TopicPartition> partitions, String logPrefix) {
        this.applyToEachBuilder(b -> b.addSubscribedTopicsFromAssignment(partitions, logPrefix));
    }

    public Collection<Set<String>> copartitionGroups() {
        ArrayList<Set<String>> copartitionGroups = new ArrayList<Set<String>>();
        this.applyToEachBuilder(b -> copartitionGroups.addAll(b.copartitionGroups()));
        return copartitionGroups;
    }

    private InternalTopologyBuilder lookupBuilderForTask(TaskId task) {
        InternalTopologyBuilder builder;
        InternalTopologyBuilder internalTopologyBuilder = builder = task.topologyName() == null ? (InternalTopologyBuilder)this.builders.get(UNNAMED_TOPOLOGY) : (InternalTopologyBuilder)this.builders.get(task.topologyName());
        if (builder == null) {
            throw new UnknownTopologyException("Unable to locate topology builder", task.topologyName());
        }
        return builder;
    }

    public Collection<NamedTopology> getAllNamedTopologies() {
        return this.builders.values().stream().map(InternalTopologyBuilder::namedTopology).collect(Collectors.toSet());
    }

    public InternalTopologyBuilder lookupBuilderForNamedTopology(String topologyName) {
        if (topologyName == null) {
            return (InternalTopologyBuilder)this.builders.get(UNNAMED_TOPOLOGY);
        }
        return (InternalTopologyBuilder)this.builders.get(topologyName);
    }

    private boolean evaluateConditionIsTrueForAnyBuilders(Function<InternalTopologyBuilder, Boolean> condition) {
        for (InternalTopologyBuilder builder : this.builders.values()) {
            if (!condition.apply(builder).booleanValue()) continue;
            return true;
        }
        return false;
    }

    private void applyToEachBuilder(Consumer<InternalTopologyBuilder> function) {
        for (InternalTopologyBuilder builder : this.builders.values()) {
            function.accept(builder);
        }
    }

    public static class TopologyVersion {
        public AtomicLong topologyVersion = new AtomicLong(0L);
        public ReentrantLock topologyLock = new ReentrantLock();
        public Condition topologyCV = this.topologyLock.newCondition();
        public List<TopologyVersionListener> activeTopologyUpdateListeners = new LinkedList<TopologyVersionListener>();
    }

    public static class TopologyVersionListener {
        final long topologyVersion;
        final KafkaFutureImpl<Void> future;

        public TopologyVersionListener(long topologyVersion, KafkaFutureImpl<Void> future) {
            this.topologyVersion = topologyVersion;
            this.future = future;
        }
    }

    public static class Subtopology {
        final int nodeGroupId;
        final String namedTopology;

        public Subtopology(int nodeGroupId, String namedTopology) {
            this.nodeGroupId = nodeGroupId;
            this.namedTopology = namedTopology;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Subtopology that = (Subtopology)o;
            return this.nodeGroupId == that.nodeGroupId && Objects.equals(this.namedTopology, that.namedTopology);
        }

        public int hashCode() {
            return Objects.hash(this.nodeGroupId, this.namedTopology);
        }
    }
}

