/*
 * Decompiled with CFR 0.152.
 */
package io.riemann.riemann.client;

import com.google.protobuf.MessageLite;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.riemann.riemann.Proto;
import io.riemann.riemann.client.AsynchronousTransport;
import io.riemann.riemann.client.ExceptionReporter;
import io.riemann.riemann.client.IPromise;
import io.riemann.riemann.client.OverloadedException;
import io.riemann.riemann.client.Promise;
import io.riemann.riemann.client.ReconnectHandler;
import io.riemann.riemann.client.TcpHandler;
import io.riemann.riemann.client.Transport;
import io.riemann.riemann.client.Write;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TcpTransport
implements AsynchronousTransport {
    public final Logger logger = LoggerFactory.getLogger(TcpTransport.class);
    public static final ProtobufDecoder pbDecoder = new ProtobufDecoder((MessageLite)Proto.Msg.getDefaultInstance());
    public static final ProtobufEncoder pbEncoder = new ProtobufEncoder();
    public static final LengthFieldPrepender frameEncoder = new LengthFieldPrepender(4);
    public static final int DEFAULT_PORT = 5555;
    public volatile State state = State.DISCONNECTED;
    public final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
    public final ChannelGroup channels = new DefaultChannelGroup((EventExecutor)GlobalEventExecutor.INSTANCE);
    public volatile Bootstrap bootstrap;
    public volatile Semaphore writeLimiter = new Semaphore(8192);
    public final AtomicBoolean autoFlush = new AtomicBoolean(true);
    public final AtomicInteger writeLimit = new AtomicInteger(8192);
    public final AtomicLong reconnectDelay = new AtomicLong(5000L);
    public final AtomicInteger connectTimeout = new AtomicInteger(5000);
    public final AtomicInteger writeTimeout = new AtomicInteger(5000);
    public final AtomicInteger writeBufferHigh = new AtomicInteger(65536);
    public final AtomicInteger writeBufferLow = new AtomicInteger(8192);
    public final InetSocketAddress remoteAddress;
    public final InetSocketAddress localAddress;
    public final AtomicReference<SSLContext> sslContext = new AtomicReference();
    public volatile ExceptionReporter exceptionReporter = new ExceptionReporter(){

        @Override
        public void reportException(Throwable t) {
        }
    };

    public void setExceptionReporter(ExceptionReporter exceptionReporter) {
        this.exceptionReporter = exceptionReporter;
    }

    public TcpTransport(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
        this.localAddress = null;
    }

    public TcpTransport(InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
        this.remoteAddress = remoteAddress;
        this.localAddress = localAddress;
    }

    public TcpTransport(String remoteHost, int remotePort) throws IOException {
        this(InetSocketAddress.createUnresolved(remoteHost, remotePort));
    }

    public TcpTransport(String remoteHost, int remotePort, String localHost, int localPort) throws IOException {
        this(InetSocketAddress.createUnresolved(remoteHost, remotePort), InetSocketAddress.createUnresolved(localHost, localPort));
    }

    public TcpTransport(String remoteHost) throws IOException {
        this(remoteHost, 5555);
    }

    public TcpTransport(String remoteHost, String localHost) throws IOException {
        this(remoteHost, 5555, localHost, 0);
    }

    public TcpTransport(int remotePort) throws IOException {
        this(InetAddress.getLocalHost().getHostAddress(), remotePort);
    }

    public synchronized TcpTransport setWriteBufferLimit(int limit) {
        if (this.isConnected()) {
            throw new IllegalStateException("can't modify the write buffer limit of a connected transport; please set the limit before connecting");
        }
        this.writeLimit.set(limit);
        this.writeLimiter = new Semaphore(limit);
        return this;
    }

    @Override
    public boolean isConnected() {
        if (this.state != State.CONNECTED) {
            return false;
        }
        for (Channel ch : this.channels) {
            if (!ch.isOpen()) continue;
            return true;
        }
        return false;
    }

    public SslHandler sslHandler() {
        SSLContext context = this.sslContext.get();
        if (context == null) {
            return null;
        }
        SSLEngine engine = context.createSSLEngine();
        engine.setUseClientMode(true);
        SslHandler handler = new SslHandler(engine);
        return handler;
    }

    @Override
    public synchronized void connect() throws IOException {
        if (this.state != State.DISCONNECTED) {
            return;
        }
        this.state = State.CONNECTING;
        this.bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.eventLoopGroup)).localAddress((SocketAddress)this.localAddress)).remoteAddress((SocketAddress)this.remoteAddress).channel(NioSocketChannel.class)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel channel) {
                ChannelPipeline p = channel.pipeline();
                p.addLast("reconnect", (ChannelHandler)new ReconnectHandler(TcpTransport.this.bootstrap, TcpTransport.this.channels, TcpTransport.this.reconnectDelay, TimeUnit.MILLISECONDS));
                SslHandler sslHandler = TcpTransport.this.sslHandler();
                if (sslHandler != null) {
                    p.addLast("tls", (ChannelHandler)sslHandler);
                }
                p.addLast("frame-decoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                p.addLast("frame-encoder", (ChannelHandler)frameEncoder);
                p.addLast("protobuf-decoder", (ChannelHandler)pbDecoder);
                p.addLast("protobuf-encoder", (ChannelHandler)pbEncoder);
                p.addLast("handler", (ChannelHandler)new TcpHandler(TcpTransport.this.exceptionReporter));
            }
        });
        this.bootstrap.option(ChannelOption.TCP_NODELAY, (Object)true);
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.connectTimeout.get());
        this.bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, (Object)this.writeBufferLow.get());
        this.bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, (Object)this.writeBufferHigh.get());
        this.bootstrap.localAddress((SocketAddress)this.localAddress);
        this.bootstrap.remoteAddress((SocketAddress)this.remoteAddress);
        ChannelFuture result = this.bootstrap.connect();
        this.channels.add((Object)result.channel());
        result.awaitUninterruptibly();
        this.state = State.CONNECTED;
        if (!result.isSuccess()) {
            throw new IOException("Connection failed", result.cause());
        }
    }

    @Override
    public void close() {
        this.close(false);
    }

    public synchronized void close(boolean force) {
        if (!force && this.state != State.CONNECTED) {
            return;
        }
        try {
            this.channels.close().awaitUninterruptibly();
            this.eventLoopGroup.shutdownGracefully().awaitUninterruptibly();
        }
        finally {
            this.bootstrap = null;
            this.state = State.DISCONNECTED;
        }
    }

    @Override
    public synchronized void reconnect() throws IOException {
        this.close();
        this.connect();
    }

    @Override
    public void flush() throws IOException {
        this.channels.flush();
    }

    @Override
    public IPromise<Proto.Msg> sendMessage(Proto.Msg msg) {
        return this.sendMessage(msg, new Promise<Proto.Msg>());
    }

    public Promise<Proto.Msg> sendMessage(Proto.Msg msg, Promise<Proto.Msg> promise) {
        if (this.state != State.CONNECTED) {
            promise.deliver(new IOException("client not connected"));
            return promise;
        }
        Write write = new Write(msg, promise);
        final Semaphore limiter = this.writeLimiter;
        if (limiter.tryAcquire()) {
            Iterator iterator = this.channels.iterator();
            if (iterator.hasNext()) {
                Channel channel = (Channel)iterator.next();
                ChannelFuture f = this.autoFlush.get() ? channel.writeAndFlush((Object)write) : channel.write((Object)write);
                f.addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture f) {
                        limiter.release();
                    }
                });
                return promise;
            }
            limiter.release();
            promise.deliver(new IOException("no channels available"));
            return promise;
        }
        promise.deliver(new OverloadedException("client write buffer is full: " + this.writeLimiter.availablePermits() + " / " + this.writeLimit.get() + " messages."));
        return promise;
    }

    @Override
    public Transport transport() {
        return null;
    }

    public static enum State {
        DISCONNECTED,
        CONNECTING,
        CONNECTED,
        DISCONNECTING;

    }
}

