/*
 * Decompiled with CFR 0.152.
 */
package com.rapportive.storm.spout;

import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.utils.Utils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import com.rapportive.storm.amqp.QueueDeclaration;
import java.io.IOException;
import java.util.Map;
import org.apache.log4j.Logger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AMQPSpout
implements IRichSpout {
    private static final long serialVersionUID = 11258942292629263L;
    private static final Logger log = Logger.getLogger(AMQPSpout.class);
    public static final String CONFIG_PREFETCH_COUNT = "amqp.prefetch.count";
    private static final long DEFAULT_PREFETCH_COUNT = 100L;
    public static final long WAIT_FOR_NEXT_MESSAGE = 1L;
    public static final long WAIT_AFTER_SHUTDOWN_SIGNAL = 10000L;
    private final String amqpHost;
    private final int amqpPort;
    private final String amqpUsername;
    private final String amqpPassword;
    private final String amqpVhost;
    private final boolean requeueOnFail;
    private final QueueDeclaration queueDeclaration;
    private final Scheme serialisationScheme;
    private transient boolean spoutActive = true;
    private transient Connection amqpConnection;
    private transient Channel amqpChannel;
    private transient QueueingConsumer amqpConsumer;
    private transient String amqpConsumerTag;
    private SpoutOutputCollector collector;
    private int prefetchCount;

    public AMQPSpout(String host, int port, String username, String password, String vhost, QueueDeclaration queueDeclaration, Scheme scheme) {
        this(host, port, username, password, vhost, queueDeclaration, scheme, false);
    }

    public AMQPSpout(String host, int port, String username, String password, String vhost, QueueDeclaration queueDeclaration, Scheme scheme, boolean requeueOnFail) {
        this.amqpHost = host;
        this.amqpPort = port;
        this.amqpUsername = username;
        this.amqpPassword = password;
        this.amqpVhost = vhost;
        this.queueDeclaration = queueDeclaration;
        this.requeueOnFail = requeueOnFail;
        this.serialisationScheme = scheme;
    }

    public void ack(Object msgId) {
        if (msgId instanceof Long) {
            long deliveryTag = (Long)msgId;
            if (this.amqpChannel != null) {
                try {
                    this.amqpChannel.basicAck(deliveryTag, false);
                }
                catch (IOException e) {
                    log.warn((Object)("Failed to ack delivery-tag " + deliveryTag), (Throwable)e);
                }
                catch (ShutdownSignalException e) {
                    log.warn((Object)("AMQP connection failed. Failed to ack delivery-tag " + deliveryTag), (Throwable)e);
                }
            }
        } else {
            log.warn((Object)String.format("don't know how to ack(%s: %s)", msgId.getClass().getName(), msgId));
        }
    }

    public void close() {
        try {
            if (this.amqpChannel != null) {
                if (this.amqpConsumerTag != null) {
                    this.amqpChannel.basicCancel(this.amqpConsumerTag);
                }
                this.amqpChannel.close();
            }
        }
        catch (IOException e) {
            log.warn((Object)"Error closing AMQP channel", (Throwable)e);
        }
        try {
            if (this.amqpConnection != null) {
                this.amqpConnection.close();
            }
        }
        catch (IOException e) {
            log.warn((Object)"Error closing AMQP connection", (Throwable)e);
        }
    }

    public void activate() {
        log.info((Object)"Unpausing spout");
        this.spoutActive = true;
    }

    public void deactivate() {
        log.info((Object)"Pausing spout");
        this.spoutActive = false;
    }

    public void fail(Object msgId) {
        if (msgId instanceof Long) {
            long deliveryTag = (Long)msgId;
            if (this.amqpChannel != null) {
                try {
                    this.amqpChannel.basicReject(deliveryTag, this.requeueOnFail);
                }
                catch (IOException e) {
                    log.warn((Object)("Failed to reject delivery-tag " + deliveryTag), (Throwable)e);
                }
            }
        } else {
            log.warn((Object)String.format("don't know how to reject(%s: %s)", msgId.getClass().getName(), msgId));
        }
    }

    public void nextTuple() {
        if (this.spoutActive && this.amqpConsumer != null) {
            try {
                QueueingConsumer.Delivery delivery = this.amqpConsumer.nextDelivery(1L);
                if (delivery == null) {
                    return;
                }
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                byte[] message = delivery.getBody();
                this.collector.emit(this.serialisationScheme.deserialize(message), (Object)deliveryTag);
            }
            catch (ShutdownSignalException e) {
                log.warn((Object)"AMQP connection dropped, will attempt to reconnect...");
                Utils.sleep((long)10000L);
                this.reconnect();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        Long prefetchCount = (Long)config.get(CONFIG_PREFETCH_COUNT);
        if (prefetchCount == null) {
            log.info((Object)"Using default prefetch-count");
            prefetchCount = 100L;
        } else if (prefetchCount < 1L) {
            throw new IllegalArgumentException("amqp.prefetch.count must be at least 1");
        }
        this.prefetchCount = prefetchCount.intValue();
        try {
            this.collector = collector;
            this.setupAMQP();
        }
        catch (IOException e) {
            log.error((Object)"AMQP setup failed", (Throwable)e);
        }
    }

    private void setupAMQP() throws IOException {
        int prefetchCount = this.prefetchCount;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(this.amqpHost);
        connectionFactory.setPort(this.amqpPort);
        connectionFactory.setUsername(this.amqpUsername);
        connectionFactory.setPassword(this.amqpPassword);
        connectionFactory.setVirtualHost(this.amqpVhost);
        this.amqpConnection = connectionFactory.newConnection();
        this.amqpChannel = this.amqpConnection.createChannel();
        log.info((Object)("Setting basic.qos prefetch-count to " + prefetchCount));
        this.amqpChannel.basicQos(prefetchCount);
        AMQP.Queue.DeclareOk queue = this.queueDeclaration.declare(this.amqpChannel);
        String queueName = queue.getQueue();
        log.info((Object)("Consuming queue " + queueName));
        this.amqpConsumer = new QueueingConsumer(this.amqpChannel);
        this.amqpConsumerTag = this.amqpChannel.basicConsume(queueName, false, (Consumer)this.amqpConsumer);
    }

    private void reconnect() {
        log.info((Object)"Reconnecting to AMQP broker...");
        try {
            this.setupAMQP();
        }
        catch (IOException e) {
            log.warn((Object)"Failed to reconnect to AMQP broker", (Throwable)e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.serialisationScheme.getOutputFields());
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

