/*
 * Decompiled with CFR 0.152.
 */
package io.riemann.riemann.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.riemann.riemann.Proto;
import io.riemann.riemann.client.ExceptionReporter;
import io.riemann.riemann.client.Promise;
import io.riemann.riemann.client.Write;
import io.riemann.riemann.client.WriteQueue;
import java.io.IOException;

public class TcpHandler
extends CombinedChannelDuplexHandler<Inbound, Outbound> {
    public final WriteQueue queue = new WriteQueue();
    public final ExceptionReporter exceptionReporter;
    public volatile IOException lastError = new IOException("Channel closed.");

    public TcpHandler(ExceptionReporter exceptionReporter) {
        this.exceptionReporter = exceptionReporter;
        this.init((ChannelInboundHandler)new Inbound(), (ChannelOutboundHandler)new Outbound());
    }

    public class Inbound
    extends SimpleChannelInboundHandler<Proto.Msg> {
        protected void channelRead0(ChannelHandlerContext ctx, Proto.Msg msg) {
            TcpHandler.this.queue.take().deliver(msg);
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            try {
                TcpHandler.this.exceptionReporter.reportException(cause);
            }
            catch (Exception exception) {
                // empty catch block
            }
            TcpHandler.this.queue.close(cause);
            ctx.channel().close();
            super.exceptionCaught(ctx, cause);
        }
    }

    public class Outbound
    extends ChannelOutboundHandlerAdapter {
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise channelPromise) throws Exception {
            Write write = (Write)msg;
            final Promise<Proto.Msg> promise = write.promise;
            channelPromise.addListener((GenericFutureListener)new GenericFutureListener<Future<? super Void>>(){

                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                        TcpHandler.this.queue.put(promise);
                    } else if (future.cause() != null) {
                        promise.deliver(new IOException("Write failed.", future.cause()));
                    } else {
                        promise.deliver(new IOException("Write failed."));
                    }
                }
            });
            super.write(ctx, (Object)((Write)msg).message, channelPromise);
        }
    }
}

