/*
 * Decompiled with CFR 0.152.
 */
package com.infochimps.elasticsearch.pig;

import com.infochimps.elasticsearch.ElasticSearchOutputFormat;
import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import java.io.IOException;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;

/**
 * Pig store function that writes each tuple to ElasticSearch through
 * {@link ElasticSearchOutputFormat}.
 *
 * <p>The store location is expected to look like {@code es://myindex/myobj}: the
 * first path element is stored as the index name and the second as the object
 * type. Every tuple is converted to a {@link MapWritable} whose keys are the
 * schema field names recorded by {@link #checkSchema(ResourceSchema)} and whose
 * values are the string form of the corresponding tuple fields.
 *
 * <p>NOTE(review): the field names are exchanged through the UDF properties
 * keyed by {@code ResourceSchema.class}, not by a per-store signature. It looks
 * like several store statements in one script would share that entry - confirm
 * before relying on multi-store scripts.
 */
public class ElasticSearchIndex extends StoreFunc implements StoreFuncInterface {
    private static final Log LOG = LogFactory.getLog(ElasticSearchIndex.class);

    /** Writer handed over by Pig in {@link #prepareToWrite(RecordWriter)}. */
    protected RecordWriter writer = null;
    /** Value written to the job configuration under {@code elasticsearch.id.field}. */
    protected String idField;
    /** Value written to the job configuration under {@code elasticsearch.bulk.size}. */
    protected String bulkSize;
    /** Local path of the elasticsearch configuration file that is shipped with the job. */
    protected String esConfig;
    /** Local path of the elasticsearch plugins directory that is shipped with the job. */
    protected String esPlugins;

    // Job configuration keys.
    private static final String ES_INDEX_NAME = "elasticsearch.index.name";
    private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
    private static final String ES_IS_JSON = "elasticsearch.is_json";
    private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
    private static final String ES_FIELD_NAMES = "elasticsearch.field.names";
    private static final String ES_ID_FIELD = "elasticsearch.id.field";
    private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";

    // UDF property key used to hand the schema field names from checkSchema to the other callbacks.
    private static final String PIG_ES_FIELD_NAMES = "elasticsearch.pig.field.names";

    private static final String SLASH = "/";
    private static final String COMMA = ",";
    private static final String LOCAL_SCHEME = "file://";

    // Defaults used by the shorter constructors.
    private static final String NO_ID_FIELD = "-1";
    private static final String DEFAULT_BULK = "1000";
    private static final String DEFAULT_ES_CONFIG = "/etc/elasticsearch/elasticsearch.yml";
    private static final String DEFAULT_ES_PLUGINS = "/usr/local/share/elasticsearch/plugins";

    // Fixed destinations the local config file and plugins directory are uploaded to.
    private static final String ES_CONFIG_HDFS_PATH = "/tmp/elasticsearch/elasticsearch.yml";
    private static final String ES_PLUGINS_HDFS_PATH = "/tmp/elasticsearch/plugins";

    /** Length of the scheme prefix (e.g. {@code es://}) stripped from the store location. */
    private static final int SCHEME_PREFIX_LENGTH = 5;

    /** Creates a store function with id field {@code "-1"} and bulk size {@code "1000"}. */
    public ElasticSearchIndex() {
        this(NO_ID_FIELD, DEFAULT_BULK);
    }

    /**
     * Creates a store function using the default elasticsearch config path.
     *
     * @param idField  value for {@code elasticsearch.id.field}
     * @param bulkSize value for {@code elasticsearch.bulk.size}
     */
    public ElasticSearchIndex(String idField, String bulkSize) {
        this(idField, bulkSize, DEFAULT_ES_CONFIG);
    }

    /**
     * Creates a store function using the default elasticsearch plugins path.
     *
     * @param idField  value for {@code elasticsearch.id.field}
     * @param bulkSize value for {@code elasticsearch.bulk.size}
     * @param esConfig local path of the elasticsearch configuration file
     */
    public ElasticSearchIndex(String idField, String bulkSize, String esConfig) {
        this(idField, bulkSize, esConfig, DEFAULT_ES_PLUGINS);
    }

    /**
     * Creates a fully specified store function.
     *
     * @param idField   value for {@code elasticsearch.id.field}
     * @param bulkSize  value for {@code elasticsearch.bulk.size}
     * @param esConfig  local path of the elasticsearch configuration file
     * @param esPlugins local path of the elasticsearch plugins directory
     */
    public ElasticSearchIndex(String idField, String bulkSize, String esConfig, String esPlugins) {
        this.idField = idField;
        this.bulkSize = bulkSize;
        this.esConfig = esConfig;
        this.esPlugins = esPlugins;
    }

    /**
     * Records the schema's field names in the UDF properties so that
     * {@link #setStoreLocation(String, Job)} and {@link #putNext(Tuple)} can read them.
     *
     * @param s schema of the relation being stored
     * @throws IOException declared by the store function contract; not thrown here
     */
    public void checkSchema(ResourceSchema s) throws IOException {
        Properties property = UDFContext.getUDFContext().getUDFProperties(ResourceSchema.class);
        // Every name is followed by a comma (including the last one) to keep the stored
        // value identical to what readers of this property have always received;
        // String.split drops the resulting trailing empty element.
        StringBuilder fieldNames = new StringBuilder();
        for (String field : s.fieldNames()) {
            fieldNames.append(field).append(COMMA);
        }
        property.setProperty(PIG_ES_FIELD_NAMES, fieldNames.toString());
    }

    /**
     * Parses the store location and, the first time it is called for a job
     * configuration, fills in the elasticsearch settings and ships the local
     * elasticsearch config file and plugins directory with the job.
     *
     * @param location store location of the form {@code es://myindex/myobj}
     * @param job      job whose configuration is populated
     * @throws RuntimeException if the location does not contain exactly an index and an
     *                          object type, or if shipping the elasticsearch files fails
     * @throws IOException      declared by the store function contract
     */
    public void setStoreLocation(String location, Job job) throws IOException {
        // Guard the substring below: a null or too-short location is simply an invalid location.
        if (location == null || location.length() < SCHEME_PREFIX_LENGTH) {
            throw new RuntimeException("Please specify a valid elasticsearch index, eg. es://myindex/myobj");
        }
        String[] esStore = location.substring(SCHEME_PREFIX_LENGTH).split(SLASH);
        if (esStore.length != 2) {
            throw new RuntimeException("Please specify a valid elasticsearch index, eg. es://myindex/myobj");
        }

        Configuration conf = job.getConfiguration();
        if (conf.get(ES_INDEX_NAME) != null) {
            // Already configured by an earlier call; nothing more to do.
            return;
        }

        conf.set(ES_INDEX_NAME, esStore[0]);
        conf.set(ES_OBJECT_TYPE, esStore[1]);
        conf.setBoolean(ES_IS_JSON, false);
        conf.set(ES_BULK_SIZE, this.bulkSize);
        conf.set(ES_ID_FIELD, this.idField);

        try {
            Path hdfsConfigPath = new Path(ES_CONFIG_HDFS_PATH);
            Path hdfsPluginsPath = new Path(ES_PLUGINS_HDFS_PATH);
            HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME + this.esConfig), hdfsConfigPath, conf);
            HadoopUtils.shipFileIfNotShipped(hdfsConfigPath, conf);
            HadoopUtils.uploadLocalFile(new Path(LOCAL_SCHEME + this.esPlugins), hdfsPluginsPath, conf);
            HadoopUtils.shipArchiveIfNotShipped(hdfsPluginsPath, conf);
        }
        catch (Exception e) {
            throw new RuntimeException(
                "Failed to ship elasticsearch config '" + this.esConfig
                    + "' and plugins '" + this.esPlugins + "' with the job", e);
        }

        Properties property = UDFContext.getUDFContext().getUDFProperties(ResourceSchema.class);
        conf.set(ES_FIELD_NAMES, property.getProperty(PIG_ES_FIELD_NAMES));
    }

    /**
     * @return a new {@link ElasticSearchOutputFormat}
     * @throws IOException declared by the store function contract; not thrown here
     */
    public OutputFormat getOutputFormat() throws IOException {
        return new ElasticSearchOutputFormat();
    }

    /**
     * Remembers the record writer used by {@link #putNext(Tuple)}.
     *
     * @param writer writer to send records to
     * @throws IOException declared by the store function contract; not thrown here
     */
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    /**
     * Converts a tuple to a {@link MapWritable} of field name to string value and writes
     * it with a {@link NullWritable} key.
     *
     * <p>Tuple positions beyond the number of known field names are ignored, and fields
     * whose value is {@code null} are left out of the record.
     *
     * @param t tuple to store
     * @throws IOException if the field names were never recorded, if reading the tuple or
     *                     writing the record fails, or if the write is interrupted
     */
    @SuppressWarnings("unchecked") // the writer is handed over as a raw RecordWriter
    public void putNext(Tuple t) throws IOException {
        Properties property = UDFContext.getUDFContext().getUDFProperties(ResourceSchema.class);
        String fieldNamesProperty = property.getProperty(PIG_ES_FIELD_NAMES);
        if (fieldNamesProperty == null) {
            throw new IOException("UDF property '" + PIG_ES_FIELD_NAMES
                + "' is not set; the schema field names are unknown");
        }
        String[] fieldNames = fieldNamesProperty.split(COMMA);

        MapWritable record = new MapWritable();
        int fieldCount = Math.min(t.size(), fieldNames.length);
        for (int i = 0; i < fieldCount; ++i) {
            Object value = t.get(i);
            if (value == null) {
                // Null fields are deliberately omitted from the record.
                continue;
            }
            record.put(new Text(fieldNames[i]), new Text(value.toString()));
        }

        try {
            this.writer.write(NullWritable.get(), record);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /**
     * Does nothing: this store function performs no cleanup when a store fails.
     *
     * @param location store location that failed
     * @param job      job that failed
     * @throws IOException declared by the store function contract; not thrown here
     */
    public void cleanupOnFailure(String location, Job job) throws IOException {
    }
}

