/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.uploader;

import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.common.ZookeeperConnector;
import com.pinterest.secor.io.FileReaderWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.ReflectionUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decides, per topic partition, whether locally buffered log files should be
 * uploaded to S3, trimmed, or discarded, and carries out that decision.
 *
 * <p>The entry point is {@link #applyPolicy()}, which walks every topic
 * partition known to the {@link FileRegistry}. Committed offset counts are
 * read from and written to both the local {@link OffsetTracker} and
 * Zookeeper (through {@link ZookeeperConnector}).
 */
public class Uploader {
    private static final Logger LOG = LoggerFactory.getLogger(Uploader.class);
    // Pool shared by every Uploader instance; each file upload is submitted as one task.
    private static final ExecutorService executor = Executors.newFixedThreadPool(256);
    private final SecorConfig mConfig;
    private final OffsetTracker mOffsetTracker;
    private final FileRegistry mFileRegistry;
    private final ZookeeperConnector mZookeeperConnector;

    /**
     * Creates an uploader that builds its own {@link ZookeeperConnector} from {@code config}.
     *
     * @param config        configuration (S3 bucket/path, local path, size/age limits, codec)
     * @param offsetTracker tracker of last-seen and committed offsets per topic partition
     * @param fileRegistry  registry of local log files and their writers
     */
    public Uploader(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry) {
        this(config, offsetTracker, fileRegistry, new ZookeeperConnector(config));
    }

    /**
     * Creates an uploader with an explicitly supplied {@link ZookeeperConnector}.
     *
     * @param config             configuration (S3 bucket/path, local path, size/age limits, codec)
     * @param offsetTracker      tracker of last-seen and committed offsets per topic partition
     * @param fileRegistry       registry of local log files and their writers
     * @param zookeeperConnector connector used for locking and committed-offset storage
     */
    public Uploader(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry, ZookeeperConnector zookeeperConnector) {
        this.mConfig = config;
        this.mOffsetTracker = offsetTracker;
        this.mFileRegistry = fileRegistry;
        this.mZookeeperConnector = zookeeperConnector;
    }

    /**
     * Schedules the move of one local log file to its S3 location.
     *
     * <p>The S3 path mirrors the local path (topic, partitions, generation,
     * Kafka partition, offset, extension) under the prefix
     * {@code s3n://<bucket>/<path>} taken from the configuration.
     *
     * @param localPath the local file to upload
     * @return a future that completes when the move finishes; an
     *         {@link IOException} from the move surfaces from {@code get()} as an
     *         {@code ExecutionException} wrapping a {@link RuntimeException}
     * @throws Exception if the destination path cannot be built or the task cannot be submitted
     */
    private Future<?> upload(LogFilePath localPath) throws Exception {
        String s3Prefix = "s3n://" + this.mConfig.getS3Bucket() + "/" + this.mConfig.getS3Path();
        LogFilePath s3Path = new LogFilePath(s3Prefix, localPath.getTopic(), localPath.getPartitions(), localPath.getGeneration(), localPath.getKafkaPartition(), localPath.getOffset(), localPath.getExtension());
        final String localLogFilename = localPath.getLogFilePath();
        final String s3LogFilename = s3Path.getLogFilePath();
        LOG.info("uploading file " + localLogFilename + " to " + s3LogFilename);
        return executor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    FileUtil.moveToS3(localLogFilename, s3LogFilename);
                } catch (IOException e) {
                    // Runnable.run() cannot throw checked exceptions; keep the cause attached.
                    throw new RuntimeException(e);
                }
            }
        });
    }

    /**
     * Uploads every local file of a topic partition and advances the committed
     * offset count, all while holding the partition's Zookeeper lock.
     *
     * <p>The upload only happens when the committed offset count stored in
     * Zookeeper still equals the locally tracked one; otherwise the method
     * releases the lock and does nothing else.
     *
     * @param topicPartition the topic partition whose files should be uploaded
     * @throws Exception if locking, any upload, or an offset update fails
     */
    private void uploadFiles(TopicPartition topicPartition) throws Exception {
        long committedOffsetCount = this.mOffsetTracker.getTrueCommittedOffsetCount(topicPartition);
        long lastSeenOffset = this.mOffsetTracker.getLastSeenOffset(topicPartition);
        String lockPath = "/secor/locks/" + topicPartition.getTopic() + "/" + topicPartition.getPartition();
        // NOTE(review): presumably deleting the writers closes them so pending data
        // reaches disk before the files are uploaded - confirm in FileRegistry.
        this.mFileRegistry.deleteWriters(topicPartition);
        this.mZookeeperConnector.lock(lockPath);
        try {
            long zookeeperCommittedOffsetCount = this.mZookeeperConnector.getCommittedOffsetCount(topicPartition);
            if (zookeeperCommittedOffsetCount == committedOffsetCount) {
                LOG.info("uploading topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition());
                Collection<LogFilePath> paths = this.mFileRegistry.getPaths(topicPartition);
                // Submit all uploads first so they run concurrently, then wait for each.
                Collection<Future<?>> uploadFutures = new ArrayList<Future<?>>();
                for (LogFilePath logFilePath : paths) {
                    uploadFutures.add(this.upload(logFilePath));
                }
                // NOTE(review): the first failing get() propagates immediately, so
                // uploads submitted after it may still be running when the lock is released.
                for (Future<?> future : uploadFutures) {
                    future.get();
                }
                this.mFileRegistry.deleteTopicPartition(topicPartition);
                // The committed offset *count* is one past the last offset seen.
                this.mZookeeperConnector.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1L);
                this.mOffsetTracker.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1L);
            }
        } finally {
            this.mZookeeperConnector.unlock(lockPath);
        }
    }

    /**
     * Creates a reader for {@code srcPath} using the reader/writer class named
     * in the configuration. Protected so subclasses can substitute the reader.
     *
     * @param srcPath the file to read
     * @param codec   compression codec of the file, or {@code null} when none is configured
     * @return a {@link FileReaderWriter} opened in reader mode
     * @throws Exception if the reader cannot be created
     */
    protected FileReaderWriter createReader(LogFilePath srcPath, CompressionCodec codec) throws Exception {
        return (FileReaderWriter)ReflectionUtil.createFileReaderWriter(this.mConfig.getFileReaderWriter(), srcPath, codec, FileReaderWriter.Type.Reader);
    }

    /**
     * Rewrites a local file so that it only contains messages whose key is at
     * least {@code startOffset}, then removes the original file.
     *
     * <p>Nothing happens when the file already starts at {@code startOffset}.
     * When no message qualifies, no destination file is created and the
     * source is simply removed.
     *
     * @param srcPath     the file to trim
     * @param startOffset the first offset to keep
     * @throws Exception if reading, writing, or registry bookkeeping fails
     */
    private void trim(LogFilePath srcPath, long startOffset) throws Exception {
        if (startOffset == srcPath.getOffset()) {
            return;
        }
        FileReaderWriter reader = null;
        FileReaderWriter writer = null;
        LogFilePath dstPath = null;
        int copiedMessages = 0;
        // NOTE(review): presumably this closes the source file's writer so its
        // contents are complete on disk before being read back - confirm in FileRegistry.
        this.mFileRegistry.deleteWriter(srcPath);
        try {
            CompressionCodec codec = null;
            String extension = "";
            String codecName = this.mConfig.getCompressionCodec();
            if (codecName != null && !codecName.isEmpty()) {
                codec = CompressionUtil.createCompressionCodec(codecName);
                extension = codec.getDefaultExtension();
            }
            reader = this.createReader(srcPath, codec);
            KeyValue keyVal;
            while ((keyVal = reader.next()) != null) {
                if (keyVal.getKey() < startOffset) {
                    continue;
                }
                if (writer == null) {
                    // Create the destination lazily so a fully stale file yields no output file.
                    String localPrefix = this.mConfig.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
                    dstPath = new LogFilePath(localPrefix, srcPath.getTopic(), srcPath.getPartitions(), srcPath.getGeneration(), srcPath.getKafkaPartition(), startOffset, extension);
                    // The writer stays owned by the registry; it is not closed here.
                    writer = this.mFileRegistry.getOrCreateWriter(dstPath, codec);
                }
                writer.write(keyVal);
                ++copiedMessages;
            }
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
        this.mFileRegistry.deletePath(srcPath);
        if (dstPath == null) {
            LOG.info("removed file " + srcPath.getLogFilePath());
        } else {
            LOG.info("trimmed " + copiedMessages + " messages from " + srcPath.getLogFilePath() + " to " + dstPath.getLogFilePath() + " with start offset " + startOffset);
        }
    }

    /**
     * Trims every local file of a topic partition to start at {@code startOffset}.
     *
     * @param topicPartition the topic partition whose files should be trimmed
     * @param startOffset    the first offset to keep in each file
     * @throws Exception if trimming any file fails
     */
    private void trimFiles(TopicPartition topicPartition, long startOffset) throws Exception {
        Collection<LogFilePath> paths = this.mFileRegistry.getPaths(topicPartition);
        for (LogFilePath path : paths) {
            this.trim(path, startOffset);
        }
    }

    /**
     * Applies the upload policy to one topic partition.
     *
     * <p>Nothing happens until the partition's files reach the configured
     * maximum size or age. Then the committed offset count is re-read from
     * Zookeeper and stored in the local tracker, and one of three actions is taken:
     * <ul>
     *   <li>count unchanged: upload the files;</li>
     *   <li>count beyond the last seen offset: delete the local files;</li>
     *   <li>otherwise: trim the local files to start at the new count.</li>
     * </ul>
     *
     * @param topicPartition the topic partition to check
     * @throws Exception if any registry, Zookeeper, upload, or trim operation fails
     */
    private void checkTopicPartition(TopicPartition topicPartition) throws Exception {
        long size = this.mFileRegistry.getSize(topicPartition);
        long modificationAgeSec = this.mFileRegistry.getModificationAgeSec(topicPartition);
        if (size >= this.mConfig.getMaxFileSizeBytes() || modificationAgeSec >= this.mConfig.getMaxFileAgeSeconds()) {
            long newOffsetCount = this.mZookeeperConnector.getCommittedOffsetCount(topicPartition);
            // The value returned by the setter is used below as the previous committed count.
            long oldOffsetCount = this.mOffsetTracker.setCommittedOffsetCount(topicPartition, newOffsetCount);
            long lastSeenOffset = this.mOffsetTracker.getLastSeenOffset(topicPartition);
            if (oldOffsetCount == newOffsetCount) {
                this.uploadFiles(topicPartition);
            } else if (newOffsetCount > lastSeenOffset) {
                LOG.debug("last seen offset " + lastSeenOffset + " is lower than committed offset count " + newOffsetCount + ".  Deleting files in topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition());
                this.mFileRegistry.deleteTopicPartition(topicPartition);
            } else {
                LOG.debug("previous committed offset count " + oldOffsetCount + " is lower than committed offset " + newOffsetCount + " is lower than or equal to last seen offset " + lastSeenOffset + ".  Trimming files in topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition());
                this.trimFiles(topicPartition, newOffsetCount);
            }
        }
    }

    /**
     * Applies the upload policy to every topic partition currently in the file registry.
     *
     * @throws Exception if processing any topic partition fails; later partitions are not processed
     */
    public void applyPolicy() throws Exception {
        Collection<TopicPartition> topicPartitions = this.mFileRegistry.getTopicPartitions();
        for (TopicPartition topicPartition : topicPartitions) {
            this.checkTopicPartition(topicPartition);
        }
    }
}

