/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.priority.kafka.client.consumer.burst;

import com.flipkart.priority.kafka.client.ClientUtils;
import com.flipkart.priority.kafka.client.consumer.AbstractPriorityKafkaConsumer;
import com.flipkart.priority.kafka.client.consumer.burst.ExpMaxPollRecordsDistributor;
import com.flipkart.priority.kafka.client.consumer.burst.MaxPollRecordsDistributor;
import com.flipkart.priority.kafka.client.consumer.burst.Window;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CapacityBurstPriorityKafkaConsumer<K, V>
extends AbstractPriorityKafkaConsumer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(CapacityBurstPriorityKafkaConsumer.class);
    private static final String FETCHER_FIELD = "fetcher";
    private static final String MAX_POLL_RECORDS_FIELD = "maxPollRecords";
    private int maxPriority;
    private Map<Integer, Integer> maxPollRecordDistribution;
    private Map<Integer, KafkaConsumer<K, V>> consumers;
    private Map<Integer, Window> consumerPollWindowHistory;

    public CapacityBurstPriorityKafkaConsumer(Map<String, Object> configs) {
        this((MaxPollRecordsDistributor)ExpMaxPollRecordsDistributor.instance(), configs, null, null);
    }

    public CapacityBurstPriorityKafkaConsumer(MaxPollRecordsDistributor maxPollRecordsDistributor, Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        Properties properties = new Properties();
        for (Map.Entry<String, Object> entry : configs.entrySet()) {
            properties.setProperty(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        this.initialize(maxPollRecordsDistributor, properties, keyDeserializer, valueDeserializer);
    }

    public CapacityBurstPriorityKafkaConsumer(Properties properties) {
        this((MaxPollRecordsDistributor)ExpMaxPollRecordsDistributor.instance(), properties, (Deserializer<K>)null, (Deserializer<V>)null);
    }

    public CapacityBurstPriorityKafkaConsumer(MaxPollRecordsDistributor maxPollRecordsDistributor, Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this.initialize(maxPollRecordsDistributor, properties, keyDeserializer, valueDeserializer);
    }

    CapacityBurstPriorityKafkaConsumer(int maxPriority, int maxPollRecords, Map<Integer, KafkaConsumer<K, V>> consumers, Map<Integer, Window> consumerPollWindowHistory) {
        this.maxPriority = maxPriority;
        this.maxPollRecordDistribution = ExpMaxPollRecordsDistributor.instance().distribution(maxPriority, maxPollRecords);
        this.consumerPollWindowHistory = consumerPollWindowHistory;
        this.consumers = consumers;
    }

    private void initialize(MaxPollRecordsDistributor maxPollRecordsDistributor, Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        if (!properties.containsKey("max.priority")) {
            throw new IllegalArgumentException("Missing config max.priority");
        }
        this.maxPriority = Integer.parseInt(properties.getProperty("max.priority"));
        if (!properties.containsKey("max.poll.records")) {
            throw new IllegalArgumentException("Missing config max.poll.records");
        }
        int maxPollRecords = Integer.parseInt(properties.getProperty("max.poll.records"));
        if (!properties.containsKey("group.id")) {
            throw new IllegalArgumentException("Missing config group.id");
        }
        String groupId = properties.getProperty("group.id");
        int maxPollHistoryWindowSize = 6;
        if (properties.containsKey("max.poll.history.window.size")) {
            maxPollHistoryWindowSize = Integer.parseInt(properties.getProperty("max.poll.history.window.size"));
        }
        int minPollWindowMaxoutSize = 4;
        if (properties.containsKey("min.poll.window.maxout.threshold")) {
            minPollWindowMaxoutSize = Integer.parseInt(properties.getProperty("min.poll.window.maxout.threshold"));
        }
        this.maxPollRecordDistribution = maxPollRecordsDistributor.distribution(this.maxPriority, maxPollRecords);
        log.info("Using {} distribution {}", (Object)"max.poll.records", this.maxPollRecordDistribution);
        this.consumers = new HashMap<Integer, KafkaConsumer<K, V>>(this.maxPriority);
        this.consumerPollWindowHistory = new HashMap<Integer, Window>(this.maxPriority);
        for (int i = 0; i < this.maxPriority; ++i) {
            int consumerMaxPollRecords = this.maxPollRecordDistribution.get(i);
            String consumerGroupId = ClientUtils.getPriorityConsumerGroup(groupId, i);
            properties.put("max.poll.records", (Object)consumerMaxPollRecords);
            properties.put("group.id", consumerGroupId);
            this.consumers.put(i, new KafkaConsumer(properties, keyDeserializer, valueDeserializer));
            this.consumerPollWindowHistory.put(i, new Window(maxPollHistoryWindowSize, minPollWindowMaxoutSize, consumerMaxPollRecords));
        }
    }

    @Override
    public int getMaxPriority() {
        return this.maxPriority;
    }

    @Override
    public Set<TopicPartition> assignment() {
        HashSet<TopicPartition> assignments = new HashSet<TopicPartition>();
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            assignments.addAll(consumer.assignment());
        }
        return assignments;
    }

    @Override
    public Set<String> subscription() {
        HashSet<String> subscriptions = new HashSet<String>();
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            subscriptions.addAll(consumer.subscription());
        }
        return subscriptions;
    }

    @Override
    public void subscribe(Collection<String> topics) {
        this.subscribe(topics, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        for (int i = 0; i < this.maxPriority; ++i) {
            ArrayList<String> priorityTopics = new ArrayList<String>(topics.size());
            for (String topic : topics) {
                priorityTopics.add(ClientUtils.getPriorityTopic(topic, i));
            }
            this.consumers.get(i).subscribe(priorityTopics, callback);
        }
    }

    private Map<Integer, Collection<TopicPartition>> splitPriorityPartitions(Collection<TopicPartition> partitions) {
        HashMap<Integer, Collection<TopicPartition>> priorityPartitions = new HashMap<Integer, Collection<TopicPartition>>();
        for (TopicPartition partition : partitions) {
            int priority = ClientUtils.getPriority(partition.topic());
            if (!priorityPartitions.containsKey(priority)) {
                priorityPartitions.put(priority, new ArrayList());
            }
            ((Collection)priorityPartitions.get(priority)).add(partition);
        }
        return priorityPartitions;
    }

    @Override
    public void unsubscribe() {
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            consumer.unsubscribe();
        }
    }

    @Override
    public void commitSync() {
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            consumer.commitSync();
        }
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        HashMap priorityOffsets = new HashMap();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            TopicPartition partition = entry.getKey();
            int priority = ClientUtils.getPriority(partition.topic());
            if (!priorityOffsets.containsKey(priority)) {
                priorityOffsets.put(priority, new HashMap());
            }
            ((Map)priorityOffsets.get(priority)).put(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<Object, Object> entry : priorityOffsets.entrySet()) {
            this.consumers.get(entry.getKey()).commitSync((Map)entry.getValue());
        }
    }

    @Override
    public void commitAsync() {
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            consumer.commitAsync();
        }
    }

    @Override
    public void seek(TopicPartition partition, long offset) {
        int priority = ClientUtils.getPriority(partition.topic());
        this.consumers.get(priority).seek(partition, offset);
    }

    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        Map<Integer, Collection<TopicPartition>> priorityPartitions = this.splitPriorityPartitions(partitions);
        for (Map.Entry<Integer, Collection<TopicPartition>> entry : priorityPartitions.entrySet()) {
            this.consumers.get(entry.getKey()).seekToBeginning(entry.getValue());
        }
    }

    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        Map<Integer, Collection<TopicPartition>> priorityPartitions = this.splitPriorityPartitions(partitions);
        for (Map.Entry<Integer, Collection<TopicPartition>> entry : priorityPartitions.entrySet()) {
            this.consumers.get(entry.getKey()).seekToEnd(entry.getValue());
        }
    }

    @Override
    public long position(TopicPartition partition) {
        int priority = ClientUtils.getPriority(partition.topic());
        return this.consumers.get(priority).position(partition);
    }

    @Override
    public OffsetAndMetadata committed(TopicPartition partition) {
        int priority = ClientUtils.getPriority(partition.topic());
        return this.consumers.get(priority).committed(partition);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        HashMap metrics = new HashMap();
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            metrics.putAll(consumer.metrics());
        }
        return metrics;
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        ArrayList<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
        for (int i = 0; i < this.maxPriority; ++i) {
            partitions.addAll(this.consumers.get(i).partitionsFor(ClientUtils.getPriorityTopic(topic, i)));
        }
        return partitions;
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        HashMap<String, List<PartitionInfo>> topics = new HashMap<String, List<PartitionInfo>>();
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            topics.putAll(consumer.listTopics());
        }
        return topics;
    }

    @Override
    public Set<TopicPartition> paused() {
        HashSet<TopicPartition> paused = new HashSet<TopicPartition>();
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            paused.addAll(consumer.paused());
        }
        return paused;
    }

    @Override
    public void pause(Collection<TopicPartition> partitions) {
        Map<Integer, Collection<TopicPartition>> priorityPartitions = this.splitPriorityPartitions(partitions);
        for (Map.Entry<Integer, Collection<TopicPartition>> entry : priorityPartitions.entrySet()) {
            this.consumers.get(entry.getKey()).pause(entry.getValue());
        }
    }

    @Override
    public void resume(Collection<TopicPartition> partitions) {
        Map<Integer, Collection<TopicPartition>> priorityPartitions = this.splitPriorityPartitions(partitions);
        for (Map.Entry<Integer, Collection<TopicPartition>> entry : priorityPartitions.entrySet()) {
            this.consumers.get(entry.getKey()).resume(entry.getValue());
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        HashMap priorityTimestampsToSearch = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            int priority = ClientUtils.getPriority(entry.getKey().topic());
            if (!priorityTimestampsToSearch.containsKey(priority)) {
                priorityTimestampsToSearch.put(priority, new HashMap());
            }
            ((Map)priorityTimestampsToSearch.get(priority)).put(entry.getKey(), entry.getValue());
        }
        HashMap<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<TopicPartition, OffsetAndTimestamp>();
        for (Map.Entry entry : priorityTimestampsToSearch.entrySet()) {
            offsetsForTimes.putAll(this.consumers.get(entry.getKey()).offsetsForTimes((Map)entry.getValue()));
        }
        return offsetsForTimes;
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        Map<Integer, Collection<TopicPartition>> priorityPartitions = this.splitPriorityPartitions(partitions);
        HashMap<TopicPartition, Long> beginningOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<Integer, Collection<TopicPartition>> entry : priorityPartitions.entrySet()) {
            beginningOffsets.putAll(this.consumers.get(entry.getKey()).beginningOffsets(entry.getValue()));
        }
        return beginningOffsets;
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        Map<Integer, Collection<TopicPartition>> priorityPartitions = this.splitPriorityPartitions(partitions);
        HashMap<TopicPartition, Long> endOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<Integer, Collection<TopicPartition>> entry : priorityPartitions.entrySet()) {
            endOffsets.putAll(this.consumers.get(entry.getKey()).endOffsets(entry.getValue()));
        }
        return endOffsets;
    }

    @Override
    public void close() {
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            consumer.close();
        }
    }

    @Override
    public void wakeup() {
        for (KafkaConsumer<K, V> consumer : this.consumers.values()) {
            consumer.wakeup();
        }
    }

    private boolean isEligibleToBurst(int priority) {
        Window window = this.consumerPollWindowHistory.get(priority);
        return window.isMaxedOutThresholdBreach();
    }

    private int burstCapacity(int priority) {
        int burst = 0;
        for (int i = 0; i < this.maxPriority; ++i) {
            if (i == priority) continue;
            Window window = this.consumerPollWindowHistory.get(i);
            burst += window.maxUnusedValue();
        }
        return burst;
    }

    void updateMaxPollRecords(KafkaConsumer<K, V> consumer, int maxPollRecords) {
        try {
            Field fetcherField = KafkaConsumer.class.getDeclaredField(FETCHER_FIELD);
            fetcherField.setAccessible(true);
            Fetcher fetcher = (Fetcher)fetcherField.get(consumer);
            Field maxPollRecordsField = Fetcher.class.getDeclaredField(MAX_POLL_RECORDS_FIELD);
            maxPollRecordsField.setAccessible(true);
            maxPollRecordsField.set(fetcher, maxPollRecords);
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ConsumerRecords<K, V> poll(long pollTimeoutMs) {
        int priorityBurst = -1;
        try {
            int i;
            for (int i2 = this.maxPriority - 1; i2 >= 0; --i2) {
                if (!this.isEligibleToBurst(i2)) continue;
                int burstCapacity = this.burstCapacity(i2);
                if (burstCapacity <= 0) break;
                priorityBurst = i2;
                int finalCapacity = burstCapacity + this.maxPollRecordDistribution.get(i2);
                log.info("Burst in capacity for priority {} to {}", (Object)priorityBurst, (Object)finalCapacity);
                this.updateMaxPollRecords(this.consumers.get(priorityBurst), finalCapacity);
                break;
            }
            HashMap<TopicPartition, List> consumerRecords = new HashMap<TopicPartition, List>();
            HashMap<Integer, Integer> pollCounts = new HashMap<Integer, Integer>();
            for (int i3 = this.maxPriority - 1; i3 >= 0; --i3) {
                pollCounts.put(i3, 0);
            }
            long start = System.currentTimeMillis();
            do {
                for (i = this.maxPriority - 1; i >= 0; --i) {
                    ConsumerRecords records = this.consumers.get(i).poll(0L);
                    pollCounts.put(i, (Integer)pollCounts.get(i) + records.count());
                    for (TopicPartition partition : records.partitions()) {
                        consumerRecords.put(partition, records.records(partition));
                    }
                }
            } while (consumerRecords.isEmpty() && System.currentTimeMillis() < start + pollTimeoutMs);
            for (i = this.maxPriority - 1; i >= 0; --i) {
                this.consumerPollWindowHistory.get(i).add((Integer)pollCounts.get(i));
            }
            ConsumerRecords consumerRecords2 = new ConsumerRecords(consumerRecords);
            return consumerRecords2;
        }
        finally {
            if (priorityBurst >= 0) {
                this.updateMaxPollRecords(this.consumers.get(priorityBurst), this.maxPollRecordDistribution.get(priorityBurst));
            }
        }
    }
}

