/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.parser;

import com.pinterest.secor.common.KafkaClient;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.common.ZookeeperConnector;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.parser.QuboleClient;
import com.pinterest.secor.parser.TimestampedMessageParser;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.ReflectionUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Stack;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Marks completed log partitions as final.
 *
 * <p>For every committed-offset topic that matches the configured topic filter, this class asks
 * the message parser which partition the topic is finalized "up to", walks back over the
 * preceding partitions, optionally registers each one through {@link QuboleClient}, and touches
 * a {@code _SUCCESS} marker file in the partition's log directory.
 */
public class PartitionFinalizer {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionFinalizer.class);

    /** Suffix appended to a partition's log directory to form its marker file path. */
    private static final String SUCCESS_FILE_SUFFIX = "/_SUCCESS";

    private final SecorConfig mConfig;
    private final ZookeeperConnector mZookeeperConnector;
    private final TimestampedMessageParser mMessageParser;
    private final KafkaClient mKafkaClient;
    private final QuboleClient mQuboleClient;
    private final String mFileExtension;
    private final int mLookbackPeriods;

    /**
     * Builds the finalizer and all of its collaborators from the given configuration.
     *
     * @param config configuration used to create the Kafka, Zookeeper and Qubole clients, the
     *               message parser, and to resolve the file extension and lookback window
     * @throws Exception if any collaborator or the compression codec cannot be created
     */
    public PartitionFinalizer(SecorConfig config) throws Exception {
        this.mConfig = config;
        this.mKafkaClient = new KafkaClient(config);
        this.mZookeeperConnector = new ZookeeperConnector(config);
        this.mMessageParser = (TimestampedMessageParser) ReflectionUtil.createMessageParser(
                config.getMessageParserClass(), config);
        this.mQuboleClient = new QuboleClient(config);
        this.mFileExtension = resolveFileExtension(config);
        this.mLookbackPeriods = config.getFinalizerLookbackPeriods();
        LOG.info("Lookback periods: {}", this.mLookbackPeriods);
    }

    /**
     * Picks the file extension: the explicitly configured one if present, otherwise the default
     * extension of the configured compression codec, otherwise the empty string.
     */
    private static String resolveFileExtension(SecorConfig config) throws Exception {
        String configuredExtension = config.getFileExtension();
        if (configuredExtension != null) {
            return configuredExtension;
        }
        String codecName = config.getCompressionCodec();
        if (codecName != null && !codecName.isEmpty()) {
            CompressionCodec codec = CompressionUtil.createCompressionCodec(codecName);
            return codec.getDefaultExtension();
        }
        return "";
    }

    /**
     * Collects the last and the committed message of every Kafka partition of {@code topic} and
     * lets the message parser derive the partition the topic is finalized up to.
     *
     * <p>Kafka partitions for which either message is unavailable are logged and left out of
     * the computation.
     *
     * @param topic the Kafka topic to inspect
     * @return whatever the message parser returns for the collected messages; the caller treats
     *         {@code null} as "nothing to finalize"
     * @throws Exception if the Kafka client or the message parser fails
     */
    private String[] getFinalizedUptoPartitions(String topic) throws Exception {
        int numPartitions = this.mKafkaClient.getNumPartitions(topic);
        List<Message> lastMessages = new ArrayList<Message>(numPartitions);
        List<Message> committedMessages = new ArrayList<Message>(numPartitions);
        for (int partition = 0; partition < numPartitions; ++partition) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            Message lastMessage = this.mKafkaClient.getLastMessage(topicPartition);
            Message committedMessage = this.mKafkaClient.getCommittedMessage(topicPartition);
            if (lastMessage == null || committedMessage == null) {
                LOG.error("For topic {} partition {}, lastMessage: {}, commmitted: {}",
                        new Object[]{topicPartition.getTopic(), topicPartition.getPartition(),
                                lastMessage, committedMessage});
                continue;
            }
            lastMessages.add(lastMessage);
            committedMessages.add(committedMessage);
        }
        return this.mMessageParser.getFinalizedUptoPartitions(lastMessages, committedMessages);
    }

    /**
     * Finalizes the partitions that precede {@code uptoPartitions} (exclusive).
     *
     * <p>Candidates are collected first and then popped from a stack, so they are finalized in
     * the reverse of the order in which they were discovered. A partition whose Hive
     * registration fails is logged and does not get a marker file; the remaining partitions are
     * still processed.
     *
     * @param topic          the topic whose partitions are finalized
     * @param uptoPartitions the first partition that must NOT be finalized
     * @throws Exception if the parser or a file system operation fails
     */
    private void finalizePartitionsUpTo(String topic, String[] uptoPartitions) throws Exception {
        LOG.info("Finalize up to (but not include) {}, dim: {}",
                Arrays.toString(uptoPartitions), uptoPartitions.length);

        Stack<String[]> toBeFinalized = collectUnfinalizedPartitions(topic, uptoPartitions);
        // Render each array explicitly; logging the stack itself would only print array
        // identity hashes.
        LOG.info("To be finalized partitions: {}", describePartitions(toBeFinalized));
        if (toBeFinalized.isEmpty()) {
            LOG.warn("There is no partitions to be finalized.");
            return;
        }

        while (!toBeFinalized.isEmpty()) {
            String[] current = toBeFinalized.pop();
            LOG.info("Finalizing partition: {}", Arrays.toString(current));
            // Hive registration is only attempted for partitions that have the same number of
            // dimensions as the upto partition.
            if (uptoPartitions.length == current.length) {
                try {
                    registerHivePartition(topic, current);
                } catch (Exception e) {
                    LOG.error("failed to finalize topic " + topic, e);
                    // Do not write the marker for a partition that failed registration.
                    continue;
                }
            }
            String successFilePath = getLogFileDir(topic, current) + SUCCESS_FILE_SUFFIX;
            LOG.info("touching file {}", successFilePath);
            FileUtil.touch(successFilePath);
        }
    }

    /**
     * Walks at most {@code mLookbackPeriods} steps through the partitions preceding
     * {@code uptoPartitions} and collects those whose log directory exists. The walk stops early
     * at the first partition that already has a marker file.
     */
    private Stack<String[]> collectUnfinalizedPartitions(String topic, String[] uptoPartitions)
            throws Exception {
        Stack<String[]> toBeFinalized = new Stack<String[]>();
        String[] previous = this.mMessageParser.getPreviousPartitions(uptoPartitions);
        for (int i = 0; i < this.mLookbackPeriods; ++i) {
            LOG.info("Looking for partition: {}", Arrays.toString(previous));
            String logFileDir = getLogFileDir(topic, previous);
            if (FileUtil.exists(logFileDir)) {
                String successFilePath = logFileDir + SUCCESS_FILE_SUFFIX;
                if (FileUtil.exists(successFilePath)) {
                    LOG.info("SuccessFile exist already, short circuit return. {}",
                            successFilePath);
                    break;
                }
                LOG.info("Folder {} exists and ready to be finalized.", logFileDir);
                toBeFinalized.push(previous);
            } else {
                LOG.info("Folder {} doesn't exist, skip", logFileDir);
            }
            previous = this.mMessageParser.getPreviousPartitions(previous);
        }
        return toBeFinalized;
    }

    /** Returns the log directory of the given topic partition under the configured S3 prefix. */
    private String getLogFileDir(String topic, String[] partitions) throws Exception {
        LogFilePath logFilePath = new LogFilePath(this.mConfig.getS3Prefix(), topic, partitions,
                this.mConfig.getGeneration(), 0, 0L, this.mFileExtension);
        return logFilePath.getLogFileDir();
    }

    /**
     * Registers the partition through the Qubole client when a Hive prefix is configured.
     *
     * <p>The partition spec is built (and validated) before the prefix is looked up, so a
     * malformed partition fails even when Hive registration is disabled.
     */
    private void registerHivePartition(String topic, String[] partitions) throws Exception {
        String partitionSpec = toHivePartitionSpec(partitions);
        LOG.info("Hive partition string: {}", partitionSpec);

        String hivePrefix = null;
        try {
            hivePrefix = this.mConfig.getHivePrefix();
        } catch (RuntimeException ignored) {
            // Deliberately best-effort: a failing lookup is treated as "Hive prefix not
            // configured" and registration is skipped.
            LOG.warn("HivePrefix is not defined.  Skip hive registration");
        }
        if (hivePrefix != null) {
            this.mQuboleClient.addPartition(hivePrefix + topic, partitionSpec);
        }
    }

    /**
     * Converts {@code key=value} partition elements into a comma separated list of
     * {@code key='value'} pairs.
     *
     * @throws IllegalArgumentException if an element does not split into exactly two parts on
     *                                  {@code '='}; an explicit check is used instead of
     *                                  {@code assert} so the validation also runs when the JVM
     *                                  has assertions disabled
     */
    private static String toHivePartitionSpec(String[] partitions) {
        StringBuilder spec = new StringBuilder();
        for (int i = 0; i < partitions.length; ++i) {
            String partition = partitions[i];
            String[] parts = partition.split("=");
            if (parts.length != 2) {
                throw new IllegalArgumentException("wrong partition format: " + partition);
            }
            if (i > 0) {
                spec.append(",");
            }
            spec.append(parts[0]).append("='").append(parts[1]).append("'");
        }
        return spec.toString();
    }

    /** Renders every partition array as text, preserving iteration order, for logging. */
    private static List<String> describePartitions(Iterable<String[]> partitions) {
        List<String> rendered = new ArrayList<String>();
        for (String[] partition : partitions) {
            rendered.add(Arrays.toString(partition));
        }
        return rendered;
    }

    /**
     * Finalizes the partitions of every committed-offset topic that matches the configured
     * Kafka topic filter. Topics for which no finalized-upto partition can be determined are
     * left untouched.
     *
     * @throws Exception if Zookeeper, Kafka, the message parser or the file system fails
     */
    public void finalizePartitions() throws Exception {
        List<String> topics = this.mZookeeperConnector.getCommittedOffsetTopics();
        for (String topic : topics) {
            if (!topic.matches(this.mConfig.getKafkaTopicFilter())) {
                LOG.info("skipping topic {}", topic);
                continue;
            }
            LOG.info("finalizing topic {}", topic);
            String[] partitions = getFinalizedUptoPartitions(topic);
            // The cast keeps the array a single log argument instead of a varargs list.
            LOG.info("finalized timestamp for topic {} is {}", topic, (Object) partitions);
            if (partitions != null) {
                finalizePartitionsUpTo(topic, partitions);
            }
        }
    }
}

