/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.util.RegexMatcher;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.ToolsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerPerformance {
    // Logger for this tool; main() uses it to announce that the consumer is starting.
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerPerformance.class);
    // Supplies the random numeric suffix of the default consumer group id built in ConsumerPerfOptions.
    private static final Random RND = new Random();

    /**
     * Entry point of the consumer performance tool.
     *
     * <p>Parses the command line, optionally prints the CSV header, consumes from the
     * configured topic via {@code consume}, and then — unless detailed stats were requested —
     * prints a single summary line with totals and throughput. Client metrics are printed last
     * when {@code --print-metrics} is set.
     *
     * <p>Any {@link Throwable} is reported on {@code System.err} (message, then stack trace)
     * and the process exits with status 1.
     *
     * @param args command-line arguments understood by {@code ConsumerPerfOptions}
     */
    public static void main(String[] args) {
        try {
            LOG.info("Starting consumer...");
            ConsumerPerfOptions options = new ConsumerPerfOptions(args);
            AtomicLong totalMessagesRead = new AtomicLong(0L);
            AtomicLong totalBytesRead = new AtomicLong(0L);
            AtomicLong joinTimeMs = new AtomicLong(0L);
            AtomicLong joinTimeMsInSingleRound = new AtomicLong(0L);
            if (!options.hideHeader()) {
                printHeader(options.showDetailedStats());
            }

            long startMs;
            long endMs;
            // Left as a raw Map: the metric key/value types are not imported in this file.
            Map metrics = null;
            // try-with-resources guarantees the consumer is closed even when consuming fails,
            // instead of leaking it on the error path.
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(options.props())) {
                // The first join is measured from the same instant the run starts.
                startMs = System.currentTimeMillis();
                consume(consumer, options, totalMessagesRead, totalBytesRead, joinTimeMs, 0L, 0L, 0L, 0L, startMs, joinTimeMsInSingleRound);
                endMs = System.currentTimeMillis();
                // Metrics have to be captured while the consumer is still open.
                if (options.printMetrics()) {
                    metrics = consumer.metrics();
                }
            }

            double elapsedSec = (double) (endMs - startMs) / 1000.0;
            long fetchTimeInMs = endMs - startMs - joinTimeMs.get();
            if (!options.showDetailedStats()) {
                double totalMbRead = (double) totalBytesRead.get() * 1.0 / 1048576.0;
                System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n", options.dateFormat().format(startMs), options.dateFormat().format(endMs), totalMbRead, totalMbRead / elapsedSec, totalMessagesRead.get(), (double) totalMessagesRead.get() / elapsedSec, joinTimeMs.get(), fetchTimeInMs, totalMbRead / ((double) fetchTimeInMs / 1000.0), (double) totalMessagesRead.get() / ((double) fetchTimeInMs / 1000.0));
            }
            if (metrics != null) {
                ToolsUtils.printMetrics(metrics);
            }
        } catch (Throwable e) {
            System.err.println(e.getMessage());
            System.err.println(Utils.stackTrace(e));
            Exit.exit(1);
        }
    }

    /**
     * Writes the CSV header line to standard output.
     *
     * <p>The leading columns depend on the reporting mode; the rebalance/fetch columns are
     * appended in both modes.
     *
     * @param showDetailedStats {@code true} for the per-interval header ({@code time, threadId, ...}),
     *                          {@code false} for the summary header ({@code start.time, end.time, ...})
     */
    protected static void printHeader(boolean showDetailedStats) {
        String rebalanceAndFetchColumns = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
        String headerFormat = showDetailedStats
            ? "time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n"
            : "start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n";
        System.out.printf(headerFormat, rebalanceAndFetchColumns);
    }

    /**
     * Subscribes to the configured topic and polls until either the requested number of
     * messages has been read or no record has arrived for longer than the fetch timeout.
     *
     * <p>Key and value lengths of every record are added to the byte count. Whenever at least
     * one reporting interval has elapsed since the last report, a progress line is printed
     * (only with detailed stats enabled) and the per-interval baselines are reset. If the loop
     * ends before the requested number of messages was read, a warning is printed.
     *
     * @param consumer                consumer to subscribe and poll
     * @param options                 parsed command-line options
     * @param totalMessagesRead       receives the final message count
     * @param totalBytesRead          receives the final byte count
     * @param joinTimeMs              accumulator for total rebalance time, updated by the listener
     * @param bytesRead               starting value of the running byte count
     * @param messagesRead            starting value of the running message count
     * @param lastBytesRead           byte count at the previous report
     * @param lastMessagesRead        message count at the previous report
     * @param joinStartMs             timestamp from which the first join is measured
     * @param joinTimeMsInSingleRound accumulator for rebalance time within the current reporting
     *                                interval; shared with the rebalance listener
     */
    private static void consume(KafkaConsumer<byte[], byte[]> consumer, ConsumerPerfOptions options, AtomicLong totalMessagesRead, AtomicLong totalBytesRead, AtomicLong joinTimeMs, long bytesRead, long messagesRead, long lastBytesRead, long lastMessagesRead, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
        long numMessages = options.numMessages();
        long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
        long reportingIntervalMs = options.reportingIntervalMs();
        boolean showDetailedStats = options.showDetailedStats();
        SimpleDateFormat dateFormat = options.dateFormat();

        consumer.subscribe(options.topic(), new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound));

        long currentTimeMs = System.currentTimeMillis();
        long lastReportTimeMs = currentTimeMs;
        long lastConsumedTimeMs = currentTimeMs;

        while (messagesRead < numMessages && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100L));
            currentTimeMs = System.currentTimeMillis();
            if (!records.isEmpty()) {
                lastConsumedTimeMs = currentTimeMs;
            }
            for (ConsumerRecord<byte[], byte[]> record : records) {
                messagesRead++;
                if (record.key() != null) {
                    bytesRead += record.key().length;
                }
                if (record.value() != null) {
                    bytesRead += record.value().length;
                }
                if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
                    if (showDetailedStats) {
                        printConsumerProgress(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get());
                    }
                    // Reset in place: the rebalance listener holds this same instance, so
                    // rebinding the local to a new AtomicLong would hide every later rebalance
                    // from the per-interval report.
                    joinTimeMsInSingleRound.set(0L);
                    lastReportTimeMs = currentTimeMs;
                    lastMessagesRead = messagesRead;
                    lastBytesRead = bytesRead;
                }
            }
        }

        if (messagesRead < numMessages) {
            System.out.printf("WARNING: Exiting before consuming the expected number of messages: timeout (%d ms) exceeded. You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
        }
        totalMessagesRead.set(messagesRead);
        totalBytesRead.set(bytesRead);
    }

    /**
     * Prints one detailed-stats line for a reporting interval: the basic columns, then the
     * rebalance/fetch columns, then a line break.
     *
     * @param id                      identifier printed in the {@code threadId} column
     * @param bytesRead               cumulative bytes read so far
     * @param lastBytesRead           cumulative bytes read at the previous report
     * @param messagesRead            cumulative messages read so far
     * @param lastMessagesRead        cumulative messages read at the previous report
     * @param startMs                 start of the interval, epoch milliseconds
     * @param endMs                   end of the interval, epoch milliseconds
     * @param dateFormat              formatter applied to {@code endMs}
     * @param joinTimeMsInSingleRound rebalance time attributed to this interval, in milliseconds
     */
    protected static void printConsumerProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat, long joinTimeMsInSingleRound) {
        printBasicProgress(id, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, dateFormat);
        printExtendedProgress(bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, endMs, joinTimeMsInSingleRound);
        System.out.println();
    }

    /**
     * Prints the timestamp, id, cumulative MB, interval MB/s, cumulative message count and
     * interval messages/s (no trailing line break).
     *
     * <p>Both rates are reported as 0 when the interval is not positive, mirroring
     * {@code printExtendedProgress}, so the output never contains {@code Infinity} or {@code NaN}.
     */
    private static void printBasicProgress(int id, long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, SimpleDateFormat dateFormat) {
        final double bytesPerMb = 1048576.0;
        long elapsedMs = endMs - startMs;
        double totalMbRead = bytesRead / bytesPerMb;
        double intervalMbRead = (bytesRead - lastBytesRead) / bytesPerMb;
        double intervalMbPerSec = 0.0;
        double intervalMessagesPerSec = 0.0;
        if (elapsedMs > 0L) {
            intervalMbPerSec = 1000.0 * intervalMbRead / elapsedMs;
            intervalMessagesPerSec = (double) (messagesRead - lastMessagesRead) / elapsedMs * 1000.0;
        }
        System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f", dateFormat.format(endMs), id, totalMbRead, intervalMbPerSec, messagesRead, intervalMessagesPerSec);
    }

    /**
     * Prints the rebalance time, the fetch time (interval minus rebalance time) and the MB/s and
     * messages/s rates computed over the fetch time (no trailing line break).
     *
     * <p>Both rates are reported as 0 when the fetch time is not positive.
     */
    private static void printExtendedProgress(long bytesRead, long lastBytesRead, long messagesRead, long lastMessagesRead, long startMs, long endMs, long joinTimeMsInSingleRound) {
        final double bytesPerMb = 1048576.0;
        long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;
        double intervalMbRead = (bytesRead - lastBytesRead) / bytesPerMb;
        long intervalMessagesRead = messagesRead - lastMessagesRead;
        double intervalMbPerSec = 0.0;
        double intervalMessagesPerSec = 0.0;
        if (fetchTimeMs > 0L) {
            intervalMbPerSec = 1000.0 * intervalMbRead / (double) fetchTimeMs;
            intervalMessagesPerSec = 1000.0 * (double) intervalMessagesRead / (double) fetchTimeMs;
        }
        System.out.printf(", %d, %d, %.4f, %.4f", joinTimeMsInSingleRound, fetchTimeMs, intervalMbPerSec, intervalMessagesPerSec);
    }

    /**
     * Command-line options of the consumer performance tool.
     *
     * <p>The constructor declares every option on the inherited parser, parses the arguments,
     * prints usage and exits on a parse error, warns about the deprecated thread options,
     * handles help/version requests and verifies that {@code --topic} and {@code --messages}
     * were supplied. The accessors expose the parsed values in the form the tool needs.
     */
    protected static class ConsumerPerfOptions extends CommandDefaultOptions {
        private final OptionSpec<String> brokerListOpt;
        private final OptionSpec<String> bootstrapServerOpt;
        private final OptionSpec<String> topicOpt;
        private final OptionSpec<String> groupIdOpt;
        private final OptionSpec<Integer> fetchSizeOpt;
        private final OptionSpec<Void> resetBeginningOffsetOpt;
        private final OptionSpec<Integer> socketBufferSizeOpt;
        private final OptionSpec<Integer> numThreadsOpt;
        private final OptionSpec<Integer> numFetchersOpt;
        private final OptionSpec<String> consumerConfigOpt;
        private final OptionSpec<Void> printMetricsOpt;
        private final OptionSpec<Void> showDetailedStatsOpt;
        private final OptionSpec<Long> recordFetchTimeoutOpt;
        private final OptionSpec<Long> numMessagesOpt;
        private final OptionSpec<Long> reportingIntervalOpt;
        private final OptionSpec<String> dateFormatOpt;
        private final OptionSpec<Void> hideHeaderOpt;

        /**
         * Declares the options and parses {@code args}.
         *
         * @param args raw command-line arguments
         */
        public ConsumerPerfOptions(String[] args) {
            super(args);
            brokerListOpt = parser
                .accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
                .withRequiredArg()
                .describedAs("broker-list")
                .ofType(String.class);
            bootstrapServerOpt = parser
                .accepts("bootstrap-server", "REQUIRED unless --broker-list(deprecated) is specified. The server(s) to connect to.")
                .requiredUnless("broker-list")
                .withRequiredArg()
                .describedAs("server to connect to")
                .ofType(String.class);
            topicOpt = parser
                .accepts("topic", "REQUIRED: The topic to consume from.")
                .withRequiredArg()
                .describedAs("topic")
                .ofType(String.class);
            groupIdOpt = parser
                .accepts("group", "The group id to consume on.")
                .withRequiredArg()
                .describedAs("gid")
                .defaultsTo("perf-consumer-" + RND.nextInt(100000))
                .ofType(String.class);
            fetchSizeOpt = parser
                .accepts("fetch-size", "The amount of data to fetch in a single request.")
                .withRequiredArg()
                .describedAs("size")
                .ofType(Integer.class)
                .defaultsTo(1024 * 1024);
            resetBeginningOffsetOpt = parser
                .accepts("from-latest", "If the consumer does not already have an established offset to consume from, start with the latest message present in the log rather than the earliest message.");
            socketBufferSizeOpt = parser
                .accepts("socket-buffer-size", "The size of the tcp RECV size.")
                .withRequiredArg()
                .describedAs("size")
                .ofType(Integer.class)
                .defaultsTo(2 * 1024 * 1024);
            numThreadsOpt = parser
                .accepts("threads", "DEPRECATED AND IGNORED: Number of processing threads.")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Integer.class)
                .defaultsTo(10);
            numFetchersOpt = parser
                .accepts("num-fetch-threads", "DEPRECATED AND IGNORED: Number of fetcher threads.")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Integer.class)
                .defaultsTo(1);
            consumerConfigOpt = parser
                .accepts("consumer.config", "Consumer config properties file.")
                .withRequiredArg()
                .describedAs("config file")
                .ofType(String.class);
            printMetricsOpt = parser
                .accepts("print-metrics", "Print out the metrics.");
            showDetailedStatsOpt = parser
                .accepts("show-detailed-stats", "If set, stats are reported for each reporting interval as configured by reporting-interval");
            recordFetchTimeoutOpt = parser
                .accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
                .withOptionalArg()
                .describedAs("milliseconds")
                .ofType(Long.class)
                .defaultsTo(10000L);
            numMessagesOpt = parser
                .accepts("messages", "REQUIRED: The number of messages to send or consume")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Long.class);
            reportingIntervalOpt = parser
                .accepts("reporting-interval", "Interval in milliseconds at which to print progress info.")
                .withRequiredArg()
                .withValuesConvertedBy(RegexMatcher.regex("^\\d+$"))
                .describedAs("interval_ms")
                .ofType(Long.class)
                .defaultsTo(5000L);
            dateFormatOpt = parser
                .accepts("date-format", "The date format to use for formatting the time field. See java.text.SimpleDateFormat for options.")
                .withRequiredArg()
                .describedAs("date format")
                .ofType(String.class)
                .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
            hideHeaderOpt = parser
                .accepts("hide-header", "If set, skips printing the header for the stats");

            try {
                options = parser.parse(args);
            } catch (OptionException e) {
                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
                return;
            }
            if (options == null) {
                return;
            }

            boolean usesDeprecatedThreadOption = options.has(numThreadsOpt) || options.has(numFetchersOpt);
            if (usesDeprecatedThreadOption) {
                System.out.println("WARNING: option [threads] and [num-fetch-threads] have been deprecated and will be ignored by the test");
            }
            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
            CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt);
        }

        /** @return {@code true} when {@code --print-metrics} was given */
        public boolean printMetrics() {
            return options.has(printMetricsOpt);
        }

        /** @return the {@code --bootstrap-server} value when present, otherwise the {@code --broker-list} value */
        public String brokerHostsAndPorts() {
            OptionSpec<String> serversOpt = options.has(bootstrapServerOpt) ? bootstrapServerOpt : brokerListOpt;
            return options.valueOf(serversOpt);
        }

        /**
         * Builds the consumer configuration: properties from {@code --consumer.config} (if given)
         * overlaid with the values derived from the command line. {@code client.id} is only set
         * when the loaded properties do not already define it.
         *
         * @return the consumer properties
         * @throws IOException if the properties file cannot be loaded
         */
        public Properties props() throws IOException {
            Properties props;
            if (options.has(consumerConfigOpt)) {
                props = Utils.loadProps(options.valueOf(consumerConfigOpt));
            } else {
                props = new Properties();
            }
            props.put("bootstrap.servers", brokerHostsAndPorts());
            props.put("group.id", options.valueOf(groupIdOpt));
            props.put("receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString());
            props.put("max.partition.fetch.bytes", options.valueOf(fetchSizeOpt).toString());
            props.put("auto.offset.reset", options.has(resetBeginningOffsetOpt) ? "latest" : "earliest");
            props.put("key.deserializer", ByteArrayDeserializer.class);
            props.put("value.deserializer", ByteArrayDeserializer.class);
            props.put("check.crcs", "false");
            if (props.getProperty("client.id") == null) {
                props.put("client.id", "perf-consumer-client");
            }
            return props;
        }

        /** @return a single-element set holding the {@code --topic} value */
        public Set<String> topic() {
            return Collections.singleton(options.valueOf(topicOpt));
        }

        /** @return the {@code --messages} value */
        public long numMessages() {
            return options.valueOf(numMessagesOpt);
        }

        /**
         * @return the {@code --reporting-interval} value in milliseconds
         * @throws IllegalArgumentException if the value is not greater than 0
         */
        public long reportingIntervalMs() {
            long intervalMs = options.valueOf(reportingIntervalOpt);
            if (intervalMs > 0L) {
                return intervalMs;
            }
            throw new IllegalArgumentException("Reporting interval must be greater than 0.");
        }

        /** @return {@code true} when {@code --show-detailed-stats} was given */
        public boolean showDetailedStats() {
            return options.has(showDetailedStatsOpt);
        }

        /** @return a new formatter built from the {@code --date-format} pattern */
        public SimpleDateFormat dateFormat() {
            String pattern = options.valueOf(dateFormatOpt);
            return new SimpleDateFormat(pattern);
        }

        /** @return {@code true} when {@code --hide-header} was given */
        public boolean hideHeader() {
            return options.has(hideHeaderOpt);
        }

        /** @return the {@code --timeout} value in milliseconds */
        public long recordFetchTimeoutMs() {
            return options.valueOf(recordFetchTimeoutOpt);
        }
    }

    /**
     * Rebalance listener that records how long the consumer spends without an assignment.
     *
     * <p>The clock is (re)started whenever partitions are revoked; when partitions are assigned,
     * the time elapsed since the last start is added to both accumulators passed to the
     * constructor.
     */
    public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
        private final AtomicLong joinTimeMs;
        private final AtomicLong joinTimeMsInSingleRound;
        private long joinStartMs;

        /**
         * @param joinTimeMs              accumulator for the total time spent joining
         * @param joinStartMs             timestamp from which the first assignment is measured
         * @param joinTimeMsInSingleRound accumulator for the time spent joining in the current
         *                                reporting interval
         */
        public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
            this.joinTimeMs = joinTimeMs;
            this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
            this.joinStartMs = joinStartMs;
        }

        /** Restarts the join clock at the current time. */
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            joinStartMs = System.currentTimeMillis();
        }

        /** Adds the time elapsed since the join clock was started to both accumulators. */
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            long rebalanceDurationMs = System.currentTimeMillis() - joinStartMs;
            joinTimeMs.addAndGet(rebalanceDurationMs);
            joinTimeMsInSingleRound.addAndGet(rebalanceDurationMs);
        }
    }
}

