/*
 * Decompiled with CFR 0.152.
 */
package com.infochimps.elasticsearch;

import com.infochimps.elasticsearch.ElasticSearchOutputCommitter;
import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ElasticSearchOutputFormat
extends OutputFormat<NullWritable, MapWritable>
implements Configurable {
    static Log LOG = LogFactory.getLog(ElasticSearchOutputFormat.class);
    private Configuration conf = null;

    public RecordWriter<NullWritable, MapWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new ElasticSearchRecordWriter(context);
    }

    public void setConf(Configuration conf) {
    }

    public Configuration getConf() {
        return this.conf;
    }

    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new ElasticSearchOutputCommitter();
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    protected class ElasticSearchRecordWriter
    extends RecordWriter<NullWritable, MapWritable> {
        private Node node;
        private Client client;
        private String indexName;
        private int bulkSize;
        private int idField;
        private String idFieldName;
        private String objType;
        private String[] fieldNames;
        private AtomicLong totalBulkTime = new AtomicLong();
        private AtomicLong totalBulkItems = new AtomicLong();
        private Random randgen = new Random();
        private long runStartTime = System.currentTimeMillis();
        private static final String ES_CONFIG_NAME = "elasticsearch.yml";
        private static final String ES_PLUGINS_NAME = "plugins";
        private static final String ES_INDEX_NAME = "elasticsearch.index.name";
        private static final String ES_BULK_SIZE = "elasticsearch.bulk.size";
        private static final String ES_ID_FIELD_NAME = "elasticsearch.id.field.name";
        private static final String ES_ID_FIELD = "elasticsearch.id.field";
        private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
        private static final String ES_CONFIG = "es.config";
        private static final String ES_PLUGINS = "es.path.plugins";
        private static final String COMMA = ",";
        private static final String SLASH = "/";
        private static final String NO_ID_FIELD = "-1";
        private volatile BulkRequestBuilder currentRequest;

        public ElasticSearchRecordWriter(TaskAttemptContext context) {
            Configuration conf = context.getConfiguration();
            this.indexName = conf.get(ES_INDEX_NAME);
            this.bulkSize = Integer.parseInt(conf.get(ES_BULK_SIZE));
            this.idFieldName = conf.get(ES_ID_FIELD_NAME);
            if (this.idFieldName.equals(NO_ID_FIELD)) {
                LOG.info((Object)"Documents will be assigned ids by elasticsearch");
                this.idField = -1;
            } else {
                LOG.info((Object)("Using field:[" + this.idFieldName + "] for document ids"));
            }
            this.objType = conf.get(ES_OBJECT_TYPE);
            try {
                String taskConfigPath = HadoopUtils.fetchFileFromCache(ES_CONFIG_NAME, conf);
                LOG.info((Object)("Using [" + taskConfigPath + "] as es.config"));
                String taskPluginsPath = HadoopUtils.fetchArchiveFromCache(ES_PLUGINS_NAME, conf);
                LOG.info((Object)("Using [" + taskPluginsPath + "] as es.plugins.dir"));
                System.setProperty(ES_CONFIG, taskConfigPath);
                System.setProperty(ES_PLUGINS, taskPluginsPath + SLASH + ES_PLUGINS_NAME);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            this.start_embedded_client();
            this.initialize_index(this.indexName);
            this.currentRequest = this.client.prepareBulk();
        }

        public void close(TaskAttemptContext context) throws IOException {
            if (this.currentRequest.numberOfActions() > 0) {
                try {
                    BulkResponse response = (BulkResponse)this.currentRequest.execute().actionGet();
                }
                catch (Exception e) {
                    LOG.warn((Object)("Bulk request failed: " + e.getMessage()));
                    throw new RuntimeException(e);
                }
            }
            LOG.info((Object)"Closing record writer");
            this.client.close();
            LOG.info((Object)"Client is closed");
            if (this.node != null) {
                this.node.close();
            }
            LOG.info((Object)"Record writer closed.");
        }

        public void write(NullWritable key, MapWritable fields) throws IOException {
            XContentBuilder builder = XContentFactory.jsonBuilder();
            this.buildContent(builder, (Writable)fields);
            if (this.idField == -1) {
                this.currentRequest.add(Requests.indexRequest((String)this.indexName).type(this.objType).source(builder));
            } else {
                try {
                    Text mapKey = new Text(this.idFieldName);
                    String record_id = fields.get((Object)mapKey).toString();
                    this.currentRequest.add(Requests.indexRequest((String)this.indexName).id(record_id).type(this.objType).create(false).source(builder));
                }
                catch (Exception e) {
                    LOG.warn((Object)"Encountered malformed record");
                }
            }
            this.processBulkIfNeeded();
        }

        private void buildContent(XContentBuilder builder, Writable value) throws IOException {
            if (value instanceof Text) {
                builder.value(((Text)value).toString());
            } else if (value instanceof LongWritable) {
                builder.value(((LongWritable)value).get());
            } else if (value instanceof IntWritable) {
                builder.value(((IntWritable)value).get());
            } else if (value instanceof DoubleWritable) {
                builder.value(((DoubleWritable)value).get());
            } else if (value instanceof FloatWritable) {
                builder.value(((FloatWritable)value).get());
            } else if (value instanceof BooleanWritable) {
                builder.value(((BooleanWritable)value).get());
            } else if (value instanceof MapWritable) {
                builder.startObject();
                for (Map.Entry entry : ((MapWritable)value).entrySet()) {
                    if (entry.getValue() instanceof NullWritable) continue;
                    builder.field(((Writable)entry.getKey()).toString());
                    this.buildContent(builder, (Writable)entry.getValue());
                }
                builder.endObject();
            } else if (value instanceof ArrayWritable) {
                builder.startArray();
                Writable[] arrayOfThings = ((ArrayWritable)value).get();
                for (int i = 0; i < arrayOfThings.length; ++i) {
                    this.buildContent(builder, arrayOfThings[i]);
                }
                builder.endArray();
            }
        }

        private void processBulkIfNeeded() {
            this.totalBulkItems.incrementAndGet();
            if (this.currentRequest.numberOfActions() >= this.bulkSize) {
                try {
                    long startTime = System.currentTimeMillis();
                    BulkResponse response = (BulkResponse)this.currentRequest.execute().actionGet();
                    this.totalBulkTime.addAndGet(System.currentTimeMillis() - startTime);
                    if (this.randgen.nextDouble() < 0.1) {
                        LOG.info((Object)("Indexed [" + this.totalBulkItems.get() + "] in [" + this.totalBulkTime.get() / 1000L + "s] of indexing" + "[" + (System.currentTimeMillis() - this.runStartTime) / 1000L + "s] of wall clock" + " for [" + (float)(1000.0 * (double)this.totalBulkItems.get()) / (float)(System.currentTimeMillis() - this.runStartTime) + "rec/s]"));
                    }
                }
                catch (Exception e) {
                    LOG.warn((Object)("Bulk request failed: " + e.getMessage()));
                    throw new RuntimeException(e);
                }
                this.currentRequest = this.client.prepareBulk();
            }
        }

        private void initialize_index(String indexName) {
            block2: {
                LOG.info((Object)"Initializing index");
                try {
                    this.client.admin().indices().prepareCreate(indexName).execute().actionGet();
                }
                catch (Exception e) {
                    if (!(ExceptionsHelper.unwrapCause((Throwable)e) instanceof IndexAlreadyExistsException)) break block2;
                    LOG.warn((Object)("Index [" + indexName + "] already exists"));
                }
            }
        }

        private void start_embedded_client() {
            LOG.info((Object)"Starting embedded elasticsearch client ...");
            this.node = NodeBuilder.nodeBuilder().client(true).node();
            this.client = this.node.client();
        }
    }
}

