/*
 * Decompiled with CFR 0.152.
 */
package flipkart.cp.convert.chronosQ.impl.hbase;

import flipkart.cp.convert.chronosQ.core.DefaultSchedulerEntry;
import flipkart.cp.convert.chronosQ.core.SchedulerEntry;
import flipkart.cp.convert.chronosQ.core.SchedulerStore;
import flipkart.cp.convert.chronosQ.exceptions.ErrorCode;
import flipkart.cp.convert.chronosQ.exceptions.SchedulerException;
import flipkart.cp.convert.chronosQ.impl.hbase.HbaseUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HbaseSchedulerStore
implements SchedulerStore {
    static Logger log = LoggerFactory.getLogger((String)HbaseSchedulerStore.class.getSimpleName());
    private static final char FIRST_CHARACTER = '\u0000';
    private static final char LAST_CHARACTER = '\u00ff';
    private static final String START_STRING = Character.toString('\u0000');
    private static final String END_STRING = Character.toString('\u00ff');
    private static final String DELIMITER = "__";
    private final Connection hConnection;
    private final String tableName;
    private final byte[] columnFamily;
    private final byte[] column;
    private final String schedulerInstance;
    private Map<byte[], byte[]> dummyData = new HashMap<byte[], byte[]>();

    public HbaseSchedulerStore(Connection hConnection, String tableName, String columnFamily, String schedulerInstance) {
        this.hConnection = hConnection;
        this.tableName = tableName;
        this.schedulerInstance = schedulerInstance;
        this.columnFamily = Bytes.toBytes((String)columnFamily);
        this.column = Bytes.toBytes((String)"d");
        this.dummyData.put(this.column, Bytes.toBytes((boolean)true));
    }

    public void add(SchedulerEntry schedulerEntry, long time, int partitionNo) throws SchedulerException {
        String rowKey = this.getRowKey(schedulerEntry.getKey(), time, partitionNo);
        Table hTable = null;
        try {
            hTable = this.getHTable();
            if (!schedulerEntry.getPayload().equals(schedulerEntry.getKey())) {
                hTable.put(HbaseUtils.createPut(rowKey, this.columnFamily, Collections.singletonMap(this.column, schedulerEntry.getPayload().getBytes())));
            } else {
                hTable.put(HbaseUtils.createPut(rowKey, this.columnFamily, this.dummyData));
            }
        }
        catch (IOException e) {
            log.error("Exception occurred  for adding  -" + schedulerEntry + "Key" + time + "Partition " + partitionNo + "-" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
        finally {
            this.releaseHTableInterface(hTable);
        }
    }

    public Long update(SchedulerEntry schedulerEntry, long oldTime, long newTime, int partitionNo) throws SchedulerException {
        this.add(schedulerEntry, newTime, partitionNo);
        return this.remove(schedulerEntry.getKey(), oldTime, partitionNo);
    }

    public Long remove(String value, long time, int partitionNo) throws SchedulerException {
        String rowKey = this.getRowKey(value, time, partitionNo);
        Table hTable = null;
        try {
            hTable = this.getHTable();
            hTable.delete(new Delete(Bytes.toBytes((String)rowKey)));
            Long l = 1L;
            return l;
        }
        catch (IOException e) {
            log.error("Exception occurred while  for removing -" + value + "Key" + time + "Partition " + partitionNo + "-" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
        finally {
            this.releaseHTableInterface(hTable);
        }
    }

    public List<SchedulerEntry> get(long time, int partitionNum) throws SchedulerException {
        ArrayList<SchedulerEntry> entries = new ArrayList<SchedulerEntry>();
        String startRow = this.getRowKey(START_STRING, time, partitionNum);
        String stopRow = this.getRowKey(END_STRING, time, partitionNum);
        Scan scan = HbaseUtils.getScanner(startRow, stopRow, this.columnFamily);
        ResultScanner resultScanner = null;
        Table hTable = null;
        try {
            hTable = this.getHTable();
            resultScanner = hTable.getScanner(scan);
            Result result = resultScanner.next();
            while (null != result) {
                SchedulerEntry value = this.getValue(result);
                if (null != value) {
                    entries.add(value);
                }
                result = resultScanner.next();
            }
        }
        catch (IOException e) {
            log.error("Exception occurred While  for reading N Key" + time + "Partition " + partitionNum + "-" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
        finally {
            if (null != resultScanner) {
                resultScanner.close();
            }
            this.releaseHTableInterface(hTable);
        }
        return entries;
    }

    public List<SchedulerEntry> getNextN(long time, int partitionNum, int n) throws SchedulerException {
        ArrayList<SchedulerEntry> entries = new ArrayList<SchedulerEntry>();
        String startRow = this.getRowKey(START_STRING, time, partitionNum);
        String stopRow = this.getRowKey(END_STRING, time, partitionNum);
        Scan scan = HbaseUtils.getScanner(startRow, stopRow, this.columnFamily);
        Table hTable = null;
        ResultScanner resultScanner = null;
        try {
            Result[] results;
            hTable = this.getHTable();
            resultScanner = hTable.getScanner(scan);
            for (Result result : results = resultScanner.next(n)) {
                SchedulerEntry value = this.getValue(result);
                if (null == value) continue;
                entries.add(value);
            }
        }
        catch (IOException e) {
            log.error("Exception occurred While  for reading N Key" + time + "Partition " + partitionNum + "-" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
        finally {
            if (null != resultScanner) {
                resultScanner.close();
            }
            this.releaseHTableInterface(hTable);
        }
        return entries;
    }

    public void removeBulk(long time, int partitionNum, List<String> values) throws SchedulerException {
        ArrayList<Delete> deletes = new ArrayList<Delete>();
        for (String value : values) {
            deletes.add(new Delete(Bytes.toBytes((String)this.getRowKey(value, time, partitionNum))));
        }
        Table hTable = null;
        try {
            hTable = this.getHTable();
            hTable.delete(deletes);
        }
        catch (IOException e) {
            log.error("Exception occurred While  for acking  Key" + time + "Partition " + partitionNum + "-" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
        finally {
            this.releaseHTableInterface(hTable);
        }
    }

    private Table getHTable() throws IOException {
        return this.hConnection.getTable(TableName.valueOf((String)this.tableName));
    }

    private void releaseHTableInterface(Table hTable) throws SchedulerException {
        try {
            if (null != hTable) {
                hTable.close();
            }
        }
        catch (IOException e) {
            log.error("Exception occurred While closing hTable interface - " + this.tableName + "-" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
    }

    private String getRowKey(String key, long time, int partitionNo) {
        return this.schedulerInstance + DELIMITER + partitionNo + DELIMITER + Long.toString(time) + DELIMITER + key;
    }

    private SchedulerEntry getValue(Result result) throws SchedulerException {
        String rowKey = new String(result.getRow());
        String[] token = rowKey.split(DELIMITER, 4);
        if (token.length == 4) {
            byte[] payload = result.getValue(this.columnFamily, this.column);
            if (Arrays.equals(Bytes.toBytes((boolean)true), payload)) {
                return new DefaultSchedulerEntry(token[3]);
            }
            return new DefaultSchedulerEntry(token[3], new String(payload));
        }
        log.error("INVALID ENTRY : Exception occurred while reading row -" + rowKey);
        Table hTable = null;
        try {
            hTable = this.getHTable();
            hTable.delete(new Delete(result.getRow()));
        }
        catch (IOException e) {
            log.error("Error in deleting invalid entry " + rowKey + " -" + e.fillInStackTrace());
            throw new SchedulerException((Throwable)e, ErrorCode.DATASTORE_READWRITE_ERROR);
        }
        finally {
            this.releaseHTableInterface(hTable);
        }
        return null;
    }
}

