/*
 * Decompiled with CFR 0.152.
 */
package flipkart.cp.convert.ha.worker.distribution;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import flipkart.cp.convert.ha.worker.distribution.Restartable;
import flipkart.cp.convert.ha.worker.exception.ErrorCode;
import flipkart.cp.convert.ha.worker.exception.WorkerException;
import flipkart.cp.convert.ha.worker.task.TaskList;
import java.io.Closeable;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributionManager
implements Closeable,
TreeCacheListener {
    private static final String PREFIX_PATH = "/ha-nodes";
    private final PersistentEphemeralNode myNode;
    private final String zkPrefix;
    private TaskList taskList;
    private final String instanceId;
    private final CuratorFramework client;
    private List<String> tasksToRun;
    private List<String> workerInstances;
    private Restartable restartable;
    private MetricRegistry metricRegistry;
    private static Logger log = LoggerFactory.getLogger((String)DistributionManager.class.getName());

    public DistributionManager(CuratorFramework client, TaskList taskList, String listenerPath, String instanceId, MetricRegistry metricRegistry) throws WorkerException {
        this.taskList = taskList;
        this.client = client;
        this.instanceId = instanceId;
        this.metricRegistry = metricRegistry;
        this.zkPrefix = "/ha-nodes/" + listenerPath;
        this.myNode = new PersistentEphemeralNode(client, PersistentEphemeralNode.Mode.EPHEMERAL, this.zkPrefix + "/" + this.instanceId, this.instanceId.getBytes());
        this.myNode.start();
        try {
            this.myNode.waitForInitialCreate(3L, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            log.error("Exception occurred :" + ex.fillInStackTrace());
            throw new WorkerException(ex, ErrorCode.WORKER_RUNTIME_ERROR);
        }
        log.info("Ephemral node created " + instanceId);
        this._attachListener();
        metricRegistry.register(MetricRegistry.name(DistributionManager.class, (String[])new String[]{listenerPath, "WorkerInstances", "count"}), (Metric)new Gauge<Integer>(){

            public Integer getValue() {
                return DistributionManager.this.workerInstances.size();
            }
        });
    }

    private void _attachListener() throws WorkerException {
        try {
            TreeCache treeCache = new TreeCache(this.client, this.zkPrefix);
            treeCache.getListenable().addListener((Object)this);
            treeCache.start();
            log.info("Listener attached for " + this.instanceId);
        }
        catch (Exception ex) {
            log.error("Exception occurred :" + ex.fillInStackTrace());
            throw new WorkerException(ex, ErrorCode.WORKER_RUNTIME_ERROR);
        }
    }

    public void init() throws WorkerException {
        this.workerInstances = this.getWorkerInstances(this.zkPrefix);
        int instanceIndex = -1;
        for (int i = 0; i < this.workerInstances.size(); ++i) {
            String instanceName = this.workerInstances.get(i);
            if (!instanceName.equals(this.instanceId)) continue;
            instanceIndex = i;
            break;
        }
        if (instanceIndex < 0) {
            throw new IllegalStateException("'" + this.instanceId + "' instanceId is unknown & not configured!");
        }
        this.tasksToRun = DistributionManager._createTaskIdsForExecution(this.workerInstances, this.taskList, instanceIndex);
        log.info("Init over for " + this.instanceId);
    }

    private List<String> getWorkerInstances(String prefixPath) throws WorkerException {
        try {
            List workerInstances = (List)this.client.getChildren().forPath(prefixPath);
            log.info("Worker instances for " + prefixPath + "are " + workerInstances);
            return workerInstances;
        }
        catch (Exception ex) {
            log.error("Exception occurred :" + ex.fillInStackTrace());
            throw new WorkerException(ex, ErrorCode.WORKER_RUNTIME_ERROR);
        }
    }

    public int getTaskCount() {
        return this.tasksToRun.size();
    }

    public List<String> getTasksToRun() {
        return this.tasksToRun;
    }

    public void setRestartable(Restartable restartable) {
        this.restartable = restartable;
    }

    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
        log.info("Event raised for " + this.instanceId + "Type -" + treeCacheEvent.getType());
        List paths = (List)curatorFramework.getChildren().forPath(this.zkPrefix);
        LinkedList<String> values = new LinkedList<String>();
        for (String path : paths) {
            String instanceIdVal = DistributionManager.getInstanceIdFromPath(path, this.zkPrefix);
            values.add(instanceIdVal);
        }
        if (this.hasHostListChanged(values)) {
            this.restartable.restart();
        }
    }

    private boolean hasHostListChanged(List<String> values) {
        String oldHash;
        String newHash = DistributionManager.createHash(values);
        return !newHash.equals(oldHash = DistributionManager.createHash(this.workerInstances));
    }

    @Override
    public void close() {
    }

    private static String getInstanceIdFromPath(String path, String prefixPath) {
        return path.replaceFirst(prefixPath + "/", "");
    }

    private static String createHash(List<String> strings) {
        StringBuilder sb = new StringBuilder();
        for (String s : strings) {
            sb.append(s);
            sb.append(":");
        }
        return sb.toString();
    }

    private static List<String> _createTaskIdsForExecution(List<String> workerInstances, TaskList taskList, int instanceIndex) {
        LinkedList<String> instances = new LinkedList<String>();
        List<String> taskNames = taskList.getTaskNames();
        for (int i = 0; i < taskNames.size(); ++i) {
            if (i % workerInstances.size() != instanceIndex) continue;
            instances.add(taskNames.get(i));
        }
        log.info("TaskId's for Execution generated by " + instanceIndex + "is " + instances);
        return instances;
    }
}

