/*
 * Decompiled with CFR 0.152.
 */
package com.pinterest.secor.writer;

import com.pinterest.secor.common.FileRegistry;
import com.pinterest.secor.common.LogFilePath;
import com.pinterest.secor.common.OffsetTracker;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.TopicPartition;
import com.pinterest.secor.io.FileReaderWriter;
import com.pinterest.secor.io.KeyValue;
import com.pinterest.secor.message.Message;
import com.pinterest.secor.message.ParsedMessage;
import com.pinterest.secor.util.CompressionUtil;
import com.pinterest.secor.util.IdUtil;
import java.io.IOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageWriter {
    private static final Logger LOG = LoggerFactory.getLogger(MessageWriter.class);
    private SecorConfig mConfig;
    private OffsetTracker mOffsetTracker;
    private FileRegistry mFileRegistry;
    private String mFileExtension;
    private CompressionCodec mCodec;
    private String mLocalPrefix;

    public MessageWriter(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry) throws Exception {
        this.mConfig = config;
        this.mOffsetTracker = offsetTracker;
        this.mFileRegistry = fileRegistry;
        if (this.mConfig.getCompressionCodec() != null && !this.mConfig.getCompressionCodec().isEmpty()) {
            this.mCodec = CompressionUtil.createCompressionCodec(this.mConfig.getCompressionCodec());
            this.mFileExtension = this.mCodec.getDefaultExtension();
        } else {
            this.mFileExtension = "";
        }
        this.mLocalPrefix = this.mConfig.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
    }

    public void adjustOffset(Message message) throws IOException {
        TopicPartition topicPartition = new TopicPartition(message.getTopic(), message.getKafkaPartition());
        long lastSeenOffset = this.mOffsetTracker.getLastSeenOffset(topicPartition);
        if (message.getOffset() != lastSeenOffset + 1L) {
            LOG.debug("offset of message " + message + " does not follow sequentially the last seen offset " + lastSeenOffset + ".  Deleting files in topic " + topicPartition.getTopic() + " partition " + topicPartition.getPartition());
            this.mFileRegistry.deleteTopicPartition(topicPartition);
        }
        this.mOffsetTracker.setLastSeenOffset(topicPartition, message.getOffset());
    }

    public void write(ParsedMessage message) throws Exception {
        TopicPartition topicPartition = new TopicPartition(message.getTopic(), message.getKafkaPartition());
        long offset = this.mOffsetTracker.getAdjustedCommittedOffsetCount(topicPartition);
        LogFilePath path = new LogFilePath(this.mLocalPrefix, this.mConfig.getGeneration(), offset, message, this.mFileExtension);
        FileReaderWriter writer = this.mFileRegistry.getOrCreateWriter(path, this.mCodec);
        writer.write(new KeyValue(message.getOffset(), message.getPayload()));
        LOG.debug("appended message " + message + " to file " + path.getLogFilePath() + ".  File length " + writer.getLength());
    }
}

