/*
 * Decompiled with CFR 0.152.
 */
package com.infochimps.elasticsearch.pig;

import com.infochimps.elasticsearch.ElasticSearchInputFormat;
import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.UDFContext;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ElasticSearchStorage
extends LoadFunc
implements StoreFuncInterface {
    private String contextSignature = null;
    private RecordReader reader;
    protected RecordWriter writer = null;
    protected ObjectMapper mapper = new ObjectMapper();
    protected String esConfig;
    protected String esPlugins;
    private static final String ES_INDEX_NAME = "elasticsearch.index.name";
    private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
    private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
    private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
    private static final String ES_IS_JSON = "elasticsearch.is_json";
    private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names";
    private static final String ES_REQUEST_SIZE = "elasticsearch.request.size";
    private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits";
    private static final String ES_QUERY_STRING = "elasticsearch.query.string";
    private static final String COMMA = ",";
    private static final String LOCAL_SCHEME = "file://";
    private static final String DEFAULT_BULK = "1000";
    private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
    private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";
    private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
    private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";
    private static final String ES_CONFIG = "es.config";
    private static final String ES_PLUGINS = "es.path.plugins";

    public ElasticSearchStorage() {
        this(DEFAULT_ES_CONFIG, DEFAULT_ES_PLUGINS);
    }

    public ElasticSearchStorage(String esConfig) {
        this(esConfig, DEFAULT_ES_PLUGINS);
    }

    public ElasticSearchStorage(String esConfig, String esPlugins) {
        this.esConfig = esConfig;
        this.esPlugins = esPlugins;
    }

    public Tuple getNext() throws IOException {
        try {
            Tuple tuple = TupleFactory.getInstance().newTuple(2);
            if (this.reader.nextKeyValue()) {
                Text docId = (Text)this.reader.getCurrentKey();
                Text docContent = (Text)this.reader.getCurrentValue();
                tuple.set(0, (Object)new DataByteArray(docId.toString()));
                tuple.set(1, (Object)new DataByteArray(docContent.toString()));
                return tuple;
            }
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
        return null;
    }

    public InputFormat getInputFormat() {
        return new ElasticSearchInputFormat();
    }

    public void prepareToRead(RecordReader reader, PigSplit split) {
        this.reader = reader;
    }

    public void setUDFContextSignature(String signature) {
        this.contextSignature = signature;
    }

    public void setLocation(String location, Job job) throws IOException {
        this.elasticSearchSetup(location, job);
    }

    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
        return location;
    }

    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
        return location;
    }

    public OutputFormat getOutputFormat() throws IOException {
        return new ElasticSearchOutputFormat();
    }

    public void checkSchema(ResourceSchema s) throws IOException {
        UDFContext context = UDFContext.getUDFContext();
        Properties property = context.getUDFProperties(ResourceSchema.class);
        String fieldNames = "";
        for (String field : s.fieldNames()) {
            fieldNames = fieldNames + field;
            fieldNames = fieldNames + COMMA;
        }
        property.setProperty(PIG_ES_FIELD_NAMES, fieldNames);
    }

    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    public void putNext(Tuple t) throws IOException {
        UDFContext context = UDFContext.getUDFContext();
        Properties property = context.getUDFProperties(ResourceSchema.class);
        MapWritable record = new MapWritable();
        String isJson = property.getProperty(ES_IS_JSON);
        if (isJson != null && isJson.equals("false")) {
            String[] fieldNames = property.getProperty(PIG_ES_FIELD_NAMES).split(COMMA);
            for (int i = 0; i < t.size(); ++i) {
                if (i >= fieldNames.length) continue;
                try {
                    record.put((Writable)new Text(fieldNames[i]), (Writable)new Text(t.get(i).toString()));
                    continue;
                }
                catch (NullPointerException e) {
                    // empty catch block
                }
            }
        } else if (!t.isNull(0)) {
            String jsonData = t.get(0).toString();
            try {
                HashMap data = (HashMap)this.mapper.readValue(jsonData, HashMap.class);
                record = (MapWritable)this.toWritable(data);
            }
            catch (JsonParseException e) {
                e.printStackTrace();
            }
            catch (JsonMappingException e) {
                e.printStackTrace();
            }
        }
        try {
            this.writer.write((Object)NullWritable.get(), (Object)record);
        }
        catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    public void setStoreFuncUDFContextSignature(String signature) {
        this.contextSignature = signature;
    }

    private void elasticSearchSetup(String location, Job job) {
        block11: {
            try {
                URI parsedLocation = new URI(location);
                HashMap<String, String> query = this.parseURIQuery(parsedLocation.getQuery());
                String esHost = location.substring(5).split("/")[0];
                if (esHost == null) {
                    throw new RuntimeException("Missing elasticsearch index name, URI must be formatted as es://<index_name>/<object_type>?<params>");
                }
                if (parsedLocation.getPath() == null) {
                    throw new RuntimeException("Missing elasticsearch object type, URI must be formatted as es://<index_name>/<object_type>?<params>");
                }
                Configuration conf = job.getConfiguration();
                if (conf.get(ES_INDEX_NAME) != null) break block11;
                job.getConfiguration().set(ES_INDEX_NAME, esHost);
                job.getConfiguration().set(ES_OBJECT_TYPE, parsedLocation.getPath().replaceAll("/", ""));
                String requestSize = query.get("size");
                if (requestSize == null) {
                    requestSize = DEFAULT_BULK;
                }
                job.getConfiguration().set(ES_BULK_SIZE, requestSize);
                job.getConfiguration().set(ES_REQUEST_SIZE, requestSize);
                String idFieldName = query.get("id");
                if (idFieldName == null) {
                    idFieldName = "-1";
                }
                job.getConfiguration().set(ES_ID_FIELD_NAME, idFieldName);
                String queryString = query.get("q");
                if (queryString == null) {
                    queryString = "*";
                }
                job.getConfiguration().set(ES_QUERY_STRING, queryString);
                String numTasks = query.get("tasks");
                if (numTasks == null) {
                    numTasks = "100";
                }
                job.getConfiguration().set(ES_NUM_SPLITS, numTasks);
                try {
                    Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
                    Path hdfsPluginsPath = new Path(ES_PLUGINS_HDFS_PATH);
                    HadoopUtils.uploadLocalFileIfChanged(new Path(LOCAL_SCHEME + this.esConfig), hdfsConfigPath, job.getConfiguration());
                    HadoopUtils.shipFileIfNotShipped(hdfsConfigPath, job.getConfiguration());
                    HadoopUtils.uploadLocalFileIfChanged(new Path(LOCAL_SCHEME + this.esPlugins), hdfsPluginsPath, job.getConfiguration());
                    HadoopUtils.shipArchiveIfNotShipped(hdfsPluginsPath, job.getConfiguration());
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
                String isJson = query.get("json");
                if (isJson == null || isJson.equals("false")) {
                    UDFContext context = UDFContext.getUDFContext();
                    Properties property = context.getUDFProperties(ResourceSchema.class);
                    property.setProperty(ES_IS_JSON, "false");
                }
                job.getConfiguration().set(ES_CONFIG, this.esConfig);
                job.getConfiguration().set(ES_PLUGINS, this.esPlugins);
            }
            catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void setStoreLocation(String location, Job job) throws IOException {
        this.elasticSearchSetup(location, job);
    }

    private HashMap<String, String> parseURIQuery(String query) {
        HashMap<String, String> argMap = new HashMap<String, String>();
        if (query != null) {
            String[] pairs;
            for (String pair : pairs = query.split("&")) {
                String[] splitPair = pair.split("=");
                argMap.put(splitPair[0], splitPair[1]);
            }
        }
        return argMap;
    }

    private Writable toWritable(Object thing) {
        if (thing instanceof String) {
            return new Text((String)thing);
        }
        if (thing instanceof Long) {
            return new LongWritable(((Long)thing).longValue());
        }
        if (thing instanceof Integer) {
            return new IntWritable(((Integer)thing).intValue());
        }
        if (thing instanceof Double) {
            return new DoubleWritable(((Double)thing).doubleValue());
        }
        if (thing instanceof Float) {
            return new FloatWritable(((Float)thing).floatValue());
        }
        if (thing instanceof Boolean) {
            return new BooleanWritable(((Boolean)thing).booleanValue());
        }
        if (thing instanceof Map) {
            MapWritable result = new MapWritable();
            for (Map.Entry entry : ((Map)thing).entrySet()) {
                result.put((Writable)new Text(((String)entry.getKey()).toString()), this.toWritable(entry.getValue()));
            }
            return result;
        }
        if (thing instanceof List && ((List)thing).size() > 0) {
            Object first = ((List)thing).get(0);
            Writable[] listOfThings = new Writable[((List)thing).size()];
            for (int i = 0; i < listOfThings.length; ++i) {
                listOfThings[i] = this.toWritable(((List)thing).get(i));
            }
            return new ArrayWritable(this.toWritable(first).getClass(), listOfThings);
        }
        return NullWritable.get();
    }

    public void cleanupOnFailure(String location, Job job) throws IOException {
    }
}

