/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.uploader;

import com.google.common.base.Joiner;
import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.common.ZookeeperConnector;
import com.pinterest.secor.io.FileReader;
import com.pinterest.secor.io.FileWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.uploader.Handle;
import com.pinterest.secor.uploader.UploadManager;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.ReflectionUtil;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Uploader {
    private static final Logger LOG = LoggerFactory.getLogger(Uploader.class);
    private SecorConfig mConfig;
    private OffsetTracker mOffsetTracker;
    private FileRegistry mFileRegistry;
    private ZookeeperConnector mZookeeperConnector;
    private UploadManager mUploadManager;

    public Uploader(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry, UploadManager uploadManager) {
        this(config, offsetTracker, fileRegistry, uploadManager, new ZookeeperConnector(config));
    }

    public Uploader(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry, UploadManager uploadManager, ZookeeperConnector zookeeperConnector) {
        this.mConfig = config;
        this.mOffsetTracker = offsetTracker;
        this.mFileRegistry = fileRegistry;
        this.mUploadManager = uploadManager;
        this.mZookeeperConnector = zookeeperConnector;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void uploadFiles(TopicPartition topicPartition) throws Exception {
        long committedOffsetCount = this.mOffsetTracker.getTrueCommittedOffsetCount(topicPartition);
        long lastSeenOffset = this.mOffsetTracker.getLastSeenOffset(topicPartition);
        String stripped = StringUtils.strip((String)this.mConfig.getZookeeperPath(), (String)"/");
        String lockPath = Joiner.on((String)"/").skipNulls().join((Object)"", (Object)(stripped.isEmpty() ? null : stripped), new Object[]{"secor", "locks", topicPartition.getTopic(), topicPartition.getPartition()});
        this.mZookeeperConnector.lock(lockPath);
        try {
            long zookeeperComittedOffsetCount = this.mZookeeperConnector.getCommittedOffsetCount(topicPartition);
            if (zookeeperComittedOffsetCount == committedOffsetCount) {
                LOG.info("uploading topic {} partition {}", (Object)topicPartition.getTopic(), (Object)topicPartition.getPartition());
                this.mFileRegistry.deleteWriters(topicPartition);
                Collection<LogFilePath> paths = this.mFileRegistry.getPaths(topicPartition);
                ArrayList uploadHandles = new ArrayList();
                for (LogFilePath logFilePath : paths) {
                    uploadHandles.add(this.mUploadManager.upload(logFilePath));
                }
                for (Handle handle : uploadHandles) {
                    handle.get();
                }
                this.mFileRegistry.deleteTopicPartition(topicPartition);
                this.mZookeeperConnector.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1L);
                this.mOffsetTracker.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1L);
            }
        }
        finally {
            this.mZookeeperConnector.unlock(lockPath);
        }
    }

    protected FileReader createReader(LogFilePath srcPath, CompressionCodec codec) throws Exception {
        return ReflectionUtil.createFileReader(this.mConfig.getFileReaderWriterFactory(), srcPath, codec);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void trim(LogFilePath srcPath, long startOffset) throws Exception {
        if (startOffset == srcPath.getOffset()) {
            return;
        }
        FileReader reader = null;
        FileWriter writer = null;
        LogFilePath dstPath = null;
        int copiedMessages = 0;
        this.mFileRegistry.deleteWriter(srcPath);
        try {
            KeyValue keyVal;
            CompressionCodec codec = null;
            String extension = "";
            if (this.mConfig.getCompressionCodec() != null && !this.mConfig.getCompressionCodec().isEmpty()) {
                codec = CompressionUtil.createCompressionCodec(this.mConfig.getCompressionCodec());
                extension = codec.getDefaultExtension();
            }
            reader = this.createReader(srcPath, codec);
            while ((keyVal = reader.next()) != null) {
                if (keyVal.getKey() < startOffset) continue;
                if (writer == null) {
                    String localPrefix = this.mConfig.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
                    dstPath = new LogFilePath(localPrefix, srcPath.getTopic(), srcPath.getPartitions(), srcPath.getGeneration(), srcPath.getKafkaPartition(), startOffset, extension);
                    writer = this.mFileRegistry.getOrCreateWriter(dstPath, codec);
                }
                writer.write(keyVal);
                ++copiedMessages;
            }
        }
        finally {
            if (reader != null) {
                reader.close();
            }
        }
        this.mFileRegistry.deletePath(srcPath);
        if (dstPath == null) {
            LOG.info("removed file {}", (Object)srcPath.getLogFilePath());
        } else {
            LOG.info("trimmed {} messages from {} to {} with start offset {}", new Object[]{copiedMessages, srcPath.getLogFilePath(), dstPath.getLogFilePath(), startOffset});
        }
    }

    private void trimFiles(TopicPartition topicPartition, long startOffset) throws Exception {
        Collection<LogFilePath> paths = this.mFileRegistry.getPaths(topicPartition);
        for (LogFilePath path : paths) {
            this.trim(path, startOffset);
        }
    }

    private void checkTopicPartition(TopicPartition topicPartition) throws Exception {
        long size = this.mFileRegistry.getSize(topicPartition);
        long modificationAgeSec = this.mFileRegistry.getModificationAgeSec(topicPartition);
        LOG.debug("size: " + size + " modificationAge: " + modificationAgeSec);
        if (size >= this.mConfig.getMaxFileSizeBytes() || modificationAgeSec >= this.mConfig.getMaxFileAgeSeconds()) {
            long newOffsetCount = this.mZookeeperConnector.getCommittedOffsetCount(topicPartition);
            long oldOffsetCount = this.mOffsetTracker.setCommittedOffsetCount(topicPartition, newOffsetCount);
            long lastSeenOffset = this.mOffsetTracker.getLastSeenOffset(topicPartition);
            if (oldOffsetCount == newOffsetCount) {
                LOG.debug("Uploading for: " + topicPartition);
                this.uploadFiles(topicPartition);
            } else if (newOffsetCount > lastSeenOffset) {
                LOG.debug("last seen offset {} is lower than committed offset count {}. Deleting files in topic {} partition {}", new Object[]{lastSeenOffset, newOffsetCount, topicPartition.getTopic(), topicPartition.getPartition()});
                this.mFileRegistry.deleteTopicPartition(topicPartition);
            } else {
                LOG.debug("previous committed offset count {} is lower than committed offset {} is lower than or equal to last seen offset {}. Trimming files in topic {} partition {}", new Object[]{oldOffsetCount, newOffsetCount, lastSeenOffset, topicPartition.getTopic(), topicPartition.getPartition()});
                this.trimFiles(topicPartition, newOffsetCount);
            }
        }
    }

    public void applyPolicy() throws Exception {
        Collection<TopicPartition> topicPartitions = this.mFileRegistry.getTopicPartitions();
        for (TopicPartition topicPartition : topicPartitions) {
            this.checkTopicPartition(topicPartition);
        }
    }
}

