/*
 * Decompiled with CFR 0.152.
 */
package cascading.cascade;

import cascading.CascadingException;
import cascading.cascade.CascadeException;
import cascading.flow.Flow;
import cascading.flow.FlowException;
import cascading.flow.FlowSkipStrategy;
import cascading.stats.CascadeStats;
import cascading.util.Util;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.jgrapht.Graphs;
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;

/**
 * Runs the {@link Flow} instances held in a directed job graph. Flows are visited in
 * topological order of that graph, wrapped in {@link CascadeJob} instances and handed to an
 * {@link ExecutorService}; each job waits for its predecessors before executing its flow.
 * <p>
 * A cascade is started on its own thread via {@link #start()}, or started and awaited via
 * {@link #complete()}.
 */
public class Cascade
implements Runnable {
    /** Logger shared by all cascade instances. */
    private static final Logger LOG = Logger.getLogger(Cascade.class);
    /** Version properties loaded by {@link #printBanner()}; stays null until the first call. */
    private static Properties versionProperties;
    /** Unique ID, created lazily by {@link #getID()}. */
    private String id;
    /** Name given at construction time. */
    private String name;
    /** Graph whose vertices are the flows to run; edges define predecessor relationships. */
    private final SimpleDirectedGraph<Flow, Integer> jobGraph;
    /** Statistics object created with this cascade's ID. */
    private final CascadeStats cascadeStats;
    /** Thread created by {@link #start()}; reset to null when {@link #complete()} returns. */
    private Thread thread;
    /** Failure captured by {@link #run()}, rethrown by {@link #complete()}. */
    private Throwable throwable;
    /** Executor created in {@link #run()} to execute the jobs. */
    private ExecutorService executor;
    /** Jobs keyed by flow name, in topological insertion order; rebuilt on every run. */
    private Map<String, Callable<Throwable>> jobsMap;
    /** Set once {@link #stop()} has been called. */
    private boolean stop;
    /** Optional strategy consulted by jobs to decide whether a flow is skipped; may be null. */
    private FlowSkipStrategy flowSkipStrategy = null;

    Cascade(String name, SimpleDirectedGraph<Flow, Integer> jobGraph) {
        this.name = name;
        this.jobGraph = jobGraph;
        this.cascadeStats = new CascadeStats(this.getID());
        this.setIDOnFlow();
    }

    /**
     * Returns the name this cascade was created with.
     *
     * @return the cascade name
     */
    public String getName() {
        return name;
    }

    /**
     * Returns the ID of this cascade, creating it from the cascade name on first use.
     *
     * @return the cascade ID
     */
    public String getID() {
        if (this.id != null) {
            return this.id;
        }
        this.id = Util.createUniqueID(this.getName());
        return this.id;
    }

    /**
     * Returns the statistics object associated with this cascade.
     *
     * @return the cascade statistics
     */
    public CascadeStats getCascadeStats() {
        return cascadeStats;
    }

    /**
     * Sets the {@code cascading.cascade.id} property of every flow to this cascade's ID.
     */
    private void setIDOnFlow() {
        for (Iterator<Flow> members = this.getFlows().iterator(); members.hasNext();) {
            members.next().setProperty("cascading.cascade.id", this.getID());
        }
    }

    /**
     * Returns the flows of this cascade in topological order of the job graph.
     * A new list is built on every call.
     *
     * @return a new, mutable list of flows
     */
    public List<Flow> getFlows() {
        List<Flow> ordered = new ArrayList<Flow>();
        for (TopologicalOrderIterator<Flow, Integer> walker = new TopologicalOrderIterator<Flow, Integer>(this.jobGraph); walker.hasNext();) {
            ordered.add((Flow)walker.next());
        }
        return ordered;
    }

    /**
     * Returns the skip strategy currently set on this cascade.
     *
     * @return the current strategy, or null if none has been set
     */
    public FlowSkipStrategy getFlowSkipStrategy() {
        return flowSkipStrategy;
    }

    /**
     * Replaces the skip strategy of this cascade.
     *
     * @param flowSkipStrategy the new strategy; may be null
     * @return the strategy that was set before this call, possibly null
     */
    public FlowSkipStrategy setFlowSkipStrategy(FlowSkipStrategy flowSkipStrategy) {
        FlowSkipStrategy previous = this.flowSkipStrategy;
        this.flowSkipStrategy = flowSkipStrategy;
        return previous;
    }

    /**
     * Starts this cascade on a new thread named after the cascade. Does nothing if a
     * thread has already been created and not yet cleared.
     */
    public void start() {
        if (this.thread == null) {
            String threadName = ("cascade " + Util.toNull(this.getName())).trim();
            this.thread = new Thread(this, threadName);
            this.thread.start();
        }
    }

    /**
     * Starts this cascade if necessary and blocks until its thread has finished.
     * The thread reference and any captured failure are cleared before returning.
     *
     * @throws FlowException      if the calling thread is interrupted while waiting
     * @throws CascadingException if the run captured a CascadingException (rethrown as is)
     * @throws CascadeException   if the run captured any other throwable (wrapped)
     */
    public void complete() {
        this.start();
        try {
            this.thread.join();
            Throwable failure = this.throwable;
            if (failure instanceof CascadingException) {
                throw (CascadingException)failure;
            }
            if (failure != null) {
                throw new CascadeException("unhandled exception", failure);
            }
        }
        catch (InterruptedException exception) {
            throw new FlowException("thread interrupted", exception);
        }
        finally {
            this.thread = null;
            this.throwable = null;
        }
    }

    /**
     * Executes all flows of this cascade and blocks until they have finished.
     * <p>
     * One job per flow is submitted to a fixed thread pool sized to the number of flows.
     * The first non-null throwable returned by a job is stored, the stats are marked
     * failed, all flows are stopped (unless {@link #stop()} already did so) and the
     * executor is awaited. Any exception thrown by this method's own logic is stored as
     * well and, unlike before, is recorded on the stats as a failure instead of letting
     * the cascade be marked successful.
     * <p>
     * A cascade without flows completes successfully without creating an executor.
     */
    @Override
    public void run() {
        Cascade.printBanner();
        if (LOG.isInfoEnabled()) {
            this.logInfo("starting");
        }
        try {
            this.cascadeStats.markRunning();
            this.initializeNewJobsMap();
            int numThreads = this.jobsMap.size();
            if (LOG.isInfoEnabled()) {
                this.logInfo(" starting flows: " + numThreads);
                this.logInfo(" allocating threads: " + numThreads);
            }
            if (numThreads == 0) {
                // Executors.newFixedThreadPool rejects a pool size of zero; there is nothing to run.
                return;
            }
            this.executor = Executors.newFixedThreadPool(numThreads);
            // invokeAll blocks until every job has completed.
            List<Future<Throwable>> futures = this.executor.invokeAll(this.jobsMap.values());
            this.executor.shutdown();
            for (Future<Throwable> future : futures) {
                this.throwable = future.get();
                if (this.throwable == null) continue;
                this.cascadeStats.markFailed(this.throwable);
                if (!this.stop) {
                    this.internalStopAllFlows();
                }
                this.handleExecutorShutdown();
                break;
            }
        }
        catch (Throwable throwable) {
            this.throwable = throwable;
            // Record the failure so the finally block below does not report success.
            if (!this.cascadeStats.isFinished()) {
                this.cascadeStats.markFailed(throwable);
            }
        }
        finally {
            // shutdown() is idempotent; this releases the pool threads when invokeAll or get() threw.
            if (this.executor != null) {
                this.executor.shutdown();
            }
            if (!this.cascadeStats.isFinished()) {
                this.cascadeStats.markSuccessful();
            }
        }
    }

    /**
     * Rebuilds the job map: walks the job graph in topological order, registers each
     * flow's stats, creates a job for the flow keyed by the flow name, and wires the job
     * to the already created jobs of its graph predecessors.
     */
    private void initializeNewJobsMap() {
        this.jobsMap = new LinkedHashMap<String, Callable<Throwable>>();
        for (TopologicalOrderIterator<Flow, Integer> walker = new TopologicalOrderIterator<Flow, Integer>(this.jobGraph); walker.hasNext();) {
            Flow current = (Flow)walker.next();
            this.cascadeStats.addFlowStats(current.getFlowStats());
            CascadeJob currentJob = new CascadeJob(current);
            this.jobsMap.put(current.getName(), currentJob);
            // Topological order means every predecessor's job is already in the map.
            List<CascadeJob> upstream = new ArrayList<CascadeJob>();
            for (Flow parent : Graphs.predecessorListOf(this.jobGraph, current)) {
                upstream.add((CascadeJob)this.jobsMap.get(parent.getName()));
            }
            currentJob.init(upstream);
        }
    }

    /**
     * Stops this cascade. The first call marks the stats as stopped (unless they are
     * already failed), stops every flow and waits for the executor; later calls do nothing.
     */
    public synchronized void stop() {
        if (!this.stop) {
            this.stop = true;
            boolean alreadyFailed = this.cascadeStats.isFailed();
            if (!alreadyFailed) {
                this.cascadeStats.markStopped();
            }
            this.internalStopAllFlows();
            this.handleExecutorShutdown();
        }
    }

    /**
     * Waits up to 300 seconds for the flow executor to terminate. Does nothing if no
     * executor has been created.
     * <p>
     * If the waiting thread is interrupted, its interrupt status is restored so callers
     * can still observe it. The outcome (terminated, timed out or interrupted) is logged.
     */
    private void handleExecutorShutdown() {
        if (this.executor == null) {
            return;
        }
        this.logWarn("shutting down flow executor");
        try {
            boolean terminated = this.executor.awaitTermination(300L, TimeUnit.SECONDS);
            if (terminated) {
                this.logWarn("shutdown complete");
            } else {
                this.logWarn("flow executor did not terminate within 300 seconds");
            }
        }
        catch (InterruptedException exception) {
            // Do not swallow the interrupt: restore the flag for the caller.
            Thread.currentThread().interrupt();
            this.logWarn("interrupted while waiting for flow executor shutdown");
        }
    }

    /**
     * Stops every job in reverse insertion order of the job map, so that jobs created
     * later (dependents) are stopped before the jobs they depend on.
     * <p>
     * If the job map has not been built yet (the cascade was stopped before it ran),
     * there is nothing to stop and the method only logs.
     */
    private void internalStopAllFlows() {
        this.logWarn("stopping flows");
        if (this.jobsMap != null) {
            List<Callable<Throwable>> jobs = new ArrayList<Callable<Throwable>>(this.jobsMap.values());
            Collections.reverse(jobs);
            for (Callable<Throwable> job : jobs) {
                ((CascadeJob)job).stop();
            }
        }
        this.logWarn("stopped flows");
    }

    /**
     * Returns the name of this cascade.
     *
     * @return the cascade name
     */
    @Override
    public String toString() {
        final String label = this.getName();
        return label;
    }

    /**
     * Logs a message at INFO level, prefixed with the cascade name truncated via
     * {@code Util.truncate(name, 25)} and wrapped in square brackets.
     *
     * @param message text to log
     */
    private void logInfo(String message) {
        String prefix = "[" + Util.truncate(this.getName(), 25) + "] ";
        LOG.info((Object)(prefix + message));
    }

    /**
     * Logs a message at WARN level without an associated throwable.
     *
     * @param message text to log
     */
    private void logWarn(String message) {
        final Throwable noCause = null;
        this.logWarn(message, noCause);
    }

    /**
     * Logs a message at WARN level, prefixed with the cascade name truncated via
     * {@code Util.truncate(name, 25)} and wrapped in square brackets.
     *
     * @param message   text to log
     * @param throwable throwable to log with the message; may be null
     */
    private void logWarn(String message, Throwable throwable) {
        String prefix = "[" + Util.truncate(this.getName(), 25) + "] ";
        LOG.warn((Object)(prefix + message), throwable);
    }

    /**
     * Logs a one-line version banner the first time it is called; later calls return
     * immediately.
     * <p>
     * Version data is read from the classpath resources
     * {@code cascading/version.properties} and, if present,
     * {@code cascading/build.number.properties}. If the first resource is missing, no
     * banner is logged. An I/O failure while reading is logged as a warning.
     * Each resource stream is closed after it has been read.
     */
    public static synchronized void printBanner() {
        if (versionProperties != null) {
            return;
        }
        versionProperties = new Properties();
        try {
            if (!Cascade.loadVersionResource("cascading/version.properties")) {
                return;
            }
            // The build number is optional.
            Cascade.loadVersionResource("cascading/build.number.properties");
            String releaseMajor = versionProperties.getProperty("cascading.release.major");
            String releaseMinor = versionProperties.getProperty("cascading.release.minor", null);
            String releaseBuild = versionProperties.getProperty("build.number", null);
            String hadoopVersion = versionProperties.getProperty("cascading.hadoop.compatible.version");
            String releaseFull = releaseMinor == null ? releaseMajor : String.format("%s.%s", releaseMajor, releaseMinor);
            String message = releaseBuild == null ? String.format("Concurrent, Inc - Cascading %s [%s]", releaseFull, hadoopVersion) : String.format("Concurrent, Inc - Cascading %s-%s [%s]", releaseFull, releaseBuild, hadoopVersion);
            LOG.info((Object)message);
        }
        catch (IOException exception) {
            LOG.warn((Object)"unable to load version information", (Throwable)exception);
        }
    }

    /**
     * Loads one classpath resource into {@code versionProperties} and closes its stream.
     *
     * @param resource classpath-relative resource name
     * @return false if the resource does not exist, true once it has been loaded
     * @throws IOException if reading the resource fails
     */
    private static boolean loadVersionResource(String resource) throws IOException {
        InputStream stream = Cascade.class.getClassLoader().getResourceAsStream(resource);
        if (stream == null) {
            return false;
        }
        try {
            // Properties.load leaves the stream open, so it has to be closed here.
            versionProperties.load(stream);
        }
        finally {
            try {
                stream.close();
            }
            catch (IOException ignored) {
                // Failing to close a read-only resource stream must not hide the load result.
            }
        }
        return true;
    }

    /**
     * Executes a single {@link Flow} once all of its predecessor jobs have finished.
     * <p>
     * A latch is released when {@link #call()} ends, whatever the outcome; dependents
     * block on it inside {@link #isSuccessful()}.
     */
    protected class CascadeJob
    implements Callable<Throwable> {
        /** The flow this job executes. */
        Flow flow;
        /** Jobs that must finish successfully before this one runs; set via {@link #init(List)}. */
        private List<CascadeJob> predecessors;
        /** Released exactly once, when {@link #call()} returns. */
        private final CountDownLatch latch = new CountDownLatch(1);
        /** Set by {@link #stop()} on another thread and read by {@link #call()}, hence volatile. */
        private volatile boolean stop = false;
        /** True when the flow, or this job's own logic, threw. Written before the latch is released. */
        private boolean failed = false;
        /** True when this job did not run because a predecessor was unsuccessful. Written before the latch is released. */
        private boolean blocked = false;

        /**
         * @param flow the flow to execute
         */
        public CascadeJob(Flow flow) {
            this.flow = flow;
        }

        /**
         * @return the name of the wrapped flow
         */
        public String getName() {
            return this.flow.getName();
        }

        /**
         * Waits for all predecessors, then runs the flow unless it must not run.
         * <p>
         * The flow is not run when a predecessor was unsuccessful, when this job has
         * been stopped, or when the flow is to be skipped (decided by the cascade's
         * skip strategy if one is set, otherwise by the flow itself). In the
         * unsuccessful-predecessor case this job is remembered as blocked so that its
         * own dependents do not run either.
         *
         * @return null if the flow completed, was skipped or was not run; otherwise the
         *         throwable describing the failure (a flow failure is wrapped in a
         *         CascadeException)
         */
        @Override
        public Throwable call() {
            try {
                for (CascadeJob predecessor : this.predecessors) {
                    if (predecessor.isSuccessful()) continue;
                    // Propagate the failure down the graph: dependents must not treat this job as successful.
                    this.blocked = true;
                    return null;
                }
                if (this.stop) {
                    return null;
                }
                if (LOG.isInfoEnabled()) {
                    Cascade.this.logInfo("starting flow: " + this.flow.getName());
                }
                // Read the strategy once so a concurrent setFlowSkipStrategy cannot change it between check and use.
                FlowSkipStrategy strategy = Cascade.this.flowSkipStrategy;
                boolean skip = strategy == null ? this.flow.isSkipFlow() : strategy.skipFlow(this.flow);
                if (skip) {
                    if (LOG.isInfoEnabled()) {
                        Cascade.this.logInfo("skipping flow: " + this.flow.getName());
                    }
                    return null;
                }
                try {
                    this.flow.deleteSinksIfNotAppend();
                    this.flow.complete();
                    if (LOG.isInfoEnabled()) {
                        Cascade.this.logInfo("completed flow: " + this.flow.getName());
                    }
                }
                catch (Throwable exception) {
                    Cascade.this.logWarn("flow failed: " + this.flow.getName(), exception);
                    this.failed = true;
                    return new CascadeException("flow failed: " + this.flow.getName(), exception);
                }
                return null;
            }
            catch (Throwable throwable) {
                this.failed = true;
                return throwable;
            }
            finally {
                // Always release dependents, whatever the outcome.
                this.latch.countDown();
            }
        }

        /**
         * @param predecessors jobs this job has to wait for
         */
        public void init(List<CascadeJob> predecessors) {
            this.predecessors = predecessors;
        }

        /**
         * Flags this job as stopped and stops the wrapped flow.
         */
        public void stop() {
            if (LOG.isInfoEnabled()) {
                Cascade.this.logInfo("stopping flow: " + this.flow.getName());
            }
            this.stop = true;
            if (this.flow != null) {
                this.flow.stop();
            }
        }

        /**
         * Blocks until {@link #call()} has finished, then reports whether dependents may run.
         *
         * @return true only if the job has a flow and was neither failed, blocked by an
         *         unsuccessful predecessor, nor stopped; false if the wait was
         *         interrupted (the interrupt status is restored)
         */
        public boolean isSuccessful() {
            try {
                this.latch.await();
                return this.flow != null && !this.failed && !this.blocked && !this.stop;
            }
            catch (InterruptedException exception) {
                Cascade.this.logWarn("latch interrupted", exception);
                // Keep the interrupt visible to the code running this job.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}

