/*
 * Decompiled with CFR 0.152.
 */
package io.riemann.riemann.client;

import io.riemann.riemann.Proto;
import io.riemann.riemann.client.ChainPromise;
import io.riemann.riemann.client.EventDSL;
import io.riemann.riemann.client.IPromise;
import io.riemann.riemann.client.IRiemannClient;
import io.riemann.riemann.client.RiemannClient;
import io.riemann.riemann.client.Transport;
import io.riemann.riemann.client.UnsupportedJVMException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An {@link IRiemannClient} wrapper that buffers individual events and forwards them to the
 * wrapped client in batches.
 *
 * <p>Events handed to {@link #sendEvent(Proto.Event)} or {@link #sendEvents(List)} are placed on
 * {@link #buffer}. Once {@link #bufferSize} reaches {@link #batchSize}, a flush is triggered that
 * drains up to {@code batchSize} pending writes into a single {@link Proto.Msg} and sends it
 * through the wrapped client. {@link #sendMessage(Proto.Msg)} and {@link #query(String)} bypass
 * the buffer and are delegated directly.
 */
public class RiemannBatchClient implements IRiemannClient {
    /** Maximum number of events packed into one outgoing message; always at least 1. */
    public final int batchSize;
    /** Count of writes believed to be in {@link #buffer}; maintained separately from the queue. */
    public final AtomicInteger bufferSize = new AtomicInteger();
    /** Pending writes waiting to be batched. */
    public final LinkedTransferQueue<Write> buffer;
    /** The wrapped client that actually sends messages. */
    public final IRiemannClient client;
    /** Initialised to 5000; not read anywhere in this class. */
    public final AtomicLong readPromiseTimeout = new AtomicLong(5000L);

    /**
     * Wraps {@code client} with a batch size of 10.
     *
     * @param client the client to send batches through; must not be null
     * @throws UnsupportedJVMException declared for callers; see the two-argument constructor
     */
    public RiemannBatchClient(IRiemannClient client) throws UnsupportedJVMException {
        this(client, 10);
    }

    /**
     * Wraps {@code client}, batching up to {@code batchSize} events per message.
     *
     * @param client the client to send batches through; must not be null
     * @param batchSize number of buffered events that triggers a flush; must be at least 1
     * @throws NullPointerException if {@code client} is null
     * @throws IllegalArgumentException if {@code batchSize} is less than 1
     * @throws UnsupportedJVMException declared for callers; not thrown by the code in this body
     */
    public RiemannBatchClient(IRiemannClient client, int batchSize) throws UnsupportedJVMException {
        if (client == null) {
            throw new NullPointerException("client");
        }
        // A batch size of 0 would make every flush drain nothing (events would never be sent),
        // and a negative one would only blow up later inside flush2(). Fail fast instead.
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1, got " + batchSize);
        }
        this.client = client;
        this.batchSize = batchSize;
        this.buffer = new LinkedTransferQueue<Write>();
    }

    /** Sends {@code message} straight through the wrapped client, bypassing the buffer. */
    @Override
    public IPromise<Proto.Msg> sendMessage(Proto.Msg message) {
        return this.client.sendMessage(message);
    }

    /**
     * Queues every event in {@code events}; all of them share the single returned promise.
     *
     * <p>NOTE(review): when {@code events} is empty nothing is queued, so this class never
     * attaches anything to the returned promise - confirm callers can cope with that.
     *
     * @param events the events to buffer
     * @return a promise that is attached to the wrapped client's promise when a queued event is
     *     flushed
     */
    @Override
    public IPromise<Proto.Msg> sendEvents(List<Proto.Event> events) {
        ChainPromise<Proto.Msg> p = new ChainPromise<Proto.Msg>();
        for (Proto.Event event : events) {
            this.queue(new Write(event, p));
        }
        return p;
    }

    /** Varargs convenience overload of {@link #sendEvents(List)}. */
    @Override
    public IPromise<Proto.Msg> sendEvents(Proto.Event ... events) {
        return this.sendEvents(Arrays.asList(events));
    }

    /**
     * Queues a single event.
     *
     * @param event the event to buffer
     * @return a promise that is attached to the wrapped client's promise when the event is flushed
     */
    @Override
    public IPromise<Proto.Msg> sendEvent(Proto.Event event) {
        ChainPromise<Proto.Msg> p = new ChainPromise<Proto.Msg>();
        this.queue(new Write(event, p));
        return p;
    }

    /** Delegates to {@code RiemannClient.sendException}, passing this batching client. */
    @Override
    public IPromise<Proto.Msg> sendException(String service, Throwable t) {
        return RiemannClient.sendException(this, service, t);
    }

    /** Runs the query through the wrapped client, bypassing the buffer. */
    @Override
    public IPromise<List<Proto.Event>> query(String q) {
        return this.client.query(q);
    }

    /** Returns a new {@link EventDSL} bound to this batching client. */
    @Override
    public EventDSL event() {
        return new EventDSL(this);
    }

    /**
     * Adds {@code write} to the buffer and flushes once the buffered count reaches
     * {@link #batchSize}.
     *
     * @param write the pending write to buffer
     */
    public void queue(Write write) {
        this.buffer.put(write);
        if (this.batchSize <= this.bufferSize.incrementAndGet()) {
            this.flush();
        }
    }

    /**
     * Drains up to {@link #batchSize} buffered writes, sends them as one message through the
     * wrapped client, attaches each write's promise to the resulting client promise, and then
     * flushes the wrapped client.
     *
     * <p>When nothing is drained no message is sent, but the wrapped client is still flushed.
     *
     * @return the number of writes that were drained and sent
     */
    public int flush2() {
        // bufferSize is updated separately from the queue, so with concurrent flushes it can be
        // observed transiently below zero; clamp so ArrayList never sees a negative capacity.
        int maxWrites = Math.max(0, Math.min(this.batchSize, this.bufferSize.get()));
        List<Write> writes = new ArrayList<Write>(maxWrites);
        this.buffer.drainTo(writes, maxWrites);
        this.bufferSize.addAndGet(-writes.size());

        if (!writes.isEmpty()) {
            Proto.Msg.Builder message = Proto.Msg.newBuilder();
            for (Write write : writes) {
                message.addEvents(write.event);
            }
            IPromise<Proto.Msg> clientPromise = this.client.sendMessage(message.build());
            for (Write write : writes) {
                write.promise.attach(clientPromise);
            }
        }

        try {
            this.client.flush();
        }
        catch (IOException ignored) {
            // Deliberately best-effort: flush() on this class declares no checked exception,
            // so a failure to flush the wrapped client is not propagated from here.
        }
        return writes.size();
    }

    /** Flushes one batch; see {@link #flush2()}. */
    @Override
    public void flush() {
        this.flush2();
    }

    /** Reports whether the wrapped client is connected. */
    @Override
    public boolean isConnected() {
        return this.client.isConnected();
    }

    /** Connects the wrapped client. */
    @Override
    public void connect() throws IOException {
        this.client.connect();
    }

    /** Flushes one batch, then closes the wrapped client even if the flush throws. */
    @Override
    public void close() {
        try {
            this.flush();
        }
        finally {
            this.client.close();
        }
    }

    /** Reconnects the wrapped client. */
    @Override
    public void reconnect() throws IOException {
        this.client.reconnect();
    }

    /** Returns the wrapped client as the underlying transport. */
    @Override
    public Transport transport() {
        return this.client;
    }

    /**
     * A buffered event together with the promise to attach once the event has been sent.
     *
     * <p>Kept as an inner (non-static) class so existing {@code client.new Write(...)} call sites
     * keep compiling.
     */
    public class Write {
        /** The event to send. */
        public final Proto.Event event;
        /** The promise handed back to the caller that queued {@link #event}. */
        public final ChainPromise<Proto.Msg> promise;

        /**
         * @param event the event to send
         * @param promise the promise to attach to the wrapped client's promise on flush
         */
        public Write(Proto.Event event, ChainPromise<Proto.Msg> promise) {
            this.event = event;
            this.promise = promise;
        }
    }
}

