/*
 * Decompiled with CFR 0.152.
 */
package com.infochimps.elasticsearch;

import com.infochimps.elasticsearch.ElasticSearchSplit;
import com.infochimps.elasticsearch.hadoop.util.HadoopUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.index.query.xcontent.XContentQueryBuilder;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.search.SearchHit;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class ElasticSearchInputFormat
extends InputFormat<Text, Text>
implements Configurable {
    static Log LOG = LogFactory.getLog(ElasticSearchInputFormat.class);
    private Configuration conf = null;
    private Node node;
    private Client client;
    private Integer requestSize;
    private Long numHits;
    private Long numSplits;
    private Long numSplitRecords;
    private String indexName;
    private String objType;
    private String queryString;
    private static final String ES_REQUEST_SIZE = "elasticsearch.request.size";
    private static final String ES_NUM_SPLITS = "elasticsearch.num.input.splits";
    private static final String ES_QUERY_STRING = "elasticsearch.query.string";
    private static final String ES_CONFIG_NAME = "elasticsearch.yml";
    private static final String ES_PLUGINS_NAME = "plugins";
    private static final String ES_INDEX_NAME = "elasticsearch.index.name";
    private static final String ES_OBJECT_TYPE = "elasticsearch.object.type";
    private static final String ES_CONFIG = "es.config";
    private static final String ES_PLUGINS = "es.path.plugins";
    private static final String SLASH = "/";

    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) {
        return new ElasticSearchRecordReader();
    }

    public List<InputSplit> getSplits(JobContext context) {
        this.setConf(context.getConfiguration());
        ArrayList<InputSplit> splits = new ArrayList<InputSplit>(this.numSplits.intValue());
        int i = 0;
        while ((long)i < this.numSplits) {
            Long size = this.numSplitRecords == 1L ? 1L : this.numSplitRecords - 1L;
            splits.add(new ElasticSearchSplit(this.queryString, (long)i * this.numSplitRecords, size));
            ++i;
        }
        if (this.numHits % this.numSplits > 0L) {
            splits.add(new ElasticSearchSplit(this.queryString, this.numSplits * this.numSplitRecords, this.numHits % this.numSplits - 1L));
        }
        LOG.info((Object)("Created [" + splits.size() + "] splits for [" + this.numHits + "] hits"));
        return splits;
    }

    public void setConf(Configuration configuration) {
        this.conf = configuration;
        this.indexName = this.conf.get(ES_INDEX_NAME);
        this.objType = this.conf.get(ES_OBJECT_TYPE);
        this.requestSize = Integer.parseInt(this.conf.get(ES_REQUEST_SIZE));
        this.numSplits = Long.parseLong(this.conf.get(ES_NUM_SPLITS));
        this.queryString = this.conf.get(ES_QUERY_STRING);
        System.setProperty(ES_CONFIG, this.conf.get(ES_CONFIG));
        System.setProperty(ES_PLUGINS, this.conf.get(ES_PLUGINS));
        this.start_embedded_client();
        this.initiate_search();
    }

    public Configuration getConf() {
        return this.conf;
    }

    private void start_embedded_client() {
        LOG.info((Object)"Starting embedded elasticsearch client ...");
        this.node = NodeBuilder.nodeBuilder().client(true).node();
        this.client = this.node.client();
    }

    private void initiate_search() {
        SearchResponse response = (SearchResponse)this.client.prepareSearch(new String[]{this.indexName}).setTypes(new String[]{this.objType}).setSearchType(SearchType.COUNT).setQuery((XContentQueryBuilder)QueryBuilders.queryString((String)this.queryString)).setSize(this.requestSize.intValue()).execute().actionGet();
        this.numHits = response.hits().totalHits();
        if (this.numSplits > this.numHits) {
            this.numSplits = this.numHits;
        }
        this.numSplitRecords = this.numHits / this.numSplits;
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    protected class ElasticSearchRecordReader
    extends RecordReader<Text, Text> {
        private Node node;
        private Client client;
        private String indexName;
        private String objType;
        private Long numSplitRecords;
        private Integer requestSize;
        private Text currentKey;
        private Text currentValue;
        private Integer recordsRead;
        private Iterator<SearchHit> hitsItr = null;
        private String queryString;
        private Long from;
        private Long recsToRead;

        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
            Configuration conf = context.getConfiguration();
            this.indexName = conf.get(ElasticSearchInputFormat.ES_INDEX_NAME);
            this.objType = conf.get(ElasticSearchInputFormat.ES_OBJECT_TYPE);
            LOG.info((Object)("Initializing elasticsearch record reader on index [" + this.indexName + "] and object type [" + this.objType + "]"));
            try {
                String taskConfigPath = HadoopUtils.fetchFileFromCache(ElasticSearchInputFormat.ES_CONFIG_NAME, conf);
                LOG.info((Object)("Using [" + taskConfigPath + "] as es.config"));
                String taskPluginsPath = HadoopUtils.fetchArchiveFromCache(ElasticSearchInputFormat.ES_PLUGINS_NAME, conf);
                LOG.info((Object)("Using [" + taskPluginsPath + "] as es.plugins.dir"));
                System.setProperty(ElasticSearchInputFormat.ES_CONFIG, taskConfigPath);
                System.setProperty(ElasticSearchInputFormat.ES_PLUGINS, taskPluginsPath + ElasticSearchInputFormat.SLASH + ElasticSearchInputFormat.ES_PLUGINS_NAME);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            this.queryString = ((ElasticSearchSplit)split).getQueryString();
            this.from = ((ElasticSearchSplit)split).getFrom();
            this.recsToRead = ((ElasticSearchSplit)split).getSize();
            LOG.info((Object)("elasticsearch record reader: query [" + this.queryString + "], from [" + this.from + "], size [" + this.recsToRead + "]"));
            this.start_embedded_client();
            this.recordsRead = 0;
        }

        private void start_embedded_client() {
            LOG.info((Object)"Starting embedded elasticsearch client ...");
            this.node = NodeBuilder.nodeBuilder().client(true).node();
            this.client = this.node.client();
        }

        private Iterator<SearchHit> fetchNextHits() {
            SearchResponse response = (SearchResponse)this.client.prepareSearch(new String[]{this.indexName}).setTypes(new String[]{this.objType}).setFrom(this.from.intValue()).setSize(this.recsToRead.intValue()).setQuery((XContentQueryBuilder)QueryBuilders.queryString((String)this.queryString)).execute().actionGet();
            return response.hits().iterator();
        }

        public boolean nextKeyValue() throws IOException {
            if (this.hitsItr != null) {
                if ((long)this.recordsRead.intValue() < this.recsToRead) {
                    if (this.hitsItr.hasNext()) {
                        SearchHit hit = this.hitsItr.next();
                        this.currentKey = new Text(hit.id());
                        this.currentValue = new Text(hit.sourceAsString());
                        this.recordsRead = this.recordsRead + 1;
                        return true;
                    }
                } else {
                    this.hitsItr = null;
                }
            } else if ((long)this.recordsRead.intValue() < this.recsToRead) {
                this.hitsItr = this.fetchNextHits();
                if (this.hitsItr.hasNext()) {
                    SearchHit hit = this.hitsItr.next();
                    this.currentKey = new Text(hit.id());
                    this.currentValue = new Text(hit.sourceAsString());
                    this.recordsRead = this.recordsRead + 1;
                    return true;
                }
            }
            return false;
        }

        public Text getCurrentKey() {
            return this.currentKey;
        }

        public Text getCurrentValue() {
            return this.currentValue;
        }

        public float getProgress() throws IOException {
            return 0.0f;
        }

        public void close() throws IOException {
            LOG.info((Object)"Closing record reader");
            this.client.close();
            LOG.info((Object)"Client is closed");
            if (this.node != null) {
                this.node.close();
            }
            LOG.info((Object)"Record reader closed.");
        }
    }
}

