/*
 * Decompiled with CFR 0.152.
 */
package com.thinkaurelius.titan.graphdb.olap.computer;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.thinkaurelius.titan.core.TitanException;
import com.thinkaurelius.titan.core.TitanGraphComputer;
import com.thinkaurelius.titan.core.TitanGraphTransaction;
import com.thinkaurelius.titan.core.TitanTransaction;
import com.thinkaurelius.titan.core.TitanVertex;
import com.thinkaurelius.titan.core.schema.TitanManagement;
import com.thinkaurelius.titan.diskstorage.configuration.Configuration;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanJob;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.ScanMetrics;
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.scan.StandardScanner;
import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
import com.thinkaurelius.titan.graphdb.database.StandardTitanGraph;
import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraMapEmitter;
import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraMemory;
import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraReduceEmitter;
import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraVertexMemory;
import com.thinkaurelius.titan.graphdb.olap.computer.PartitionedVertexProgramExecutor;
import com.thinkaurelius.titan.graphdb.olap.computer.VertexMapJob;
import com.thinkaurelius.titan.graphdb.olap.computer.VertexProgramScanJob;
import com.thinkaurelius.titan.graphdb.util.WorkerPool;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FulgoraGraphComputer
implements TitanGraphComputer {
    private static final Logger log = LoggerFactory.getLogger(FulgoraGraphComputer.class);
    public static final Set<String> NON_PERSISTING_KEYS = ImmutableSet.of((Object)"gremlin.traversal.sideEffects", (Object)"gremlin.traversalVertexProgram.haltedTraversers");
    private VertexProgram<?> vertexProgram;
    private final Set<MapReduce> mapReduces = new HashSet<MapReduce>();
    private final StandardTitanGraph graph;
    private int expectedNumVertices = 10000;
    private FulgoraMemory memory;
    private FulgoraVertexMemory vertexMemory;
    private boolean executed = false;
    private int numThreads = 1;
    private int readBatchSize = 10000;
    private int writeBatchSize;
    private GraphComputer.ResultGraph resultGraphMode = null;
    private GraphComputer.Persist persistMode = null;
    private static final AtomicInteger computerCounter = new AtomicInteger(0);
    private String name;
    private String jobId;

    public FulgoraGraphComputer(StandardTitanGraph graph, Configuration configuration) {
        this.graph = graph;
        this.writeBatchSize = configuration.get(GraphDatabaseConfiguration.BUFFER_SIZE, new String[0]);
        this.readBatchSize = this.writeBatchSize * 10;
        this.name = "compute" + computerCounter.incrementAndGet();
    }

    public GraphComputer result(GraphComputer.ResultGraph resultGraph) {
        Preconditions.checkArgument((resultGraph != null ? 1 : 0) != 0, (Object)"Need to specify mode");
        this.resultGraphMode = resultGraph;
        return this;
    }

    public GraphComputer persist(GraphComputer.Persist persist) {
        Preconditions.checkArgument((persist != null ? 1 : 0) != 0, (Object)"Need to specify mode");
        this.persistMode = persist;
        return this;
    }

    @Override
    public TitanGraphComputer workers(int threads) {
        Preconditions.checkArgument((threads > 0 ? 1 : 0) != 0, (String)"Invalid number of threads: %s", (Object[])new Object[]{threads});
        this.numThreads = threads;
        return this;
    }

    public TitanGraphComputer program(VertexProgram vertexProgram) {
        Preconditions.checkState((this.vertexProgram == null ? 1 : 0) != 0, (Object)"A vertex program has already been set");
        this.vertexProgram = vertexProgram;
        return this;
    }

    public TitanGraphComputer mapReduce(MapReduce mapReduce) {
        this.mapReduces.add(mapReduce);
        return this;
    }

    public Future<ComputerResult> submit() {
        if (this.executed) {
            throw GraphComputer.Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
        }
        this.executed = true;
        if (null == this.vertexProgram && this.mapReduces.isEmpty()) {
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
        }
        if (null != this.vertexProgram) {
            GraphComputerHelper.validateProgramOnComputer((GraphComputer)this, this.vertexProgram);
            this.mapReduces.addAll(this.vertexProgram.getMapReducers());
        }
        this.persistMode = GraphComputerHelper.getPersistState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.persistMode));
        this.resultGraphMode = GraphComputerHelper.getResultGraphState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.resultGraphMode));
        if (!this.features().supportsResultGraphPersistCombination(this.resultGraphMode, this.persistMode)) {
            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported((GraphComputer.ResultGraph)this.resultGraphMode, (GraphComputer.Persist)this.persistMode);
        }
        this.memory = new FulgoraMemory(this.vertexProgram, this.mapReduces);
        return CompletableFuture.supplyAsync(() -> {
            StandardScanner.Builder scanBuilder;
            Object job;
            long time = System.currentTimeMillis();
            if (null != this.vertexProgram) {
                this.vertexMemory = new FulgoraVertexMemory(this.expectedNumVertices, this.graph.getIDManager(), this.vertexProgram);
                this.vertexProgram.setup((Memory)this.memory);
                this.memory.completeSubRound();
                int iteration = 1;
                while (true) {
                    this.vertexMemory.nextIteration(this.vertexProgram.getMessageScopes((Memory)this.memory));
                    this.jobId = this.name + "#" + iteration;
                    job = VertexProgramScanJob.getVertexProgramScanJob(this.graph, this.memory, this.vertexMemory, this.vertexProgram);
                    scanBuilder = this.graph.getBackend().buildEdgeScanJob();
                    scanBuilder.setJobId(this.jobId);
                    scanBuilder.setNumProcessingThreads(this.numThreads);
                    scanBuilder.setWorkBlockSize(this.readBatchSize);
                    scanBuilder.setJob((ScanJob)job);
                    PartitionedVertexProgramExecutor pvpe = new PartitionedVertexProgramExecutor(this.graph, this.memory, this.vertexMemory, this.vertexProgram);
                    try {
                        ScanMetrics jobResult = (ScanMetrics)scanBuilder.execute().get();
                        long failures = jobResult.get(ScanMetrics.Metric.FAILURE);
                        if (failures > 0L) {
                            throw new TitanException("Failed to process [" + failures + "] vertices in vertex program iteration [" + iteration + "]. Computer is aborting.");
                        }
                        pvpe.run(this.numThreads, jobResult);
                        failures = jobResult.getCustom("partition-fail");
                        if (failures > 0L) {
                            throw new TitanException("Failed to process [" + failures + "] partitioned vertices in vertex program iteration [" + iteration + "]. Computer is aborting.");
                        }
                    }
                    catch (Exception e) {
                        throw new TitanException(e);
                    }
                    this.vertexMemory.completeIteration();
                    this.memory.completeSubRound();
                    try {
                        if (this.vertexProgram.terminate((Memory)this.memory)) {
                            break;
                        }
                    }
                    finally {
                        this.memory.incrIteration();
                        this.memory.completeSubRound();
                    }
                    ++iteration;
                }
            }
            HashMap<MapReduce, FulgoraMapEmitter> mapJobs = new HashMap<MapReduce, FulgoraMapEmitter>(this.mapReduces.size());
            for (MapReduce mapReduce : this.mapReduces) {
                if (!mapReduce.doStage(MapReduce.Stage.MAP)) continue;
                FulgoraMapEmitter mapEmitter = new FulgoraMapEmitter(mapReduce.doStage(MapReduce.Stage.REDUCE));
                mapJobs.put(mapReduce, mapEmitter);
            }
            this.jobId = this.name + "#map";
            job = VertexMapJob.getVertexMapJob(this.graph, this.vertexMemory, mapJobs);
            scanBuilder = this.graph.getBackend().buildEdgeScanJob();
            scanBuilder.setJobId(this.jobId);
            scanBuilder.setNumProcessingThreads(this.numThreads);
            scanBuilder.setWorkBlockSize(this.readBatchSize);
            scanBuilder.setJob((ScanJob)job);
            try {
                ScanMetrics jobResult = (ScanMetrics)scanBuilder.execute().get();
                long failures = jobResult.get(ScanMetrics.Metric.FAILURE);
                if (failures > 0L) {
                    throw new TitanException("Failed to process [" + failures + "] vertices in map phase. Computer is aborting.");
                }
                failures = jobResult.getCustom("map-fail");
                if (failures > 0L) {
                    throw new TitanException("Failed to process [" + failures + "] individual map jobs. Computer is aborting.");
                }
            }
            catch (Exception e) {
                throw new TitanException(e);
            }
            for (Map.Entry mapJob : mapJobs.entrySet()) {
                FulgoraMapEmitter mapEmitter = (FulgoraMapEmitter)mapJob.getValue();
                MapReduce mapReduce = (MapReduce)mapJob.getKey();
                mapEmitter.complete(mapReduce);
                if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                    FulgoraReduceEmitter reduceEmitter = new FulgoraReduceEmitter();
                    try {
                        WorkerPool workers = new WorkerPool(this.numThreads);
                        Object object = null;
                        try {
                            workers.submit(() -> mapReduce.workerStart(MapReduce.Stage.REDUCE));
                            for (Map.Entry queueEntry : mapEmitter.reduceMap.entrySet()) {
                                workers.submit(() -> mapReduce.reduce(queueEntry.getKey(), ((Iterable)queueEntry.getValue()).iterator(), (MapReduce.ReduceEmitter)reduceEmitter));
                            }
                            workers.submit(() -> mapReduce.workerEnd(MapReduce.Stage.REDUCE));
                        }
                        catch (Throwable throwable) {
                            object = throwable;
                            throw throwable;
                        }
                        finally {
                            if (workers != null) {
                                if (object != null) {
                                    try {
                                        workers.close();
                                    }
                                    catch (Throwable throwable) {
                                        ((Throwable)object).addSuppressed(throwable);
                                    }
                                } else {
                                    workers.close();
                                }
                            }
                        }
                    }
                    catch (Exception e) {
                        throw new TitanException("Exception while executing reduce phase", e);
                    }
                    reduceEmitter.complete(mapReduce);
                    mapReduce.addResultToMemory((Memory.Admin)this.memory, reduceEmitter.reduceQueue.iterator());
                    continue;
                }
                mapReduce.addResultToMemory((Memory.Admin)this.memory, mapEmitter.mapQueue.iterator());
            }
            TitanGraphTransaction resultgraph = this.graph;
            if (this.persistMode == GraphComputer.Persist.NOTHING && this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
                resultgraph = EmptyGraph.instance();
            } else if (this.persistMode != GraphComputer.Persist.NOTHING && this.vertexProgram != null && !this.vertexProgram.getElementComputeKeys().isEmpty()) {
                TitanManagement mgmt = this.graph.openManagement();
                try {
                    for (String key : this.vertexProgram.getElementComputeKeys()) {
                        if (!mgmt.containsPropertyKey(key)) {
                            log.warn("Property key [{}] is not part of the schema and will be created. It is advised to initialize all keys.", (Object)key);
                        }
                        mgmt.getOrCreatePropertyKey(key);
                    }
                    mgmt.commit();
                }
                finally {
                    if (mgmt != null && mgmt.isOpen()) {
                        mgmt.rollback();
                    }
                }
                Map mutatedProperties = Maps.transformValues(this.vertexMemory.getMutableVertexProperties(), (Function)new Function<Map<String, Object>, Map<String, Object>>(){

                    @Nullable
                    public Map<String, Object> apply(@Nullable Map<String, Object> o) {
                        return Maps.filterKeys(o, s -> !NON_PERSISTING_KEYS.contains(s));
                    }
                });
                if (this.resultGraphMode == GraphComputer.ResultGraph.ORIGINAL) {
                    AtomicInteger failures = new AtomicInteger(0);
                    try (WorkerPool workers = new WorkerPool(this.numThreads);){
                        ArrayList subset = new ArrayList(this.writeBatchSize / this.vertexProgram.getElementComputeKeys().size());
                        boolean bl = false;
                        for (Map.Entry entry : mutatedProperties.entrySet()) {
                            subset.add(entry);
                            if ((bl2 += ((Map)entry.getValue()).size()) < this.writeBatchSize) continue;
                            workers.submit(new VertexPropertyWriter(subset, failures));
                            subset = new ArrayList(subset.size());
                            boolean bl2 = false;
                        }
                        if (!subset.isEmpty()) {
                            workers.submit(new VertexPropertyWriter(subset, failures));
                        }
                    }
                    catch (Exception e) {
                        throw new TitanException("Exception while attempting to persist result into graph", e);
                    }
                    if (failures.get() > 0) {
                        throw new TitanException("Could not persist program results to graph. Check log for details.");
                    }
                } else if (this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
                    resultgraph = this.graph.newTransaction();
                    for (Map.Entry vprop : mutatedProperties.entrySet()) {
                        Vertex v = (Vertex)resultgraph.vertices(new Object[]{vprop.getKey()}).next();
                        for (Map.Entry entry : ((Map)vprop.getValue()).entrySet()) {
                            v.property(VertexProperty.Cardinality.single, (String)entry.getKey(), entry.getValue(), new Object[0]);
                        }
                    }
                }
            }
            this.memory.setRuntime(System.currentTimeMillis() - time);
            this.memory.complete();
            return new DefaultComputerResult((Graph)resultgraph, (Memory)this.memory);
        });
    }

    public String toString() {
        return StringFactory.graphComputerString((GraphComputer)this);
    }

    public GraphComputer.Features features() {
        return new GraphComputer.Features(){

            public boolean supportsResultGraphPersistCombination(GraphComputer.ResultGraph resultGraph, GraphComputer.Persist persist) {
                return persist == GraphComputer.Persist.NOTHING || persist == GraphComputer.Persist.VERTEX_PROPERTIES;
            }

            public boolean supportsVertexAddition() {
                return false;
            }

            public boolean supportsVertexRemoval() {
                return false;
            }

            public boolean supportsVertexPropertyAddition() {
                return true;
            }

            public boolean supportsVertexPropertyRemoval() {
                return false;
            }

            public boolean supportsEdgeAddition() {
                return false;
            }

            public boolean supportsEdgeRemoval() {
                return false;
            }

            public boolean supportsEdgePropertyAddition() {
                return false;
            }

            public boolean supportsEdgePropertyRemoval() {
                return false;
            }
        };
    }

    private class VertexPropertyWriter
    implements Runnable {
        private final List<Map.Entry<Long, Map<String, Object>>> properties;
        private final AtomicInteger failures;

        private VertexPropertyWriter(List<Map.Entry<Long, Map<String, Object>>> properties, AtomicInteger failures) {
            assert (properties != null && !properties.isEmpty() && failures != null);
            this.properties = properties;
            this.failures = failures;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            TitanTransaction tx = FulgoraGraphComputer.this.graph.buildTransaction().enableBatchLoading().start();
            try {
                for (Map.Entry<Long, Map<String, Object>> vprop : this.properties) {
                    TitanVertex v = tx.getVertex(vprop.getKey());
                    for (Map.Entry<String, Object> prop : vprop.getValue().entrySet()) {
                        v.property(VertexProperty.Cardinality.single, prop.getKey(), prop.getValue(), new Object[0]);
                    }
                }
                tx.commit();
            }
            catch (Throwable e) {
                this.failures.incrementAndGet();
                log.error("Encountered exception while trying to write properties: ", e);
            }
            finally {
                if (tx != null && tx.isOpen()) {
                    tx.rollback();
                }
            }
        }
    }
}

