/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.parser;

import com.pinterest.secor.common.KafkaClient;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.common.ZookeeperConnector;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.parser.QuboleClient;
import com.pinterest.secor.parser.ThriftMessageParser;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.FileUtil;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.NavigableSet;
import java.util.TimeZone;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionFinalizer {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionFinalizer.class);
    private SecorConfig mConfig;
    private ZookeeperConnector mZookeeperConnector;
    private ThriftMessageParser mThriftMessageParser;
    private KafkaClient mKafkaClient;
    private QuboleClient mQuboleClient;
    private String mFileExtension;

    public PartitionFinalizer(SecorConfig config) throws Exception {
        this.mConfig = config;
        this.mKafkaClient = new KafkaClient(this.mConfig);
        this.mZookeeperConnector = new ZookeeperConnector(this.mConfig);
        this.mThriftMessageParser = new ThriftMessageParser(this.mConfig);
        this.mQuboleClient = new QuboleClient(this.mConfig);
        if (this.mConfig.getCompressionCodec() != null && !this.mConfig.getCompressionCodec().isEmpty()) {
            CompressionCodec codec = CompressionUtil.createCompressionCodec(this.mConfig.getCompressionCodec());
            this.mFileExtension = codec.getDefaultExtension();
        } else {
            this.mFileExtension = "";
        }
    }

    private long getLastTimestampMillis(TopicPartition topicPartition) throws TException {
        Message message = this.mKafkaClient.getLastMessage(topicPartition);
        return this.mThriftMessageParser.extractTimestampMillis(message);
    }

    private long getLastTimestampMillis(String topic) throws TException {
        int numPartitions = this.mKafkaClient.getNumPartitions(topic);
        long max_timestamp = Long.MIN_VALUE;
        for (int partition = 0; partition < numPartitions; ++partition) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            long timestamp = this.getLastTimestampMillis(topicPartition);
            if (timestamp <= max_timestamp) continue;
            max_timestamp = timestamp;
        }
        if (max_timestamp == Long.MIN_VALUE) {
            return -1L;
        }
        return max_timestamp;
    }

    private long getCommittedTimestampMillis(TopicPartition topicPartition) throws Exception {
        Message message = this.mKafkaClient.getCommittedMessage(topicPartition);
        if (message == null) {
            LOG.error("No message found for topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition());
            return -1L;
        }
        return this.mThriftMessageParser.extractTimestampMillis(message);
    }

    private long getCommittedTimestampMillis(String topic) throws Exception {
        int numPartitions = this.mKafkaClient.getNumPartitions(topic);
        long minTimestamp = Long.MAX_VALUE;
        for (int partition = 0; partition < numPartitions; ++partition) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            long timestamp = this.getCommittedTimestampMillis(topicPartition);
            if (timestamp == -1L) {
                return -1L;
            }
            if (timestamp >= minTimestamp) continue;
            minTimestamp = timestamp;
        }
        if (minTimestamp == Long.MAX_VALUE) {
            return -1L;
        }
        return minTimestamp;
    }

    private NavigableSet<Calendar> getPartitions(String topic) throws IOException, ParseException {
        String s3Prefix = "s3n://" + this.mConfig.getS3Bucket() + "/" + this.mConfig.getS3Path();
        String[] partitions = new String[]{"dt="};
        LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, this.mConfig.getGeneration(), 0, 0L, this.mFileExtension);
        String parentDir = logFilePath.getLogFileParentDir();
        String[] partitionDirs = FileUtil.list(parentDir);
        Pattern pattern = Pattern.compile(".*/dt=(\\d\\d\\d\\d-\\d\\d-\\d\\d)$");
        TreeSet<Calendar> result = new TreeSet<Calendar>();
        for (String partitionDir : partitionDirs) {
            Matcher matcher = pattern.matcher(partitionDir);
            if (!matcher.find()) continue;
            String date = matcher.group(1);
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            format.setTimeZone(TimeZone.getTimeZone("UTC"));
            Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendar.setTime(format.parse(date));
            result.add(calendar);
        }
        return result;
    }

    private void finalizePartitionsUpTo(String topic, Calendar calendar) throws IOException, ParseException, InterruptedException {
        NavigableSet<Calendar> partitionDates = this.getPartitions(topic).headSet(calendar, true).descendingSet();
        String s3Prefix = "s3n://" + this.mConfig.getS3Bucket() + "/" + this.mConfig.getS3Path();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        for (Calendar partition : partitionDates) {
            String partitionStr = format.format(partition.getTime());
            String[] partitions = new String[]{"dt=" + partitionStr};
            LogFilePath logFilePath = new LogFilePath(s3Prefix, topic, partitions, this.mConfig.getGeneration(), 0, 0L, this.mFileExtension);
            String logFileDir = logFilePath.getLogFileDir();
            assert (FileUtil.exists(logFileDir)) : "FileUtil.exists(" + logFileDir + ")";
            String successFilePath = logFileDir + "/_SUCCESS";
            if (FileUtil.exists(successFilePath)) {
                return;
            }
            try {
                this.mQuboleClient.addPartition(topic, "dt='" + partitionStr + "'");
            }
            catch (Exception e) {
                LOG.error("failed to finalize topic " + topic + " partition dt=" + partitionStr, (Throwable)e);
                continue;
            }
            LOG.info("touching file " + successFilePath);
            FileUtil.touch(successFilePath);
        }
    }

    private long getFinalizedTimestampMillis(TopicPartition topicPartition) throws Exception {
        long lastTimestamp = this.getLastTimestampMillis(topicPartition);
        long committedTimestamp = this.getCommittedTimestampMillis(topicPartition);
        long now = System.currentTimeMillis();
        if (lastTimestamp == committedTimestamp && now - lastTimestamp > 3600000L) {
            return now;
        }
        return committedTimestamp;
    }

    private long getFinalizedTimestampMillis(String topic) throws Exception {
        int numPartitions = this.mKafkaClient.getNumPartitions(topic);
        long minTimestamp = Long.MAX_VALUE;
        for (int partition = 0; partition < numPartitions; ++partition) {
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            long timestamp = this.getFinalizedTimestampMillis(topicPartition);
            LOG.info("finalized timestamp for topic " + topic + " partition " + partition + " is " + timestamp);
            if (timestamp == -1L) {
                return -1L;
            }
            if (timestamp >= minTimestamp) continue;
            minTimestamp = timestamp;
        }
        if (minTimestamp == Long.MAX_VALUE) {
            return -1L;
        }
        return minTimestamp;
    }

    public void finalizePartitions() throws Exception {
        List<String> topics = this.mZookeeperConnector.getCommittedOffsetTopics();
        for (String topic : topics) {
            if (!topic.matches(this.mConfig.getKafkaTopicFilter())) {
                LOG.info("skipping topic " + topic);
                continue;
            }
            LOG.info("finalizing topic " + topic);
            long finalizedTimestampMillis = this.getFinalizedTimestampMillis(topic);
            LOG.info("finalized timestamp for topic " + topic + " is " + finalizedTimestampMillis);
            if (finalizedTimestampMillis == -1L) continue;
            Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendar.setTimeInMillis(finalizedTimestampMillis);
            calendar.add(10, -1);
            calendar.add(5, -1);
            this.finalizePartitionsUpTo(topic, calendar);
        }
    }
}

