/*
 * Decompiled with CFR 0.152.
 */
package nl.minvenj.nfi.storm.kafka;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import nl.minvenj.nfi.storm.kafka.fail.FailHandler;
import nl.minvenj.nfi.storm.kafka.util.ConfigUtils;
import nl.minvenj.nfi.storm.kafka.util.KafkaMessageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSpout
implements IRichSpout {
    private static final long serialVersionUID = -1L;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
    protected final SortedMap<KafkaMessageId, byte[]> _inProgress = new TreeMap<KafkaMessageId, byte[]>();
    protected final Queue<KafkaMessageId> _queue = new LinkedList<KafkaMessageId>();
    protected String _topic;
    protected int _bufSize;
    protected FailHandler _failHandler;
    protected ConsumerIterator<byte[], byte[]> _iterator;
    protected transient SpoutOutputCollector _collector;
    protected transient ConsumerConnector _consumer;

    protected void createFailHandler(String failHandler) {
        this._failHandler = failHandler == null ? ConfigUtils.DEFAULT_FAIL_HANDLER : ConfigUtils.createFailHandlerFromString(failHandler);
    }

    protected void createConsumer(Map<String, Object> config) {
        Properties consumerConfig = ConfigUtils.createKafkaConfig(config);
        LOG.info("connecting kafka client to zookeeper at {} as client group {}", (Object)consumerConfig.getProperty("zookeeper.connect"), (Object)consumerConfig.getProperty("group.id"));
        this._consumer = Consumer.createJavaConsumerConnector((ConsumerConfig)new ConsumerConfig(consumerConfig));
    }

    protected boolean fillBuffer() {
        if (!this._inProgress.isEmpty() || !this._queue.isEmpty()) {
            throw new IllegalStateException("cannot fill buffer when buffer or pending messages are non-empty");
        }
        if (this._iterator == null) {
            Map streams = this._consumer.createMessageStreams(Collections.singletonMap(this._topic, 1));
            this._iterator = ((KafkaStream)((List)streams.get(this._topic)).get(0)).iterator();
        }
        try {
            for (int size = 0; size < this._bufSize && this._iterator.hasNext(); ++size) {
                MessageAndMetadata message = this._iterator.next();
                KafkaMessageId id = new KafkaMessageId(message.partition(), message.offset());
                this._inProgress.put(id, (byte[])message.message());
            }
        }
        catch (ConsumerTimeoutException consumerTimeoutException) {
            // empty catch block
        }
        if (this._inProgress.size() > 0) {
            this._queue.addAll(this._inProgress.keySet());
            LOG.debug("buffer now has {} messages to be emitted", (Object)this._queue.size());
            return true;
        }
        return false;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(new String[]{"bytes"}));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void open(Map config, TopologyContext topology, SpoutOutputCollector collector) {
        this._collector = collector;
        this._topic = ConfigUtils.getTopic(config);
        this._bufSize = ConfigUtils.getMaxBufSize(config);
        this.createFailHandler((String)config.get("kafka.spout.fail.handler"));
        this.createConsumer(config);
        this._failHandler.open(config, topology, collector);
        LOG.info("kafka spout opened, reading from topic {}, using failure policy {}", (Object)this._topic, (Object)this._failHandler.getIdentifier());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        this._collector = null;
        this._iterator = null;
        if (this._consumer != null) {
            try {
                this._consumer.shutdown();
            }
            finally {
                this._consumer = null;
            }
        }
        this._failHandler.close();
    }

    public void activate() {
        this._failHandler.activate();
    }

    public void deactivate() {
        this._failHandler.deactivate();
    }

    public void nextTuple() {
        KafkaMessageId nextId;
        if ((!this._queue.isEmpty() || this._inProgress.isEmpty() && this.fillBuffer()) && (nextId = this._queue.poll()) != null) {
            byte[] message = (byte[])this._inProgress.get(nextId);
            if (message == null) {
                throw new IllegalStateException("no pending message for next id " + nextId);
            }
            this._collector.emit((List)new Values(new Object[]{message}), (Object)nextId);
            LOG.debug("emitted kafka message id {} ({} bytes payload)", (Object)nextId, (Object)message.length);
        }
    }

    public void ack(Object o) {
        if (o instanceof KafkaMessageId) {
            KafkaMessageId id = (KafkaMessageId)o;
            this._inProgress.remove(id);
            LOG.debug("kafka message {} acknowledged", (Object)id);
            if (this._inProgress.isEmpty()) {
                LOG.debug("all pending messages acknowledged, committing client offsets");
                this._consumer.commitOffsets();
            }
            this._failHandler.ack(id);
        }
    }

    public void fail(Object o) {
        if (o instanceof KafkaMessageId) {
            KafkaMessageId id = (KafkaMessageId)o;
            if (this._failHandler.shouldReplay(id)) {
                LOG.debug("kafka message id {} failed in topology, adding to buffer again", (Object)id);
                this._queue.add(id);
            } else {
                LOG.debug("kafka message id {} failed in topology, delegating failure to policy", (Object)id);
                this._failHandler.fail(id, (byte[])this._inProgress.remove(id));
            }
        }
    }
}

