/*
 * Decompiled with CFR 0.152.
 */
package store;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.config.DynamicPropertyFactory;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import store.KVStore;
import store.Metrics;

public class CassandraStore
implements KVStore<List<byte[]>> {
    private static final Logger log = LoggerFactory.getLogger(CassandraStore.class);
    final String keyspace;
    public static final String UID_COLUMN = "uid";
    public static final String TIME_COLUMN = "time";
    public static final String DATA_COLUMN = "data";
    private Session session;
    private Cluster cluster;
    private DynamicPropertyFactory propertyFactory = DynamicPropertyFactory.getInstance();
    static final Map<Long, String> SHARD_TABLE = Maps.newHashMap();
    private static final ThreadPoolExecutor executor;
    private int defaultTTL = 3600;
    private static final Timer cassandraWriteDurationTimer;
    private static final Timer cassandraReadDurationTimer;
    private static final Meter writeMeter;
    private static final Meter readMeter;
    private static final Meter readmeMeter;
    private static final Meter readmeErrorMeter;

    public CassandraStore(String keyspace) {
        this.keyspace = keyspace;
        log.info("cassandra keyspace:{}", (Object)this.keyspace);
    }

    public CassandraStore(String keyspace, int ttlInSeconds) {
        this(keyspace);
        this.defaultTTL = ttlInSeconds;
    }

    private static ThreadPoolExecutor newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
        return new ThreadPoolExecutor(nThreads, nThreads, 60000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadFactoryBuilder().setNameFormat("cassandra-cbk-thread-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @Override
    public void connect() {
        String[] cassandra_hosts = this.propertyFactory.getStringProperty("contactPoints", null).get().split(",");
        HashSet<InetSocketAddress> contactPoints = new HashSet<InetSocketAddress>();
        for (String host : cassandra_hosts) {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(host.split(":")[0], Integer.parseInt(host.split(":")[1]));
            contactPoints.add(inetSocketAddress);
        }
        this.cluster = Cluster.builder().withLoadBalancingPolicy((LoadBalancingPolicy)new TokenAwarePolicy((LoadBalancingPolicy)DCAwareRoundRobinPolicy.builder().build())).withPoolingOptions(this.getPoolingOptions()).addContactPointsWithPorts(contactPoints).withRetryPolicy((RetryPolicy)DefaultRetryPolicy.INSTANCE).build();
        Metadata metadata = this.cluster.getMetadata();
        System.out.printf("Connected to cluster: %s%n", metadata.getClusterName());
        for (Host host : metadata.getAllHosts()) {
            System.out.printf("Datacenter: %s; Host: %s; Rack: %s%n", host.getDatacenter(), host.getAddress(), host.getRack());
        }
        try {
            this.session = this.cluster.connect(this.keyspace);
        }
        catch (NoHostAvailableException e2) {
            e2.printStackTrace();
        }
        Metrics.getRegistry().register(MetricRegistry.name(CassandraStore.class, (String[])new String[]{"ckb.queue.size"}), (Metric)((Gauge)() -> executor.getQueue().size()));
    }

    @Override
    public void shutdown() {
        try {
            executor.shutdown();
            executor.awaitTermination(60L, TimeUnit.SECONDS);
            if (this.session != null) {
                this.session.close();
            }
            if (this.cluster != null) {
                this.cluster.close();
            }
        }
        catch (InterruptedException e2) {
            e2.printStackTrace();
        }
    }

    @Override
    public List<byte[]> get(long shard, long uid, long startDay, long endDay) {
        readMeter.mark();
        Select select = this.buildSelect(shard, uid, startDay, endDay);
        try {
            Timer.Context timer = cassandraReadDurationTimer.time();
            ResultSetFuture future = this.session.executeAsync((Statement)select);
            ResultSet resultSet = (ResultSet)future.get(15L, TimeUnit.SECONDS);
            timer.stop();
            return resultSet.all().stream().map(row -> row.getBytes(DATA_COLUMN).array()).collect(Collectors.toList());
        }
        catch (Exception e2) {
            e2.printStackTrace();
            return null;
        }
    }

    private Select buildSelect(long shard, Long uid, Long startDay, Long endDay) {
        Select select = QueryBuilder.select().column(DATA_COLUMN).from(SHARD_TABLE.get(shard));
        select.where(QueryBuilder.eq((String)UID_COLUMN, (Object)uid)).and(QueryBuilder.gte((String)TIME_COLUMN, (Object)startDay)).and(QueryBuilder.lte((String)TIME_COLUMN, (Object)endDay));
        return select;
    }

    @Override
    public void asyncGet(long shard, long uid, long startDay, long endDay, final Consumer<List<byte[]>> onMessage) {
        readMeter.mark();
        Select select = this.buildSelect(shard, uid, startDay, endDay);
        ResultSetFuture listenableFuture = this.session.executeAsync((Statement)select);
        Futures.addCallback((ListenableFuture)listenableFuture, (FutureCallback)new FutureCallback<ResultSet>(){

            public void onSuccess(ResultSet rows) {
                readmeMeter.mark();
                List days = rows.all().stream().map(row -> row.getBytes(CassandraStore.DATA_COLUMN).array()).collect(Collectors.toList());
                onMessage.accept(days);
            }

            public void onFailure(Throwable throwable) {
                readmeErrorMeter.mark();
                onMessage.accept(null);
                boolean flag = CassandraStore.this.propertyFactory.getBooleanProperty("printError", false).get();
                if (flag) {
                    throwable.printStackTrace();
                }
            }
        }, (Executor)executor);
    }

    @Override
    public void asyncSave(long shard, long uid, long day, byte[] data) {
        writeMeter.mark();
        Insert insert = this.buildInsert(shard, uid, day, data);
        this.session.executeAsync((Statement)insert);
    }

    private Insert buildInsert(long shard, long uid, long day, byte[] data) {
        Insert insert = QueryBuilder.insertInto((String)SHARD_TABLE.get(shard)).value(UID_COLUMN, (Object)uid).value(TIME_COLUMN, (Object)day).value(DATA_COLUMN, (Object)ByteBuffer.wrap(data));
        insert.using(QueryBuilder.ttl((int)this.defaultTTL));
        return insert;
    }

    @Override
    public boolean save(long shard, long uid, long day, byte[] data) {
        writeMeter.mark();
        Insert insert = this.buildInsert(shard, uid, day, data);
        try {
            Timer.Context timer = cassandraWriteDurationTimer.time();
            this.session.execute((Statement)insert);
            timer.stop();
            return true;
        }
        catch (Exception e2) {
            e2.printStackTrace();
            return false;
        }
    }

    private PoolingOptions getPoolingOptions() {
        int connectionsPerHost = this.propertyFactory.getIntProperty("connectionsPerHost", 10).get();
        PoolingOptions poolingOptions = new PoolingOptions();
        poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost).setMaxConnectionsPerHost(HostDistance.LOCAL, connectionsPerHost).setCoreConnectionsPerHost(HostDistance.REMOTE, 2).setMaxConnectionsPerHost(HostDistance.REMOTE, 4).setHeartbeatIntervalSeconds(60);
        return poolingOptions;
    }

    static {
        LongStream.range(0L, 60L).forEach(shd -> SHARD_TABLE.put(shd, "S" + shd));
        executor = CassandraStore.newFixedThreadPoolWithQueueSize(Runtime.getRuntime().availableProcessors(), 200000);
        cassandraWriteDurationTimer = Metrics.timer(CassandraStore.class, "time.write");
        cassandraReadDurationTimer = Metrics.timer(CassandraStore.class, "time.read");
        writeMeter = Metrics.meter(CassandraStore.class, "meter.write");
        readMeter = Metrics.meter(CassandraStore.class, "meter.read");
        readmeMeter = Metrics.meter(CassandraStore.class, "meter.readme.success");
        readmeErrorMeter = Metrics.meter(CassandraStore.class, "meter.readme.error");
    }
}

