/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.pub;

import com.linkedin.databus.client.pub.CheckpointPersistenceProviderAbstract;
import com.linkedin.databus.client.pub.RegistrationId;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.data_model.DatabusSubscription;
import com.linkedin.databus.core.util.ConfigApplier;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.ConfigManager;
import com.linkedin.databus.core.util.InvalidConfigException;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Formatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonProcessingException;

public class FileSystemCheckpointPersistenceProvider
extends CheckpointPersistenceProviderAbstract {
    public static final String MODULE = FileSystemCheckpointPersistenceProvider.class.getName();
    public static final Logger LOG = Logger.getLogger((String)MODULE);
    private static final int MAX_STREAMID_NAME_LENGTH = 240;
    private static final String COMMON_SOURCENAME_PREFIX = "com.linkedin.events.";
    private final StaticConfig _staticConfig;
    private final ConfigManager<RuntimeConfig> _configManager;
    private final HashMap<String, CacheEntry> _cache;
    private final ReadWriteLock _cacheLock = new ReentrantReadWriteLock(true);

    public FileSystemCheckpointPersistenceProvider() throws InvalidConfigException {
        this(new Config(), 2);
    }

    public FileSystemCheckpointPersistenceProvider(Config config, int protocolVersion) throws InvalidConfigException {
        this(config.build(), protocolVersion);
    }

    public FileSystemCheckpointPersistenceProvider(StaticConfig config, int protocolVersion) throws InvalidConfigException {
        super(protocolVersion);
        this._staticConfig = config;
        this._cache = new HashMap(100);
        this._staticConfig.getRuntime().setManagedInstance(this);
        this._configManager = new ConfigManager(this._staticConfig.getRuntimeConfigPrefix(), (ConfigBuilder)this._staticConfig.getRuntime());
    }

    public StaticConfig getStaticConfig() {
        return this._staticConfig;
    }

    @Override
    public Checkpoint loadCheckpointV3(List<DatabusSubscription> subs, RegistrationId registrationId) {
        return this.loadCheckpointInternal(this.convertSubsToListOfStrings(subs), registrationId);
    }

    @Override
    public void storeCheckpointV3(List<DatabusSubscription> subs, Checkpoint checkpoint, RegistrationId registrationId) throws IOException {
        this.storeCheckpointInternal(this.convertSubsToListOfStrings(subs), checkpoint, registrationId);
    }

    @Override
    public void removeCheckpointV3(List<DatabusSubscription> subs, RegistrationId registrationId) {
        this.removeCheckpointInternal(this.convertSubsToListOfStrings(subs), registrationId);
    }

    @Override
    public Checkpoint loadCheckpoint(List<String> sourceNames) {
        return this.loadCheckpointInternal(sourceNames, null);
    }

    @Override
    public void storeCheckpoint(List<String> sourceNames, Checkpoint checkpoint) throws IOException {
        this.storeCheckpointInternal(sourceNames, checkpoint, null);
    }

    @Override
    public void removeCheckpoint(List<String> sourceNames) {
        this.removeCheckpointInternal(sourceNames, null);
    }

    private Checkpoint loadCheckpointInternal(List<String> sourceNames, RegistrationId registrationId) {
        String streamId = FileSystemCheckpointPersistenceProvider.calcStreamId(sourceNames);
        try {
            CacheEntry cacheEntry = this.doLoadCheckpoint(sourceNames, streamId, registrationId);
            Checkpoint result = null == cacheEntry ? null : cacheEntry.getCheckpoint();
            return result;
        }
        catch (IOException ioe) {
            LOG.error((Object)"Error loading checkpoint", (Throwable)ioe);
            return null;
        }
    }

    private void storeCheckpointInternal(List<String> sourceNames, Checkpoint checkpoint, RegistrationId registrationId) throws IOException {
        String streamId = FileSystemCheckpointPersistenceProvider.calcStreamId(sourceNames);
        CacheEntry cacheEntry = this.doLoadCheckpoint(sourceNames, streamId, registrationId);
        if (null == cacheEntry) {
            throw new IOException("Creation of checkpoint failed: " + streamId);
        }
        if (!cacheEntry.setCheckpoint(checkpoint)) {
            throw new IOException("Storing of checkpoint failed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeCheckpointInternal(List<String> sourceNames, RegistrationId registrationId) {
        LOG.info((Object)("Removing checkpoint for:" + sourceNames));
        String streamId = FileSystemCheckpointPersistenceProvider.calcStreamId(sourceNames);
        String key = this.calculateIndexForCache(streamId, registrationId);
        Lock writeLock = this._cacheLock.writeLock();
        writeLock.lock();
        try {
            this._cache.remove(key);
            File rootDirectoryForRegistrationId = null == registrationId ? this.getStaticConfig().getRootDirectory() : new File(this.getStaticConfig().getRootDirectory(), registrationId.getId());
            File curCheckpointFile = new File(rootDirectoryForRegistrationId, streamId + ".current");
            if (curCheckpointFile.exists() && !curCheckpointFile.delete()) {
                LOG.error((Object)("checkpoint removal failed: " + sourceNames));
                LOG.error((Object)("could not delete file:" + curCheckpointFile.getAbsolutePath()));
            }
        }
        finally {
            writeLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    CacheEntry doLoadCheckpoint(List<String> sourceNames, String streamId, RegistrationId registrationId) throws IOException {
        Lock readLock = this._cacheLock.readLock();
        readLock.lock();
        CacheEntry cacheEntry = null;
        String key = this.calculateIndexForCache(streamId, registrationId);
        try {
            cacheEntry = this._cache.get(key);
        }
        finally {
            readLock.unlock();
        }
        if (null == cacheEntry) {
            Lock writeLock = this._cacheLock.writeLock();
            writeLock.lock();
            try {
                cacheEntry = new CacheEntry(streamId, registrationId);
                this._cache.put(key, cacheEntry);
            }
            finally {
                writeLock.unlock();
            }
        }
        return cacheEntry;
    }

    private String calculateIndexForCache(String streamId, RegistrationId registrationId) {
        String key = streamId;
        if (null != registrationId) {
            assert (!registrationId.getId().isEmpty());
            key = registrationId.getId() + streamId;
        }
        return key;
    }

    static String calcStreamId(List<String> sourceNames) {
        StringBuilder sb = new StringBuilder(1024);
        sb.append("cp_");
        boolean useShortNames = sourceNames.size() >= 5;
        boolean first = true;
        for (String sourceName : sourceNames) {
            if (!first) {
                sb.append('-');
            }
            first = false;
            String realSourceName = useShortNames && sourceName.startsWith(COMMON_SOURCENAME_PREFIX) ? sourceName.substring(COMMON_SOURCENAME_PREFIX.length()) : sourceName;
            sb.append(realSourceName.replaceAll("\\W", "_"));
        }
        if (sb.length() > 240) {
            sb.delete(240, sb.length());
        }
        return sb.toString();
    }

    public File generateCheckpointFile(String basename, RegistrationId registrationId, int index) {
        File rootDirectory = null == registrationId ? this._staticConfig.getRootDirectory() : new File(this._staticConfig.getRootDirectory(), registrationId.getId());
        return StaticConfig.generateCheckpointFile(rootDirectory, basename, index);
    }

    public ConfigManager<RuntimeConfig> getConfigManager() {
        return this._configManager;
    }

    public static class Config
    implements ConfigBuilder<StaticConfig> {
        private String _rootDirectory = "./databus2-checkpoints";
        private String _runtimeConfigPrefix = "databus.checkpointPersistence.fileSystem.";
        private RuntimeConfigBuilder _runtime = new RuntimeConfigBuilder();

        public String getRootDirectory() {
            return this._rootDirectory;
        }

        public void setRootDirectory(String rootDirectory) {
            this._rootDirectory = rootDirectory;
        }

        public RuntimeConfigBuilder getRuntime() {
            return this._runtime;
        }

        public void setRuntime(RuntimeConfigBuilder runtime) {
            this._runtime = runtime;
        }

        public String getRuntimeConfigPrefix() {
            return this._runtimeConfigPrefix;
        }

        public void setRuntimeConfigPrefix(String runtimeConfigPrefix) {
            this._runtimeConfigPrefix = runtimeConfigPrefix;
        }

        public StaticConfig build() throws InvalidConfigException {
            File rootDirectory = new File(this._rootDirectory);
            if (!rootDirectory.exists() && !rootDirectory.mkdirs()) {
                throw new InvalidConfigException("Invalid checkpoint directory:" + rootDirectory.getAbsolutePath());
            }
            LOG.info((Object)("Checkpoint directory:" + rootDirectory.getAbsolutePath()));
            return new StaticConfig(rootDirectory, this._runtime, this._runtimeConfigPrefix);
        }
    }

    public static class StaticConfig {
        private final File _rootDirectory;
        private final RuntimeConfigBuilder _runtime;
        private final String _runtimeConfigPrefix;

        public StaticConfig(File rootDirectory, RuntimeConfigBuilder runtime, String runtimeConfigPrefix) {
            this._rootDirectory = rootDirectory;
            this._runtime = runtime;
            this._runtimeConfigPrefix = runtimeConfigPrefix;
        }

        public File getRootDirectory() {
            return this._rootDirectory;
        }

        public RuntimeConfigBuilder getRuntime() {
            return this._runtime;
        }

        public String getRuntimeConfigPrefix() {
            return this._runtimeConfigPrefix;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static File generateCheckpointFile(File rootDir, String basename, int index) {
            Formatter fmt = new Formatter();
            try {
                fmt.format("%02d", index);
                File file = new File(rootDir, basename + fmt.toString());
                return file;
            }
            finally {
                fmt.close();
            }
        }

        public String toString() {
            return "StaticConfig [_rootDirectory=" + this._rootDirectory + ", _runtime=" + this._runtime + ", _runtimeConfigPrefix=" + this._runtimeConfigPrefix + "]";
        }
    }

    public static class RuntimeConfigBuilder
    implements ConfigBuilder<RuntimeConfig> {
        public static final int MAX_HISTORY_SIZE = 99;
        public static final String CHECKPOINT_FILE_EXT_PATTERN = "%02d";
        private boolean _historyEnabled = false;
        private int _historySize = 5;
        private FileSystemCheckpointPersistenceProvider _managedInstance = null;

        public boolean isHistoryEnabled() {
            return this._historyEnabled;
        }

        public void setHistoryEnabled(boolean historyEnabled) {
            this._historyEnabled = historyEnabled;
        }

        public int getHistorySize() {
            return this._historySize;
        }

        public void setHistorySize(int historySize) {
            this._historySize = historySize;
        }

        public FileSystemCheckpointPersistenceProvider getManagedInstance() {
            return this._managedInstance;
        }

        public void setManagedInstance(FileSystemCheckpointPersistenceProvider managedInstance) {
            this._managedInstance = managedInstance;
        }

        public RuntimeConfig build() throws InvalidConfigException {
            if (this._historySize <= 0 || this._historySize > 99) {
                throw new InvalidConfigException("Invalid history size:" + this._historySize);
            }
            if (null == this._managedInstance) {
                throw new InvalidConfigException("No associated managed instance for runtime config");
            }
            return new RuntimeConfig(this._historyEnabled, this._historySize);
        }

        public String toString() {
            return "RuntimeConfigBuilder [_historyEnabled=" + this._historyEnabled + ", _historySize=" + this._historySize + ", _managedInstance=" + this._managedInstance + "]";
        }
    }

    public static class RuntimeConfig
    implements ConfigApplier<RuntimeConfig> {
        private final boolean _historyEnabled;
        private final int _historySize;

        public RuntimeConfig(boolean historyEnabled, int historySize) {
            this._historyEnabled = historyEnabled;
            this._historySize = historySize;
        }

        public boolean isHistoryEnabled() {
            return this._historyEnabled;
        }

        public int getHistorySize() {
            return this._historySize;
        }

        public void applyNewConfig(RuntimeConfig oldConfig) {
        }

        public boolean equals(Object otherConfig) {
            if (null == otherConfig || !(otherConfig instanceof RuntimeConfig)) {
                return false;
            }
            return this.equalsConfig((RuntimeConfig)otherConfig);
        }

        public boolean equalsConfig(RuntimeConfig otherConfig) {
            if (null == otherConfig) {
                return false;
            }
            return this.getHistorySize() == otherConfig.getHistorySize() && this.isHistoryEnabled() == otherConfig.isHistoryEnabled();
        }

        public int hashCode() {
            return this._historySize ^ (this._historyEnabled ? -1 : 0);
        }

        public String toString() {
            return "RuntimeConfig [_historyEnabled=" + this._historyEnabled + ", _historySize=" + this._historySize + "]";
        }
    }

    class CacheEntry {
        private Checkpoint _checkpoint;
        private boolean _checkpointLoaded = false;
        private TreeSet<HistoryEntry> _historyEntries;
        private final String _filePrefix;
        private final RegistrationId _registrationId;
        private File _rootDirectory;

        public CacheEntry(String streamId, RegistrationId registrationId) throws IOException {
            boolean success;
            this._filePrefix = streamId + ".";
            this._registrationId = registrationId;
            this._rootDirectory = null == this._registrationId ? FileSystemCheckpointPersistenceProvider.this._staticConfig.getRootDirectory() : new File(FileSystemCheckpointPersistenceProvider.this._staticConfig.getRootDirectory(), this._registrationId.getId());
            boolean bl = success = this._rootDirectory.exists() || this._rootDirectory.mkdirs();
            if (!success) {
                LOG.error((Object)("Failed to create checkpoint directory (" + this._rootDirectory + ")"));
            }
        }

        public synchronized Checkpoint getCheckpoint() {
            if (!this._checkpointLoaded) {
                this.loadCurrentCheckpoint();
                this._checkpointLoaded = true;
            }
            return this._checkpoint;
        }

        private boolean loadCurrentCheckpoint() {
            boolean hasError = false;
            File cpFile = new File(this._rootDirectory, this._filePrefix + "current");
            if (cpFile.exists()) {
                try {
                    BufferedReader checkpointFile = new BufferedReader(new FileReader(cpFile));
                    String jsonLine = checkpointFile.readLine();
                    if (null == jsonLine) {
                        LOG.error((Object)"Checkpoint JSON serialization expected");
                        hasError = true;
                    }
                    Checkpoint newCheckpoint = null;
                    if (!hasError) {
                        try {
                            newCheckpoint = new Checkpoint(jsonLine);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug((Object)("checkpoint loaded:" + newCheckpoint.toString()));
                            }
                        }
                        catch (JsonProcessingException jpe) {
                            hasError = true;
                            LOG.error((Object)"Unable to deserialize checkpoint", (Throwable)jpe);
                        }
                    }
                    if (!hasError) {
                        this._checkpoint = newCheckpoint;
                    }
                    checkpointFile.close();
                }
                catch (IOException ioe) {
                    LOG.error((Object)ioe);
                }
            }
            return !hasError;
        }

        public synchronized boolean setCheckpoint(Checkpoint checkpoint) {
            if (null == checkpoint) {
                LOG.error((Object)"Cannot save null checkpoints");
                return false;
            }
            boolean hasError = false;
            RuntimeConfig runtimeConfig = (RuntimeConfig)FileSystemCheckpointPersistenceProvider.this.getConfigManager().getReadOnlyConfig();
            if (runtimeConfig.isHistoryEnabled()) {
                this.findHistoryFiles();
            }
            File newCheckpointFile = new File(this._rootDirectory, this._filePrefix + "newcurrent");
            File curCheckpointFile = new File(this._rootDirectory, this._filePrefix + "current");
            boolean hasCurCheckpointFile = curCheckpointFile.exists();
            boolean bl = hasError = hasError || !this.storeCheckpoint(checkpoint, newCheckpointFile);
            if (!hasError) {
                if (hasCurCheckpointFile && runtimeConfig.isHistoryEnabled()) {
                    this.addHistoryEntry(curCheckpointFile);
                } else {
                    File oldCheckpointFile = new File(this._rootDirectory, this._filePrefix + "oldcurrent");
                    if (oldCheckpointFile.exists() && !oldCheckpointFile.delete()) {
                        LOG.error((Object)("removing old checkpoint file failed:" + oldCheckpointFile.getAbsolutePath()));
                    }
                    if (hasCurCheckpointFile && !curCheckpointFile.renameTo(oldCheckpointFile)) {
                        LOG.error((Object)("saving old checkpoint file failed: " + oldCheckpointFile.getAbsolutePath()));
                    }
                    if (hasCurCheckpointFile && curCheckpointFile.exists() && !curCheckpointFile.delete()) {
                        LOG.warn((Object)("deletion of checkpoint file failed:" + curCheckpointFile.getAbsolutePath()));
                    }
                }
                if (!newCheckpointFile.renameTo(curCheckpointFile)) {
                    hasError = true;
                    LOG.error((Object)"Saving current checkpoint failed");
                }
            }
            if (!hasError) {
                this._checkpoint = checkpoint;
            }
            return !hasError;
        }

        private void findHistoryFiles() {
            this._historyEntries = new TreeSet();
            File[] historyFiles = FileSystemCheckpointPersistenceProvider.this.getStaticConfig().getRootDirectory().listFiles(new FilenameFilter(){

                @Override
                public boolean accept(File dir, String name) {
                    return name.startsWith(CacheEntry.this._filePrefix);
                }
            });
            if (null == historyFiles) {
                LOG.warn((Object)("Unable to find history files " + this._filePrefix + "*"));
            } else {
                for (File f : historyFiles) {
                    HistoryEntry entry = this.createHistoryEntry(f);
                    if (null == entry) continue;
                    this._historyEntries.add(entry);
                }
            }
        }

        private boolean storeCheckpoint(Checkpoint checkpoint, File toFile) {
            boolean hasError = false;
            if (toFile.exists() && !toFile.delete()) {
                LOG.error((Object)("deletion of file failed: " + toFile.getAbsolutePath()));
            }
            try {
                PrintWriter out = new PrintWriter(toFile);
                String checkpointJson = checkpoint.toString();
                out.println(checkpointJson);
                out.close();
            }
            catch (Exception e) {
                hasError = true;
                LOG.error((Object)"Store checkpoint error", (Throwable)e);
            }
            return !hasError;
        }

        private void cleanupHistoryFiles() {
            RuntimeConfig runtimeConfig = (RuntimeConfig)FileSystemCheckpointPersistenceProvider.this.getConfigManager().getReadOnlyConfig();
            int maxSize = runtimeConfig.getHistorySize();
            while (this._historyEntries.size() > maxSize) {
                this._historyEntries.remove(this._historyEntries.last());
            }
        }

        private HistoryEntry createHistoryEntry(File file) {
            int suffixIdx = file.getName().lastIndexOf(46);
            if (-1 == suffixIdx) {
                return null;
            }
            String suffix = file.getName().substring(suffixIdx + 1);
            int index = -1;
            try {
                index = Integer.parseInt(suffix);
                return index < 0 ? null : new HistoryEntry(index, file);
            }
            catch (NumberFormatException nfe) {
                return null;
            }
        }

        private void addHistoryEntry(File f) {
            HistoryEntry newEntry = new HistoryEntry(-1, f);
            this._historyEntries.add(newEntry);
            this.cleanupHistoryFiles();
            Iterator<HistoryEntry> descIter = this._historyEntries.descendingIterator();
            while (descIter.hasNext()) {
                HistoryEntry entry = descIter.next();
                entry.bumpUpVersion();
            }
        }

        class HistoryEntry
        implements Comparable<HistoryEntry> {
            private int _index;
            private File _file;

            public HistoryEntry(int index, File file) {
                this._index = index;
                this._file = file;
            }

            public void bumpUpVersion() {
                ++this._index;
                File newFile = FileSystemCheckpointPersistenceProvider.this.generateCheckpointFile(CacheEntry.this._filePrefix, CacheEntry.this._registrationId, this._index);
                if (newFile.exists() && !newFile.delete()) {
                    LOG.warn((Object)("failed to remove file:" + newFile.getAbsolutePath()));
                }
                if (!this._file.renameTo(newFile)) {
                    LOG.error((Object)("File rollover failed: from " + this._file.getAbsolutePath() + " to " + newFile.getAbsolutePath()));
                }
                this._file = newFile;
            }

            @Override
            public int compareTo(HistoryEntry other) {
                return this._index - other._index;
            }

            public boolean equals(Object other) {
                if (null == other || !(other instanceof HistoryEntry)) {
                    return false;
                }
                HistoryEntry otherEntry = (HistoryEntry)other;
                return this._index == otherEntry._index;
            }

            public int hashCode() {
                return this._index;
            }
        }
    }
}

