/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.databus.client.pub;

import com.linkedin.databus.client.pub.CheckpointPersistenceProviderAbstract;
import com.linkedin.databus.core.Checkpoint;
import com.linkedin.databus.core.util.ConfigBuilder;
import com.linkedin.databus.core.util.InvalidConfigException;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;

public class ClusterCheckpointPersistenceProvider
extends CheckpointPersistenceProviderAbstract {
    /** Logger for this class; protected, so subclasses can log through it as well. */
    protected static final Logger LOG = Logger.getLogger(ClusterCheckpointPersistenceProvider.class);
    /** Name of the ZNRecord simple field that holds the checkpoint's {@code toString()} form. */
    protected static final String KEY_CHECKPOINT = "c";
    /** Name of the ZNRecord simple field that holds the comma-joined source names. */
    protected static final String KEY_SOURCES = "s";
    /** Identifier given to the constructor; used as the key prefix and as the id of every ZNRecord written. */
    private final String _id;
    /** Property store obtained from the Helix manager in the constructor; the public methods treat null as "no store". */
    private HelixPropertyStore<ZNRecord> _propertyStore = null;
    /** Minimum time, in milliseconds, that must elapse between two checkpoint writes. */
    private final long _checkpointIntervalMs;
    /** Number of store calls skipped by the write throttle since the last write; reset to 0 on every write. */
    private long _numWritesSkipped = 0L;
    /** {@code System.currentTimeMillis()} value captured for the most recent write; 0 until the first write. */
    private long _lastTimeWrittenMs = 0L;
    /** Static, hence shared by all provider instances: hands out one HelixManager per cluster name. */
    private static final HelixConnectionManager _helixConnManager = new HelixConnectionManager();

    /**
     * Creates a provider for a numeric id using a default-constructed {@link Config}.
     *
     * <p>A fresh {@code Config} has neither a ZooKeeper address nor a cluster name, and
     * {@link Config#build()} rejects that, so as written this constructor always fails with
     * {@link InvalidConfigException}.
     *
     * @param id numeric identifier, converted to its decimal string form
     * @throws InvalidConfigException if the default configuration cannot be built
     * @throws ClusterCheckpointException if the Helix manager cannot be opened
     */
    public ClusterCheckpointPersistenceProvider(long id) throws InvalidConfigException, ClusterCheckpointException {
        this(Long.toString(id), new Config().build());
    }

    /**
     * Creates a provider for a string id using a default-constructed {@link Config}.
     *
     * <p>Same caveat as the {@code long} variant: the default {@code Config} is incomplete, so
     * {@link Config#build()} throws {@link InvalidConfigException}.
     *
     * @param id identifier used as key prefix and ZNRecord id
     * @throws InvalidConfigException if the default configuration cannot be built
     * @throws ClusterCheckpointException if the Helix manager cannot be opened
     */
    public ClusterCheckpointPersistenceProvider(String id) throws InvalidConfigException, ClusterCheckpointException {
        this(id, new Config());
    }

    /**
     * Creates a provider for a numeric id from a mutable configuration builder.
     *
     * @param id numeric identifier, converted to its decimal string form
     * @param config builder that is validated and frozen via {@link Config#build()}
     * @throws InvalidConfigException if {@code config} lacks a ZooKeeper address or cluster name
     * @throws ClusterCheckpointException if the Helix manager cannot be opened
     */
    public ClusterCheckpointPersistenceProvider(long id, Config config) throws InvalidConfigException, ClusterCheckpointException {
        this(Long.toString(id), config.build());
    }

    /**
     * Creates a provider for a string id from a mutable configuration builder.
     *
     * @param id identifier used as key prefix and ZNRecord id
     * @param config builder that is validated and frozen via {@link Config#build()}
     * @throws InvalidConfigException if {@code config} lacks a ZooKeeper address or cluster name
     * @throws ClusterCheckpointException if the Helix manager cannot be opened
     */
    public ClusterCheckpointPersistenceProvider(String id, Config config) throws InvalidConfigException, ClusterCheckpointException {
        this(id, config.build());
    }

    /**
     * Creates a provider for a numeric id from an already-built configuration.
     *
     * @param id numeric identifier, converted to its decimal string form
     * @param config immutable configuration
     * @throws InvalidConfigException declared for signature compatibility; nothing in this
     *         constructor's own body raises it
     * @throws ClusterCheckpointException if the Helix manager cannot be opened
     */
    public ClusterCheckpointPersistenceProvider(long id, StaticConfig config) throws InvalidConfigException, ClusterCheckpointException {
        this(String.valueOf(id), config);
    }

    public ClusterCheckpointPersistenceProvider(String id, StaticConfig config) throws ClusterCheckpointException {
        this._id = id;
        this._checkpointIntervalMs = config.getCheckpointIntervalMs();
        try {
            HelixManager manager = _helixConnManager.open(config.getClusterName(), config.getZkAddr(), id);
            this._propertyStore = manager.getHelixPropertyStore();
        }
        catch (Exception e) {
            LOG.error((Object)("Error creating Helix Manager! for cluster=" + config.getClusterName() + " id=" + this._id + " exception=" + e));
            throw new ClusterCheckpointException(e.toString());
        }
    }

    /**
     * Tries to create a Helix cluster with the given name.
     *
     * <p>Opens a dedicated ZooKeeper client for the call and always closes it again. A
     * {@link HelixException} from the admin call is treated as "cluster probably exists already":
     * it is logged as a warning (with the exception, so other causes remain diagnosable) and
     * {@code false} is returned. Any other exception propagates to the caller.
     *
     * @param zkAddr ZooKeeper connect string
     * @param clusterName name of the cluster to add
     * @return {@code true} if the admin call completed without a {@code HelixException},
     *         {@code false} otherwise
     */
    public static boolean createCluster(String zkAddr, String clusterName) {
        ZkClient zkClient = null;
        try {
            // NOTE(review): 30000 / 60000 are presumably session and connection timeouts in ms — confirm against ZkClient.
            zkClient = new ZkClient(zkAddr, 30000, 60000, (ZkSerializer)new ZNRecordSerializer());
            // Second argument false: do not recreate the cluster if it is already there.
            new ZKHelixAdmin(zkClient).addCluster(clusterName, false);
            return true;
        } catch (HelixException e) {
            LOG.warn("Warn! Cluster might already exist! " + clusterName, e);
            return false;
        } finally {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }

    /**
     * Disconnects and forgets the shared Helix manager for {@code clusterName}, if one is cached.
     *
     * <p>The connection manager is static, so this affects every provider instance that uses the
     * same cluster.
     *
     * @param clusterName cluster whose manager should be closed
     */
    public static void close(String clusterName) {
        // The connection manager is a static final field initialised inline, so it is never null.
        _helixConnManager.close(clusterName);
    }

    /**
     * Builds the property-store path for a list of sources.
     *
     * <p>The result is {@code "/" + id} followed by {@code "_" + shortName} for every source, where
     * {@code shortName} is the last non-empty-trailing segment of the source name split on
     * {@code '.'}. Because {@link String#split(String)} drops trailing empty strings, a name made
     * up only of dots yields no segments and causes an {@link ArrayIndexOutOfBoundsException}.
     *
     * @param srcs source names, in the order they should appear in the key
     * @return the key, starting with a slash
     */
    protected String makeKey(List<String> srcs) {
        StringBuilder key = new StringBuilder(50).append("/").append(this._id);
        for (String source : srcs) {
            String[] segments = source.split("\\.");
            String shortName = segments[segments.length - 1];
            key.append("_").append(shortName);
        }
        return key.toString();
    }

    /**
     * Builds the legacy key for a list of sources: the id followed by {@code "_" + name} for each
     * source, using the full source name and no leading slash.
     *
     * @param srcs source names, in the order they should appear in the key
     * @return the legacy key
     */
    protected String makeKeyOld(List<String> srcs) {
        StringBuilder key = new StringBuilder(50).append(this._id);
        for (String source : srcs) {
            key.append("_").append(source);
        }
        return key.toString();
    }

    /**
     * Writes a checkpoint under the legacy key layout, subject to the write throttle.
     *
     * <p>Does nothing when there is no property store. If less than (or exactly) the configured
     * interval has passed since the previous write, the checkpoint is dropped and only the skip
     * counter is incremented. Otherwise a record containing just the checkpoint field is written
     * under {@link #makeKeyOld(List)} and the throttle state is reset.
     *
     * @param sourceNames sources the checkpoint belongs to
     * @param checkpoint checkpoint to persist; stored via its {@code toString()} form
     * @throws IOException declared by the signature; nothing in this body raises it directly
     * @deprecated legacy key layout; see {@link #storeCheckpoint(List, Checkpoint)}
     */
    @Deprecated
    public void storeCheckpointLegacy(List<String> sourceNames, Checkpoint checkpoint) throws IOException {
        if (this._propertyStore == null) {
            return;
        }
        long nowMs = System.currentTimeMillis();
        if (nowMs - this._lastTimeWrittenMs <= this._checkpointIntervalMs) {
            ++this._numWritesSkipped;
            return;
        }
        String key = this.makeKeyOld(sourceNames);
        ZNRecord znRecord = new ZNRecord(this._id);
        znRecord.setSimpleField(KEY_CHECKPOINT, checkpoint.toString());
        // The store reports failure through its boolean result; surface it instead of ignoring it.
        if (!this._propertyStore.set(key, znRecord, AccessOption.PERSISTENT)) {
            LOG.error("Failed to persist checkpoint for key=" + key);
        }
        // Throttle state is advanced whether or not the write succeeded, as before.
        this._lastTimeWrittenMs = nowMs;
        this._numWritesSkipped = 0L;
    }

    /**
     * Persists a checkpoint for the given sources, at most once per configured interval.
     *
     * <p>Without a property store the call is a no-op. A call that arrives before more than the
     * configured interval has elapsed since the last write is not queued: the checkpoint is
     * discarded and the skipped-writes counter goes up by one. Otherwise the record is written
     * and the counter is reset.
     *
     * @param sourceNames sources the checkpoint belongs to
     * @param checkpoint checkpoint to persist
     * @throws IOException declared by the signature; nothing in this body raises it directly
     */
    @Override
    public void storeCheckpoint(List<String> sourceNames, Checkpoint checkpoint) throws IOException {
        if (this._propertyStore == null) {
            return;
        }
        final long nowMs = System.currentTimeMillis();
        final boolean intervalElapsed = nowMs - this._lastTimeWrittenMs > this._checkpointIntervalMs;
        if (!intervalElapsed) {
            ++this._numWritesSkipped;
            return;
        }
        this.storeZkRecord(sourceNames, checkpoint);
        this._lastTimeWrittenMs = nowMs;
        this._numWritesSkipped = 0L;
    }

    /**
     * Writes one checkpoint record to the property store, unconditionally (no throttling).
     *
     * <p>The record is stored under {@link #makeKey(List)} and carries two simple fields: the
     * checkpoint's {@code toString()} form and the source names joined with commas. Callers must
     * ensure the property store is non-null. A write the store reports as failed is logged; it is
     * not turned into an exception, so the method's signature stays unchanged.
     *
     * @param sourceNames sources the checkpoint belongs to
     * @param checkpoint checkpoint to persist
     */
    protected void storeZkRecord(List<String> sourceNames, Checkpoint checkpoint) {
        String key = this.makeKey(sourceNames);
        ZNRecord znRecord = new ZNRecord(this._id);
        znRecord.setSimpleField(KEY_CHECKPOINT, checkpoint.toString());
        znRecord.setSimpleField(KEY_SOURCES, StringUtils.join(sourceNames.toArray(), ","));
        // The store reports failure through its boolean result; surface it instead of ignoring it.
        boolean written = this._propertyStore.set(key, znRecord, AccessOption.PERSISTENT);
        if (!written) {
            LOG.error("Failed to persist checkpoint for key=" + key);
        }
    }

    /**
     * Loads the checkpoint stored under the legacy key layout.
     *
     * @param sources sources whose checkpoint should be loaded
     * @return the checkpoint, or {@code null} if there is no property store, no record, or the
     *         stored value cannot be deserialized
     * @deprecated legacy key layout; see {@link #loadCheckpoint(List)}
     */
    @Deprecated
    public Checkpoint loadCheckpointLegacy(List<String> sources) {
        // Same guard as loadCheckpoint: without a store there is nothing to read.
        if (this._propertyStore == null) {
            return null;
        }
        return this.getCheckpoint(this.makeKeyOld(sources));
    }

    /**
     * Reads and deserializes the checkpoint stored under {@code key}.
     *
     * <p>Every failure mode is logged and reported as {@code null}: no property store, no record
     * for the key, a record without the checkpoint field, or a value that fails to deserialize.
     *
     * @param key property-store key of the record
     * @return the checkpoint, or {@code null} if it is unavailable
     */
    private Checkpoint getCheckpoint(String key) {
        if (this._propertyStore == null) {
            return null;
        }
        ZNRecord zn = this._propertyStore.get(key, null, AccessOption.PERSISTENT);
        if (zn == null) {
            LOG.error("No record for key = " + key);
            return null;
        }
        String v = zn.getSimpleField(KEY_CHECKPOINT);
        if (v == null) {
            // A record without the checkpoint field would otherwise hand null to the Checkpoint constructor.
            LOG.error("No checkpoint field in record for key = " + key);
            return null;
        }
        try {
            return new Checkpoint(v);
        } catch (IOException e) {
            // The Jackson 1.x parse/mapping exceptions are IOException subclasses, so one handler covers all.
            LOG.error("Cannot deserialize value for key=" + key + " value=" + v + " exception=" + e, e);
            return null;
        }
    }

    /**
     * Loads the checkpoint stored for the given sources.
     *
     * @param sourceNames sources whose checkpoint should be loaded
     * @return the checkpoint, or {@code null} if there is no property store or no usable record
     */
    @Override
    public Checkpoint loadCheckpoint(List<String> sourceNames) {
        return this._propertyStore == null
            ? null
            : this.getCheckpoint(this.makeKey(sourceNames));
    }

    /**
     * Collects the sources field of every record directly under the store root.
     *
     * <p>Each element of the result is the stored field value as-is, i.e. the comma-joined source
     * list of one record, not an individual source name. Records that are missing or have no
     * sources field are skipped.
     *
     * @return the set of stored sources strings, or {@code null} if there is no property store or
     *         the store returns no child list
     */
    public Set<String> getSourceNames() {
        if (this._propertyStore == null) {
            return null;
        }
        List<String> keys = this._propertyStore.getChildNames("/", AccessOption.PERSISTENT);
        if (keys == null) {
            return null;
        }
        Set<String> sources = new HashSet<String>();
        for (String k : keys) {
            ZNRecord zn = this._propertyStore.get("/" + k, null, AccessOption.PERSISTENT);
            if (zn == null) {
                continue;
            }
            String srcName = zn.getSimpleField(KEY_SOURCES);
            if (srcName != null) {
                sources.add(srcName);
            }
        }
        return sources;
    }

    /**
     * Removes the record stored under {@code key}. Callers must ensure the property store is
     * non-null.
     *
     * @param key property-store key of the record to remove
     */
    private void removeCheckpoint(String key) {
        final HelixPropertyStore<ZNRecord> store = this._propertyStore;
        store.remove(key, AccessOption.PERSISTENT);
    }

    /**
     * Removes the checkpoint stored under the legacy key layout. Does nothing when there is no
     * property store.
     *
     * @param sourceNames sources whose legacy checkpoint should be removed
     */
    public void removeCheckpointLegacy(List<String> sourceNames) {
        // Same guard as removeCheckpoint(List): without a store there is nothing to remove.
        if (this._propertyStore == null) {
            return;
        }
        String keyOld = this.makeKeyOld(sourceNames);
        this._propertyStore.remove(keyOld, AccessOption.PERSISTENT);
    }

    /**
     * Removes the checkpoint stored for the given sources. Does nothing when there is no property
     * store.
     *
     * @param sourceNames sources whose checkpoint should be removed
     */
    @Override
    public void removeCheckpoint(List<String> sourceNames) {
        if (this._propertyStore == null) {
            return;
        }
        this.removeCheckpoint(this.makeKey(sourceNames));
    }

    /**
     * @return how many store calls the write throttle has dropped since the last actual write
     */
    public long getNumWritesSkipped() {
        return _numWritesSkipped;
    }

    /**
     * Mutable configuration builder for the provider.
     *
     * <p>ZooKeeper address and cluster name have no defaults and must be set before
     * {@link #build()}; the checkpoint interval defaults to 300000 ms. The max-writes-skipped
     * setting is deprecated and is only carried through to {@link StaticConfig}.
     */
    public static class Config
    implements ConfigBuilder<StaticConfig> {
        private String _zkAddr = null;
        private String _clusterName = null;
        private long _checkpointIntervalMs = 5L * 60L * 1000L;
        private int _maxNumWritesSkipped = 0;

        /**
         * Validates the settings and freezes them into a {@link StaticConfig}.
         *
         * @return the immutable configuration
         * @throws InvalidConfigException if the ZooKeeper address or the cluster name is unset
         */
        public StaticConfig build() throws InvalidConfigException {
            boolean complete = _zkAddr != null && _clusterName != null;
            if (!complete) {
                throw new InvalidConfigException("zkAddr or clusterName cannot be unspecified ");
            }
            return new StaticConfig(_zkAddr, _clusterName, _maxNumWritesSkipped, _checkpointIntervalMs);
        }

        /** @return the ZooKeeper address, or {@code null} if not set */
        public String getZkAddr() {
            return _zkAddr;
        }

        /** @param zkAddr the ZooKeeper address to use */
        public void setZkAddr(String zkAddr) {
            _zkAddr = zkAddr;
        }

        /** @return the cluster name, or {@code null} if not set */
        public String getClusterName() {
            return _clusterName;
        }

        /** @param clusterName the cluster name to use */
        public void setClusterName(String clusterName) {
            _clusterName = clusterName;
        }

        /** @return the deprecated max-writes-skipped setting */
        @Deprecated
        public int getMaxNumWritesSkipped() {
            return _maxNumWritesSkipped;
        }

        /** @param maxNumWritesSkipped the deprecated max-writes-skipped setting */
        @Deprecated
        public void setMaxNumWritesSkipped(int maxNumWritesSkipped) {
            _maxNumWritesSkipped = maxNumWritesSkipped;
        }

        /** @param checkpointIntervalMs minimum time between checkpoint writes, in milliseconds */
        public void setCheckpointIntervalMs(long checkpointIntervalMs) {
            _checkpointIntervalMs = checkpointIntervalMs;
        }

        /** @return minimum time between checkpoint writes, in milliseconds */
        public long getCheckpointIntervalMs() {
            return _checkpointIntervalMs;
        }
    }

    /**
     * Immutable snapshot of the provider configuration. The constructor stores its arguments
     * as given, without validation.
     */
    public static class StaticConfig {
        private final String _zkAddr;
        private final String _clusterName;
        private final long _checkpointIntervalMs;
        private final int _maxNumWritesSkipped;

        /**
         * @param zkAddr ZooKeeper address
         * @param clusterName cluster name
         * @param numWritesSkipped value reported by the deprecated {@link #getMaxNumWritesSkipped()}
         * @param checkpointIntervalMs minimum time between checkpoint writes, in milliseconds
         */
        public StaticConfig(String zkAddr, String clusterName, int numWritesSkipped, long checkpointIntervalMs) {
            _checkpointIntervalMs = checkpointIntervalMs;
            _maxNumWritesSkipped = numWritesSkipped;
            _clusterName = clusterName;
            _zkAddr = zkAddr;
        }

        /** @return the ZooKeeper address */
        public String getZkAddr() {
            return _zkAddr;
        }

        /** @return the cluster name */
        public String getClusterName() {
            return _clusterName;
        }

        /** @return the deprecated max-writes-skipped setting */
        @Deprecated
        public int getMaxNumWritesSkipped() {
            return _maxNumWritesSkipped;
        }

        /** @return minimum time between checkpoint writes, in milliseconds */
        public long getCheckpointIntervalMs() {
            return _checkpointIntervalMs;
        }
    }

    /**
     * Hands out one connected {@link HelixManager} per cluster name and keeps it cached until
     * {@link #close(String)} is called for that cluster. Both methods are synchronized on this
     * instance.
     */
    private static class HelixConnectionManager {
        private final Map<String, HelixManager> _managers = new HashMap<String, HelixManager>();

        /**
         * Returns the cached manager for {@code clusterName}, creating and connecting a spectator
         * manager first if none is cached.
         *
         * <p>{@code zkAddr} and {@code id} are only used when a new manager is created; a cached
         * manager is returned as-is, without checking its connection state.
         *
         * @param clusterName cluster to connect to; also the cache key
         * @param zkAddr ZooKeeper address for a newly created manager
         * @param id instance name for a newly created manager
         * @return the manager for the cluster
         * @throws Exception if creating or connecting the manager fails; nothing is cached then
         */
        public synchronized HelixManager open(String clusterName, String zkAddr, String id) throws Exception {
            HelixManager manager = this._managers.get(clusterName);
            if (manager == null) {
                manager = HelixManagerFactory.getZKHelixManager(clusterName, id, InstanceType.SPECTATOR, zkAddr);
                if (!manager.isConnected()) {
                    manager.connect();
                }
                // Cache only after a successful connect: caching first would leave a manager that
                // failed to connect in the map, to be returned by every later open().
                this._managers.put(clusterName, manager);
            }
            return manager;
        }

        /**
         * Drops the cached manager for {@code cluster}, if any, and disconnects it.
         *
         * @param cluster cluster whose manager should be closed
         */
        public synchronized void close(String cluster) {
            HelixManager manager = this._managers.remove(cluster);
            if (manager != null) {
                manager.disconnect();
            }
        }
    }

    /**
     * Checked exception signalling that the cluster-backed checkpoint store could not be set up.
     */
    public static class ClusterCheckpointException
    extends Exception {
        /**
         * @param msg description of the failure
         */
        public ClusterCheckpointException(String msg) {
            super(msg);
        }

        /**
         * @param msg description of the failure
         * @param cause underlying exception, kept so its stack trace stays available
         */
        public ClusterCheckpointException(String msg, Throwable cause) {
            super(msg, cause);
        }
    }
}

