/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.NullAddress;
import org.jgroups.View;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.logging.Log;
import org.jgroups.protocols.Bundler;
import org.jgroups.protocols.TP;
import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.Util;

/**
 * Bundler which keeps one {@link SendBuffer} per destination address. Messages without a
 * destination are queued under the shared {@link #NULL} key.
 * <p>
 * A buffer is flushed to the transport when (a) adding a message would reach or exceed
 * {@link #max_size} bytes, (b) the buffer holds as many messages as it has slots, or (c) the
 * thread adding a message is the last one currently inside {@code SendBuffer.addMessage()}
 * for that buffer. A flush of exactly one message is sent as a single message; anything
 * larger is sent as a message list.
 */
public class PerDestinationBundler
implements Bundler {
    @Property(name="max_size", type=AttributeType.BYTES, description="Maximum number of bytes for messages to be queued (per destination) until they are sent")
    protected int max_size = 64000;
    @Property(description="The maximum number of queued messages per destination. When the queue is full, a new batch will be sent")
    protected int max_queue_size = 128;
    @ManagedAttribute(description="Total number of messages sent (single and batches)", type=AttributeType.SCALAR)
    protected final LongAdder total_msgs_sent = new LongAdder();
    @ManagedAttribute(description="Number of single messages sent", type=AttributeType.SCALAR)
    protected final LongAdder num_single_msgs_sent = new LongAdder();
    @ManagedAttribute(description="Number of batches sent", type=AttributeType.SCALAR)
    protected final LongAdder num_batches_sent = new LongAdder();
    @ManagedAttribute(description="Number of batches sent because adding a message would have reached or exceeded max_size bytes", type=AttributeType.SCALAR)
    protected final LongAdder num_sends_due_to_max_size = new LongAdder();
    @ManagedAttribute(description="Number of batches sent because the max number of messages has been reached (max_queue_size)", type=AttributeType.SCALAR)
    protected final LongAdder num_sends_due_to_full_queue = new LongAdder();
    @ManagedAttribute(description="Number of batches sent because the last sender thread returned", type=AttributeType.SCALAR)
    protected final LongAdder num_sends_due_to_last_thread = new LongAdder();
    protected TP transport;
    protected Log log;
    protected Address local_addr;
    /** One send buffer per destination; messages with a null destination use the {@link #NULL} key. */
    protected final Map<Address, SendBuffer> dests = Util.createConcurrentMap();
    /** Placeholder key standing in for a null destination; compared by identity. */
    protected static final Address NULL = new NullAddress();

    /**
     * Returns the number of messages currently queued, summed over all destinations.
     * Each buffer is read under its own lock, so the total is not an atomic snapshot.
     */
    @Override
    public int size() {
        return dests.values().stream().mapToInt(SendBuffer::size).sum();
    }

    /** Always returns -1; this bundler does not report a queue size. */
    @Override
    public int getQueueSize() {
        return -1;
    }

    @Override
    public int getMaxSize() {
        return max_size;
    }

    @Override
    public Bundler setMaxSize(int s) {
        max_size = s;
        return this;
    }

    public int getMaxQueueSize() {
        return max_queue_size;
    }

    /**
     * Sets the number of message slots used by send buffers created after this call.
     * Buffers that already exist keep the array they allocated in their constructor.
     */
    public Bundler setMaxQueueSize(int s) {
        max_queue_size = s;
        return this;
    }

    /**
     * Computes the average number of messages per batch as
     * {@code (total_msgs_sent - num_single_msgs_sent) / num_batches_sent}.
     *
     * @return the average, or 0.0 if no batch or no message has been counted yet
     */
    @ManagedAttribute(description="Average number of messages in an BatchMessage")
    public double avgBatchSize() {
        long num_batches = num_batches_sent.sum();
        long total_msgs = total_msgs_sent.sum();
        long single_msgs = num_single_msgs_sent.sum();
        if (num_batches == 0L || total_msgs == 0L) {
            return 0.0;
        }
        // The counters are read one after another while senders may be updating them, so the
        // difference can be transiently negative; never report a negative average.
        long batched_msgs = Math.max(0L, total_msgs - single_msgs);
        return (double)batched_msgs / (double)num_batches;
    }

    /** Resets every statistics counter, including the single-message counter used by {@link #avgBatchSize()}. */
    @Override
    public void resetStats() {
        Stream.of(total_msgs_sent, num_single_msgs_sent, num_batches_sent, num_sends_due_to_max_size,
                  num_sends_due_to_full_queue, num_sends_due_to_last_thread).forEach(LongAdder::reset);
    }

    /**
     * Stores the transport and takes over its logger.
     *
     * @param transport the transport used for sending; must not be null
     * @throws NullPointerException if {@code transport} is null
     */
    @Override
    public void init(TP transport) {
        this.transport = Objects.requireNonNull(transport);
        this.log = transport.getLog();
    }

    /**
     * Caches the transport's address as the local address.
     *
     * @throws NullPointerException if the transport has no address yet
     */
    @Override
    public void start() {
        local_addr = Objects.requireNonNull(transport.getAddress());
    }

    @Override
    public void stop() {
    }

    /**
     * Queues {@code msg} in the send buffer of its destination, creating the buffer on first use.
     * A missing source address is set to the local address first.
     *
     * @param msg the message to send
     */
    @Override
    public void send(Message msg) throws Exception {
        if (msg.getSrc() == null) {
            msg.setSrc(local_addr);
        }
        Address target = msg.dest();
        Address dest = target == null ? NULL : target;
        SendBuffer buf = dests.computeIfAbsent(dest, k -> new SendBuffer());
        buf.addMessage(dest, msg);
    }

    /**
     * Creates send buffers for members that do not have one yet and drops the buffers of
     * destinations which are no longer members. The {@link #NULL} buffer is always kept.
     *
     * @param view the new view; nothing is done if its member list is null
     */
    @Override
    public void viewChange(View view) {
        List<Address> mbrs = view.getMembers();
        if (mbrs == null) {
            return;
        }
        for (Address mbr : mbrs) {
            // computeIfAbsent only allocates a buffer when the member has none yet
            dests.computeIfAbsent(mbr, k -> new SendBuffer());
        }
        dests.keySet().removeIf(dest -> dest != NULL && !mbrs.contains(dest));
    }

    /**
     * Queue of pending messages for one destination. {@code index} and {@code total_bytes}
     * are only accessed while {@code lock} is held.
     */
    protected class SendBuffer {
        private final Message[] msgs;
        private int index;
        private long total_bytes;
        /** Number of threads which have entered {@link #addMessage} and not yet finished it. */
        private final AtomicInteger thread_count = new AtomicInteger();
        private final ByteArrayDataOutputStream output;
        private final Lock lock = new ReentrantLock(false);

        protected SendBuffer() {
            // At least one slot is required: addMessage() stores the message before checking
            // whether the array is full, so a capacity of 0 (or less) could never work.
            this.msgs = new Message[Math.max(1, max_queue_size)];
            this.index = 0;
            this.output = new ByteArrayDataOutputStream(max_size + 3);
        }

        /**
         * Adds {@code msg} to this buffer and flushes the buffer when one of the send
         * conditions is met: the byte limit would be reached, the array is full, or the
         * calling thread is the last one inside this method.
         *
         * @param dest the map key of this buffer ({@link PerDestinationBundler#NULL} for no destination)
         * @param msg  the message to queue
         */
        protected void addMessage(Address dest, Message msg) {
            int msg_bytes = msg.getLength();
            thread_count.incrementAndGet();
            lock.lock();
            try {
                // Only count a max_size send if there is something queued to send
                if (index > 0 && total_bytes + msg_bytes >= max_size) {
                    num_sends_due_to_max_size.increment();
                    sendBatch(dest);
                }
                msgs[index++] = msg;
                total_bytes += msg_bytes;
                if (index == msgs.length) {
                    num_sends_due_to_full_queue.increment();
                    sendBatch(dest);
                }
            }
            finally {
                try {
                    // Always balance the increment above, even if queuing failed; otherwise the
                    // count would never reach 0 again and the last-thread flush would stop firing.
                    boolean last_thread = thread_count.decrementAndGet() == 0;
                    if (last_thread && index > 0) {
                        num_sends_due_to_last_thread.increment();
                        sendBatch(dest);
                    }
                }
                finally {
                    lock.unlock();
                }
            }
        }

        /**
         * Sends all queued messages and empties the buffer; does nothing if the buffer is
         * empty. Within this class it is only called from {@link #addMessage} while the lock
         * is held.
         *
         * @param destination the map key of this buffer; {@link PerDestinationBundler#NULL}
         *                    is translated back to a null destination
         */
        protected void sendBatch(Address destination) {
            if (index == 0) {
                return;
            }
            Address dest = destination == NULL ? null : destination;
            if (index == 1) {
                sendSingleMessage(dest, msgs[0]);
                num_single_msgs_sent.increment();
            } else {
                sendMessageList(dest, local_addr, msgs, index);
                num_batches_sent.increment();
            }
            total_msgs_sent.add(index);
            total_bytes = 0L;
            index = 0;
        }

        /**
         * Serializes one message into the reusable output buffer and hands it to the
         * transport. Failures are logged at error level and not propagated.
         */
        protected void sendSingleMessage(Address dest, Message msg) {
            try {
                output.position(0);
                Util.writeMessage(msg, output, dest == null);
                transport.doSend(output.buffer(), 0, output.position(), dest);
                if (transport.statsEnabled()) {
                    transport.getMessageStats().incrNumSingleMsgsSent(1);
                }
            }
            catch (Throwable e) {
                log.error("%s: failed sending message to %s: %s", local_addr, dest, e);
            }
        }

        /**
         * Serializes the first {@code length} messages of {@code list} into the reusable
         * output buffer as one message list and hands it to the transport. Failures are
         * logged at trace level and not propagated.
         */
        protected void sendMessageList(Address dest, Address src, Message[] list, int length) {
            try {
                output.position(0);
                Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, 0, length, output, dest == null, transport.getId());
                transport.doSend(output.buffer(), 0, output.position(), dest);
                if (transport.statsEnabled()) {
                    transport.getMessageStats().incrNumBatchesSent(1);
                }
            }
            catch (Throwable e) {
                log.trace(Util.getMessage("FailureSendingMsgBundle"), transport.getAddress(), e);
            }
        }

        public String toString() {
            return String.format("%d msgs", size());
        }

        /** Returns the number of queued messages, read while holding the lock. */
        protected int size() {
            lock.lock();
            try {
                return index;
            }
            finally {
                lock.unlock();
            }
        }
    }
}

