/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.tools;

import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.ReflectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.io.compress.CompressionCodec;

public class LogFileVerifier {
    private SecorConfig mConfig;
    private String mTopic;
    private HashMap<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>> mTopicPartitionToOffsetToFiles;

    public LogFileVerifier(SecorConfig config, String topic) throws IOException {
        this.mConfig = config;
        this.mTopic = topic;
        this.mTopicPartitionToOffsetToFiles = new HashMap();
    }

    private String getPrefix() {
        return "s3n://" + this.mConfig.getS3Bucket() + "/" + this.mConfig.getS3Path();
    }

    private String getTopicPrefix() {
        return this.getPrefix() + "/" + this.mTopic;
    }

    private void populateTopicPartitionToOffsetToFiles() throws IOException {
        String[] paths;
        String prefix = this.getPrefix();
        String topicPrefix = this.getTopicPrefix();
        for (String path : paths = FileUtil.listRecursively(topicPrefix)) {
            long offset;
            HashSet<LogFilePath> logFilePaths;
            if (path.endsWith("/_SUCCESS")) continue;
            LogFilePath logFilePath = new LogFilePath(prefix, path);
            TopicPartition topicPartition = new TopicPartition(logFilePath.getTopic(), logFilePath.getKafkaPartition());
            SortedMap<Long, HashSet<LogFilePath>> offsetToFiles = this.mTopicPartitionToOffsetToFiles.get(topicPartition);
            if (offsetToFiles == null) {
                offsetToFiles = new TreeMap<Long, HashSet<LogFilePath>>();
                this.mTopicPartitionToOffsetToFiles.put(topicPartition, offsetToFiles);
            }
            if ((logFilePaths = (HashSet<LogFilePath>)offsetToFiles.get(offset = logFilePath.getOffset())) == null) {
                logFilePaths = new HashSet<LogFilePath>();
                offsetToFiles.put(offset, logFilePaths);
            }
            logFilePaths.add(logFilePath);
        }
    }

    private void filterOffsets(long fromOffset, long toOffset) {
        Iterator<Map.Entry<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>>> iterator = this.mTopicPartitionToOffsetToFiles.entrySet().iterator();
        while (iterator.hasNext()) {
            long firstOffset = -2L;
            long lastOffset = Long.MAX_VALUE;
            Map.Entry<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>> entry = iterator.next();
            SortedMap<Long, HashSet<LogFilePath>> offsetToFiles = entry.getValue();
            for (long offset : offsetToFiles.keySet()) {
                if (offset <= fromOffset || firstOffset == -2L) {
                    firstOffset = offset;
                }
                if (offset < toOffset || toOffset != Long.MAX_VALUE) continue;
                lastOffset = offset;
            }
            if (firstOffset == -2L) continue;
            TopicPartition topicPartition = entry.getKey();
            offsetToFiles = offsetToFiles.subMap(firstOffset, lastOffset);
            this.mTopicPartitionToOffsetToFiles.put(topicPartition, offsetToFiles);
        }
    }

    private int getMessageCount(LogFilePath logFilePath) throws Exception {
        FileReader reader = this.createFileReader(logFilePath);
        int result = 0;
        while (reader.next() != null) {
            ++result;
        }
        reader.close();
        return result;
    }

    public void verifyCounts(long fromOffset, long toOffset, int numMessages) throws Exception {
        this.populateTopicPartitionToOffsetToFiles();
        this.filterOffsets(fromOffset, toOffset);
        Iterator<Map.Entry<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>>> iterator = this.mTopicPartitionToOffsetToFiles.entrySet().iterator();
        int aggregateMessageCount = 0;
        while (iterator.hasNext()) {
            long previousOffset = -2L;
            long previousMessageCount = -2L;
            Map.Entry<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>> entry = iterator.next();
            SortedMap<Long, HashSet<LogFilePath>> offsetToFiles = entry.getValue();
            for (HashSet<LogFilePath> logFilePaths : offsetToFiles.values()) {
                int messageCount = 0;
                long offset = -2L;
                for (LogFilePath logFilePath : logFilePaths) {
                    assert (offset == -2L || offset == logFilePath.getOffset()) : Long.toString(offset) + " || " + offset + " == " + logFilePath.getOffset();
                    messageCount += this.getMessageCount(logFilePath);
                    offset = logFilePath.getOffset();
                }
                if (previousOffset != -2L && offset - previousOffset != previousMessageCount) {
                    TopicPartition topicPartition = entry.getKey();
                    throw new RuntimeException("Message count of " + previousMessageCount + " in topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition() + " does not agree with adjacent offsets " + previousOffset + " and " + offset);
                }
                previousOffset = offset;
                previousMessageCount = messageCount;
                aggregateMessageCount += messageCount;
            }
        }
        if (numMessages != -1 && aggregateMessageCount != numMessages) {
            throw new RuntimeException("Message count " + aggregateMessageCount + " does not agree with the expected count " + numMessages);
        }
    }

    private void getOffsets(LogFilePath logFilePath, Set<Long> offsets) throws Exception {
        KeyValue record;
        FileReader reader = this.createFileReader(logFilePath);
        while ((record = reader.next()) != null) {
            if (offsets.add(record.getKey())) continue;
            throw new RuntimeException("duplicate key " + record.getKey() + " found in file " + logFilePath.getLogFilePath());
        }
        reader.close();
    }

    public void verifySequences(long fromOffset, long toOffset) throws Exception {
        this.populateTopicPartitionToOffsetToFiles();
        this.filterOffsets(fromOffset, toOffset);
        Iterator<Map.Entry<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>>> iterator = this.mTopicPartitionToOffsetToFiles.entrySet().iterator();
        while (iterator.hasNext()) {
            TreeSet<Long> offsets = new TreeSet<Long>();
            Map.Entry<TopicPartition, SortedMap<Long, HashSet<LogFilePath>>> entry = iterator.next();
            TopicPartition topicPartition = entry.getKey();
            SortedMap<Long, HashSet<LogFilePath>> offsetToFiles = entry.getValue();
            for (HashSet<LogFilePath> logFilePaths : offsetToFiles.values()) {
                for (LogFilePath logFilePath : logFilePaths) {
                    this.getOffsets(logFilePath, offsets);
                }
            }
            long lastOffset = -2L;
            for (Long offset : offsets) {
                if (lastOffset != -2L) assert (lastOffset + 1L == offset) : Long.toString(offset) + " + 1 == " + offset + " for topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition();
                lastOffset = offset;
            }
        }
    }

    private FileReader createFileReader(LogFilePath logFilePath) throws Exception {
        CompressionCodec codec = null;
        if (this.mConfig.getCompressionCodec() != null && !this.mConfig.getCompressionCodec().isEmpty()) {
            codec = CompressionUtil.createCompressionCodec(this.mConfig.getCompressionCodec());
        }
        FileReader fileReader = ReflectionUtil.createFileReader(this.mConfig.getFileReaderWriterFactory(), logFilePath, codec);
        return fileReader;
    }
}

