/*
 * Decompiled with CFR 0.152.
 */
package elephantdb.cascading;

import cascading.flow.Flow;
import cascading.flow.FlowListener;
import cascading.scheme.Scheme;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.TapCollector;
import cascading.tap.hadoop.TapIterator;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import elephantdb.DomainSpec;
import elephantdb.Utils;
import elephantdb.cascading.Common;
import elephantdb.cascading.Deserializer;
import elephantdb.hadoop.ElephantInputFormat;
import elephantdb.hadoop.ElephantOutputFormat;
import elephantdb.hadoop.ElephantRecordWritable;
import elephantdb.hadoop.ElephantUpdater;
import elephantdb.hadoop.LocalElephantManager;
import elephantdb.hadoop.ReplaceUpdater;
import elephantdb.store.DomainStore;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;

/**
 * Cascading {@link Tap} backed by an ElephantDB domain directory.
 *
 * <p>As a source it configures {@link ElephantInputFormat} over the domain
 * directory; as a sink it configures {@link ElephantOutputFormat} to write a
 * new version of the domain. The tap also implements {@link FlowListener}: in
 * {@link #onCompleted(Flow)} it marks the version created by
 * {@link #sinkInit(JobConf)} as succeeded or failed, depending on the flow's
 * outcome.
 *
 * <p>Equality is based on a per-instance id assigned at construction time, so
 * two taps are equal only if they carry the same id, regardless of the domain
 * directory they point at.
 */
public class ElephantDBTap
extends Tap
implements FlowListener {
    /** Root directory of the ElephantDB domain this tap reads from / writes to. */
    String _domainDir;
    /** Domain spec as reported by the {@link DomainStore} opened in the constructor. */
    DomainSpec _spec;
    /** User-supplied options; never {@code null} after construction. */
    Args _args;
    /**
     * Path of the version currently being written, created lazily by
     * {@link #sinkInit(JobConf)} and reset to {@code null} by
     * {@link #onCompleted(Flow)}.
     */
    String _newVersionPath;
    /** Identity of this tap, used by {@link #equals(Object)} and {@link #hashCode()}. */
    private int _id;
    /** Next id to hand out; only touched through {@link #nextId()}. */
    private static int globalid = 0;

    /**
     * Creates a tap over {@code domainDir} with the given options and no
     * explicit domain spec.
     *
     * @param domainDir root directory of the domain
     * @param args tap options; {@code null} is treated as default options
     * @throws IOException if the domain store cannot be opened
     */
    public ElephantDBTap(String domainDir, Args args) throws IOException {
        this(domainDir, null, args);
    }

    /**
     * Creates a tap over {@code domainDir} with the given spec and default options.
     *
     * @param domainDir root directory of the domain
     * @param domainSpec spec handed to the {@link DomainStore} constructor
     * @throws IOException if the domain store cannot be opened
     */
    public ElephantDBTap(String domainDir, DomainSpec domainSpec) throws IOException {
        this(domainDir, domainSpec, new Args());
    }

    /**
     * Creates a tap over {@code domainDir} with no explicit spec and default options.
     *
     * @param domainDir root directory of the domain
     * @throws IOException if the domain store cannot be opened
     */
    public ElephantDBTap(String domainDir) throws IOException {
        this(domainDir, null, new Args());
    }

    /**
     * Creates a tap over {@code domainDir}.
     *
     * <p>The effective spec is whatever a {@link DomainStore} constructed with
     * {@code domainDir} and {@code domainSpec} reports via {@code getSpec()}.
     *
     * @param domainDir root directory of the domain
     * @param domainSpec spec handed to the {@link DomainStore} constructor; may be {@code null}
     * @param args tap options; {@code null} is treated as default options
     * @throws IOException if the domain store cannot be opened
     */
    public ElephantDBTap(String domainDir, DomainSpec domainSpec, Args args) throws IOException {
        super(new ElephantScheme());
        this._domainDir = domainDir;
        this._spec = new DomainStore(domainDir, domainSpec).getSpec();
        // Fall back to defaults instead of deferring a NullPointerException to
        // the first method that dereferences _args.
        this._args = args == null ? new Args() : args;
        this._id = nextId();
    }

    /**
     * Hands out the next tap id. Synchronized so that taps constructed
     * concurrently can never be given the same id (which would make two
     * distinct taps compare equal).
     */
    private static synchronized int nextId() {
        return globalid++;
    }

    /** Opens a fresh {@link DomainStore} over this tap's directory and spec. */
    private DomainStore getDomainStore() throws IOException {
        return new DomainStore(this._domainDir, this._spec);
    }

    /** @return the domain spec resolved at construction time */
    public DomainSpec getSpec() {
        return this._spec;
    }

    /** @return the sink fields configured in {@link Args#sinkFields} */
    public Fields getSinkFields() {
        return this._args.sinkFields;
    }

    /** @return the source fields configured in {@link Args#sourceFields} */
    public Fields getSourceFields() {
        return this._args.sourceFields;
    }

    /** @return always {@code true} */
    public boolean isWriteDirect() {
        return true;
    }

    /**
     * Builds a two-element tuple from a key/value pair read from the input format.
     *
     * <p>When a {@link Args#deserializer} is configured, the key is cast to
     * {@link BytesWritable} and passed through it; otherwise the key is used as is.
     * The value is always passed through unchanged.
     *
     * @param object the key
     * @param object2 the value
     * @return a tuple of (key, value)
     */
    public Tuple source(Object object, Object object2) {
        Deserializer deserializer = this._args.deserializer;
        Object key = deserializer == null ? object : deserializer.deserialize((BytesWritable)object);
        return new Tuple(new Object[]{key, object2});
    }

    /**
     * Configures {@code jobConf} to read this domain through {@link ElephantInputFormat}.
     *
     * @param jobConf job configuration to populate
     * @throws IOException declared by the tap contract
     */
    public void sourceInit(JobConf jobConf) throws IOException {
        // The input path is a random, per-call path under "/"; presumably a
        // placeholder, since the real location travels in the args object
        // below -- TODO confirm.
        FileInputFormat.setInputPaths(jobConf, "/" + UUID.randomUUID().toString());

        ElephantInputFormat.Args args = new ElephantInputFormat.Args(this._domainDir);
        args.inputDirHdfs = this._domainDir;
        if (this._args.persistenceOptions != null) {
            args.persistenceOptions = this._args.persistenceOptions;
        }
        if (this._args.tmpDirs != null) {
            LocalElephantManager.setTmpDirs(jobConf, this._args.tmpDirs);
        }
        args.version = this._args.version;

        jobConf.setInt("mapred.task.timeout", this._args.timeoutMs);
        // NOTE(review): the key says "output" although these are input args;
        // kept byte-for-byte because the key ElephantInputFormat reads is not
        // visible from this file -- confirm before renaming.
        Utils.setObject(jobConf, "elephant.output.args", args);
        jobConf.setInputFormat(ElephantInputFormat.class);
    }

    /**
     * Emits one record to the output collector.
     *
     * <p>Field 0 of the entry is read as an int and becomes the {@link IntWritable}
     * output key; field 1 is serialized with {@link Common#serializeElephantVal};
     * field 2 is cast to {@link BytesWritable} and converted with
     * {@link Common#getBytes}. The two byte arrays are wrapped, in that order,
     * in an {@link ElephantRecordWritable}.
     *
     * @param tupleEntry entry with at least three fields laid out as described above
     * @param outputCollector collector receiving the (IntWritable, ElephantRecordWritable) pair
     * @throws IOException if serialization or collection fails
     */
    public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException {
        int outputKey = tupleEntry.getInteger((Comparable)Integer.valueOf(0));
        Comparable rawField = tupleEntry.get(1);
        byte[] first = Common.serializeElephantVal(rawField);
        byte[] second = Common.getBytes((BytesWritable)tupleEntry.get(2));
        ElephantRecordWritable record = new ElephantRecordWritable(first, second);
        outputCollector.collect(new IntWritable(outputKey), record);
    }

    /**
     * Configures {@code jobConf} to write a new version of this domain through
     * {@link ElephantOutputFormat}.
     *
     * <p>A new version is created in the domain store only if none is pending,
     * so repeated calls before {@link #onCompleted(Flow)} reuse the same version
     * path. The number of reduce tasks is set to the spec's shard count and
     * speculative reduce execution is switched off.
     *
     * @param jobConf job configuration to populate
     * @throws IOException if the domain store cannot be opened or the version cannot be created
     */
    public void sinkInit(JobConf jobConf) throws IOException {
        DomainStore domainStore = this.getDomainStore();
        if (this._newVersionPath == null) {
            this._newVersionPath = domainStore.createVersion();
        }

        ElephantOutputFormat.Args args = new ElephantOutputFormat.Args(this._spec, this._newVersionPath);
        if (this._args.persistenceOptions != null) {
            args.persistenceOptions = this._args.persistenceOptions;
        }
        if (this._args.tmpDirs != null) {
            LocalElephantManager.setTmpDirs(jobConf, this._args.tmpDirs);
        }
        if (this._args.updater != null) {
            // With an updater, the output format is also told where the most
            // recent existing version lives.
            args.updater = this._args.updater;
            args.updateDirHdfs = domainStore.mostRecentVersionPath();
        }

        Utils.setObject(jobConf, "elephant.output.args", args);
        jobConf.setInt("mapred.task.timeout", this._args.timeoutMs);
        jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        jobConf.setInt("mapred.reduce.tasks", this._spec.getNumShards());
        jobConf.setOutputFormat(ElephantOutputFormat.class);
    }

    /** @return the domain directory as a Hadoop {@link Path} */
    public Path getPath() {
        return new Path(this._domainDir);
    }

    /**
     * Not supported by this tap.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean makeDirs(JobConf jobConf) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Not supported by this tap.
     *
     * @throws UnsupportedOperationException always
     */
    public boolean deletePath(JobConf jobConf) throws IOException {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /** @return always {@code false}; the file system is not consulted */
    public boolean pathExists(JobConf jobConf) throws IOException {
        return false;
    }

    /** @return the current wall-clock time in milliseconds; the file system is not consulted */
    public long getPathModified(JobConf jobConf) throws IOException {
        return System.currentTimeMillis();
    }

    /** No-op. */
    public void onStarting(Flow flow) {
    }

    /** No-op. */
    public void onStopping(Flow flow) {
    }

    /**
     * Tells whether this very instance (reference identity, not
     * {@link #equals(Object)}) is registered as one of the flow's sinks.
     */
    private boolean isSinkOf(Flow flow) {
        for (Object sink : flow.getSinks().values()) {
            if (sink == this) {
                return true;
            }
        }
        return false;
    }

    /**
     * Finalizes the pending version once the flow has finished.
     *
     * <p>Nothing is finalized unless this tap is a sink of {@code flow}. On a
     * successful flow the version directory is created (via {@code mkdirs}),
     * the in-progress version is synchronized when an updater is configured,
     * and the version is marked as succeeded; otherwise the version is marked
     * as failed. In every case the pending version path is cleared afterwards.
     *
     * <p>NOTE(review): if no version is pending (for example, if this is called
     * again after the path was cleared), the {@code null} path is handed
     * straight to the store / {@link Path} -- confirm whether callers can hit this.
     *
     * @param flow the flow that completed
     * @throws TapException if finalizing the version fails with an {@link IOException}
     */
    public void onCompleted(Flow flow) {
        try {
            if (!this.isSinkOf(flow)) {
                return;
            }
            DomainStore domainStore = this.getDomainStore();
            if (!flow.getFlowStats().isSuccessful()) {
                domainStore.failVersion(this._newVersionPath);
                return;
            }
            domainStore.getFileSystem().mkdirs(new Path(this._newVersionPath));
            if (this._args.updater != null) {
                domainStore.synchronizeInProgressVersion(this._newVersionPath);
            }
            domainStore.succeedVersion(this._newVersionPath);
        }
        catch (IOException iOException) {
            throw new TapException("Couldn't finalize new elephant domain version", iOException);
        }
        finally {
            // Always forget the pending version so the next sinkInit creates a new one.
            this._newVersionPath = null;
        }
    }

    /** @return always {@code false} */
    public boolean onThrowable(Flow flow, Throwable throwable) {
        return false;
    }

    /** Opens a {@link TapCollector} over this tap. */
    public TupleEntryCollector openForWrite(JobConf jobConf) throws IOException {
        return new TapCollector(this, jobConf);
    }

    /** Opens an iterator over this tap's entries, typed with {@link #getSourceFields()}. */
    public TupleEntryIterator openForRead(JobConf jobConf) throws IOException {
        return new TupleEntryIterator(this.getSourceFields(), new Iterator[]{new TapIterator(this, jobConf)});
    }

    /**
     * Two taps are equal only when they carry the same construction-time id.
     */
    public boolean equals(Object object) {
        return object instanceof ElephantDBTap && this._id == ((ElephantDBTap)object)._id;
    }

    /** @return the construction-time id, consistent with {@link #equals(Object)} */
    public int hashCode() {
        return this._id;
    }

    /**
     * Options for an {@link ElephantDBTap}. All fields are public and mutable;
     * the initial values below are the defaults used when a tap is built
     * without explicit options.
     */
    public static class Args
    implements Serializable {
        /** Copied onto the input/output format args when non-null. */
        public Map<String, Object> persistenceOptions = null;
        /** Passed to {@link LocalElephantManager#setTmpDirs} when non-null. */
        public List<String> tmpDirs = null;
        /** Value written to {@code mapred.task.timeout}: 7,200,000 ms (two hours). */
        public int timeoutMs = 2 * 60 * 60 * 1000;
        /** Fields exposed when the tap is used as a source. */
        public Fields sourceFields = new Fields(new Comparable[]{"key", "value"});
        /** Copied onto the input format args' {@code version}; {@code null} by default. */
        public Long version = null;
        /** Optional key deserializer applied in {@link ElephantDBTap#source}. */
        public Deserializer deserializer = null;
        /** Fields accepted when the tap is used as a sink. */
        public Fields sinkFields = Fields.ALL;
        /** Updater handed to the output format; set to {@code null} to configure none. */
        public ElephantUpdater updater = new ReplaceUpdater();
    }

    /**
     * Placeholder scheme passed to the {@link Tap} constructor. Every operation
     * throws {@link UnsupportedOperationException}; the enclosing tap defines
     * its own {@code sourceInit}/{@code sinkInit}/{@code source}/{@code sink}.
     */
    public static class ElephantScheme
    extends Scheme {
        public void sourceInit(Tap tap, JobConf jobConf) throws IOException {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void sinkInit(Tap tap, JobConf jobConf) throws IOException {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public Tuple source(Object object, Object object2) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public void sink(TupleEntry tupleEntry, OutputCollector outputCollector) throws IOException {
            throw new UnsupportedOperationException("Not supported yet.");
        }
    }
}

