/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.consumer;

import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import com.pinterest.secor.parser.MessageParser;
import com.pinterest.secor.reader.MessageReader;
import com.pinterest.secor.uploader.UploadManager;
import com.pinterest.secor.uploader.Uploader;
import com.pinterest.secor.util.ReflectionUtil;
import com.pinterest.secor.writer.MessageWriter;
import java.io.IOException;
import kafka.consumer.ConsumerTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Consumer
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(Consumer.class);
    private SecorConfig mConfig;
    private MessageReader mMessageReader;
    private MessageWriter mMessageWriter;
    private MessageParser mMessageParser;
    private OffsetTracker mOffsetTracker;
    private Uploader mUploader;
    private double mUnparsableMessages;

    public Consumer(SecorConfig config) {
        this.mConfig = config;
    }

    private void init() throws Exception {
        this.mOffsetTracker = new OffsetTracker();
        this.mMessageReader = new MessageReader(this.mConfig, this.mOffsetTracker);
        FileRegistry fileRegistry = new FileRegistry(this.mConfig);
        UploadManager uploadManager = ReflectionUtil.createUploadManager(this.mConfig.getUploadManagerClass(), this.mConfig);
        this.mUploader = new Uploader(this.mConfig, this.mOffsetTracker, fileRegistry, uploadManager);
        this.mMessageWriter = new MessageWriter(this.mConfig, this.mOffsetTracker, fileRegistry);
        this.mMessageParser = ReflectionUtil.createMessageParser(this.mConfig.getMessageParserClass(), this.mConfig);
        this.mUnparsableMessages = 0.0;
    }

    @Override
    public void run() {
        boolean hasMoreMessages;
        try {
            this.init();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to initialize the consumer", e);
        }
        long checkEveryNSeconds = Math.min(600L, this.mConfig.getMaxFileAgeSeconds() / 2L);
        long checkMessagesPerSecond = this.mConfig.getMessagesPerSecond();
        long nMessages = 0L;
        long lastChecked = System.currentTimeMillis();
        while (hasMoreMessages = this.consumeNextMessage()) {
            long now = System.currentTimeMillis();
            if (nMessages++ % checkMessagesPerSecond != 0L && now - lastChecked <= checkEveryNSeconds * 1000L) continue;
            lastChecked = now;
            this.checkUploadPolicy();
        }
        this.checkUploadPolicy();
    }

    private void checkUploadPolicy() {
        try {
            this.mUploader.applyPolicy();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to apply upload policy", e);
        }
    }

    private boolean consumeNextMessage() {
        Message rawMessage = null;
        try {
            boolean hasNext = this.mMessageReader.hasNext();
            if (!hasNext) {
                return false;
            }
            rawMessage = this.mMessageReader.read();
        }
        catch (ConsumerTimeoutException e) {
            LOG.trace("Consumer timed out", (Throwable)e);
        }
        if (rawMessage != null) {
            try {
                this.mMessageWriter.adjustOffset(rawMessage);
            }
            catch (IOException e) {
                throw new RuntimeException("Failed to adjust offset.", e);
            }
            ParsedMessage parsedMessage = null;
            try {
                parsedMessage = this.mMessageParser.parse(rawMessage);
                double DECAY = 0.999;
                this.mUnparsableMessages *= 0.999;
            }
            catch (Exception e) {
                this.mUnparsableMessages += 1.0;
                double MAX_UNPARSABLE_MESSAGES = 1000.0;
                if (this.mUnparsableMessages > 1000.0) {
                    throw new RuntimeException("Failed to parse message " + rawMessage, e);
                }
                LOG.warn("Failed to parse message {}", (Object)rawMessage, (Object)e);
            }
            if (parsedMessage != null) {
                try {
                    this.mMessageWriter.write(parsedMessage);
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to write message " + parsedMessage, e);
                }
            }
        }
        return true;
    }

    public OffsetTracker getOffsetTracker() {
        return this.mOffsetTracker;
    }
}

