/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.priority.kafka.client.producer;

import com.flipkart.priority.kafka.client.ClientUtils;
import com.flipkart.priority.kafka.client.producer.AbstractPriorityKafkaProducer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.Serializer;

/**
 * Priority-aware producer that delegates to a single {@link KafkaProducer}.
 *
 * <p>Each record sent with a priority is re-addressed to the topic name returned by
 * {@link ClientUtils#getPriorityTopic(String, int)} for the record's logical topic and the
 * requested priority. Valid priorities are {@code 0 .. getMaxPriority() - 1}.
 *
 * <p>The number of priority levels is read from the mandatory {@code max.priority} config
 * entry when the producer is built from configs.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PriorityKafkaProducer<K, V>
extends AbstractPriorityKafkaProducer<K, V> {
    /** Name of the mandatory config entry holding the number of priority levels. */
    private static final String MAX_PRIORITY_CONFIG = "max.priority";

    private final int maxPriority;
    private final KafkaProducer<K, V> producer;

    /**
     * Creates a producer from a config map, with serializers taken from the configs.
     *
     * @param configs producer configs; must contain {@code max.priority}
     * @throws IllegalArgumentException if {@code max.priority} is missing, not an integer,
     *         or not positive
     */
    public PriorityKafkaProducer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    /**
     * Creates a producer from a config map and explicit serializers.
     *
     * <p>Config values are handed to the underlying producer unchanged (they are not
     * stringified), so {@code Class}, {@code List} and numeric values keep their type.
     * Entries whose value is {@code null} are skipped, because {@link Properties} cannot
     * store {@code null} values.
     *
     * @param configs producer configs; must contain {@code max.priority}
     * @param keyDeserializer the key <em>serializer</em> (the parameter name is historical);
     *        may be {@code null}
     * @param valueDeserializer the value <em>serializer</em> (the parameter name is
     *        historical); may be {@code null}
     * @throws IllegalArgumentException if {@code max.priority} is missing, not an integer,
     *         or not positive
     */
    public PriorityKafkaProducer(Map<String, Object> configs, Serializer<K> keyDeserializer, Serializer<V> valueDeserializer) {
        this(PriorityKafkaProducer.toProperties(configs), keyDeserializer, valueDeserializer);
    }

    /**
     * Creates a producer from properties, with serializers taken from the properties.
     *
     * @param properties producer properties; must contain {@code max.priority}
     * @throws IllegalArgumentException if {@code max.priority} is missing, not an integer,
     *         or not positive
     */
    public PriorityKafkaProducer(Properties properties) {
        this(properties, (Serializer<K>)null, (Serializer<V>)null);
    }

    /**
     * Creates a producer from properties and explicit serializers.
     *
     * @param properties producer properties; must contain {@code max.priority}
     * @param keyDeserializer the key <em>serializer</em> (the parameter name is historical);
     *        may be {@code null}
     * @param valueDeserializer the value <em>serializer</em> (the parameter name is
     *        historical); may be {@code null}
     * @throws IllegalArgumentException if {@code max.priority} is missing, not an integer,
     *         or not positive
     */
    public PriorityKafkaProducer(Properties properties, Serializer<K> keyDeserializer, Serializer<V> valueDeserializer) {
        // Arguments are evaluated left to right, so max.priority is validated before the
        // underlying KafkaProducer (and its resources) is created.
        this(PriorityKafkaProducer.parseMaxPriority(properties),
                new KafkaProducer<K, V>(properties, keyDeserializer, valueDeserializer));
    }

    /**
     * Wraps an already constructed producer. Package-private; no validation is applied to
     * {@code maxPriority} here.
     *
     * @param maxPriority number of priority levels
     * @param producer the producer to delegate to
     */
    PriorityKafkaProducer(int maxPriority, KafkaProducer<K, V> producer) {
        this.maxPriority = maxPriority;
        this.producer = producer;
    }

    /**
     * Copies a config map into a {@link Properties} instance without converting the values.
     * Entries with a {@code null} value are omitted.
     */
    private static Properties toProperties(Map<String, Object> configs) {
        Properties properties = new Properties();
        for (Map.Entry<String, Object> entry : configs.entrySet()) {
            Object value = entry.getValue();
            if (value != null) {
                // put(), not setProperty(): keeps Class/List/Number values intact.
                properties.put(entry.getKey(), value);
            }
        }
        return properties;
    }

    /**
     * Reads and validates the {@code max.priority} entry.
     *
     * @throws IllegalArgumentException if the entry is missing, is not an integer (a
     *         {@link NumberFormatException}), or is not positive
     */
    private static int parseMaxPriority(Properties properties) {
        // get() rather than getProperty(): the latter returns null for non-String values
        // such as an Integer stored with put().
        Object raw = properties.get(MAX_PRIORITY_CONFIG);
        if (raw == null) {
            throw new IllegalArgumentException("Missing config " + MAX_PRIORITY_CONFIG);
        }
        int parsed = Integer.parseInt(raw.toString());
        if (parsed <= 0) {
            throw new IllegalArgumentException("Config " + MAX_PRIORITY_CONFIG + " must be a positive integer, got " + parsed);
        }
        return parsed;
    }

    /** Returns the number of priority levels this producer was created with. */
    @Override
    public int getMaxPriority() {
        return this.maxPriority;
    }

    /**
     * Builds the record that is actually sent: same partition, timestamp, key and value as
     * {@code record}, but addressed to the priority-specific topic. Only those five fields
     * are copied.
     *
     * @throws IllegalArgumentException if {@code priority} is outside
     *         {@code [0, maxPriority - 1]}
     */
    private ProducerRecord<K, V> getPriorityRecord(int priority, ProducerRecord<K, V> record) {
        if (priority < 0 || priority >= this.maxPriority) {
            throw new IllegalArgumentException("Priority param must be in the range [0, " + (this.maxPriority - 1) + "]");
        }
        String priorityTopic = ClientUtils.getPriorityTopic(record.topic(), priority);
        return new ProducerRecord<K, V>(priorityTopic, record.partition(), record.timestamp(), record.key(), record.value());
    }

    /**
     * Sends {@code record} to the topic for the given priority.
     *
     * @param priority priority level in {@code [0, getMaxPriority() - 1]}
     * @param record record addressed to the logical (non-priority) topic
     * @return the future returned by the underlying producer
     * @throws IllegalArgumentException if {@code priority} is out of range
     */
    @Override
    public Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record) {
        return this.producer.send(this.getPriorityRecord(priority, record));
    }

    /**
     * Sends {@code record} to the topic for the given priority, passing {@code callback}
     * through to the underlying producer.
     *
     * @param priority priority level in {@code [0, getMaxPriority() - 1]}
     * @param record record addressed to the logical (non-priority) topic
     * @param callback callback handed to the underlying producer
     * @return the future returned by the underlying producer
     * @throws IllegalArgumentException if {@code priority} is out of range
     */
    @Override
    public Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record, Callback callback) {
        return this.producer.send(this.getPriorityRecord(priority, record), callback);
    }

    /** Flushes the underlying producer. */
    @Override
    public void flush() {
        this.producer.flush();
    }

    /**
     * Returns the partitions of every priority topic derived from {@code topic},
     * concatenated in ascending priority order.
     *
     * @param topic the logical (non-priority) topic name
     * @return a new list with the partition info of all priority topics
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
        for (int priority = 0; priority < this.maxPriority; ++priority) {
            partitions.addAll(this.producer.partitionsFor(ClientUtils.getPriorityTopic(topic, priority)));
        }
        return partitions;
    }

    /** Returns the metrics of the underlying producer. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.producer.metrics();
    }

    /** Closes the underlying producer. */
    @Override
    public void close() {
        this.producer.close();
    }

    /**
     * Closes the underlying producer, forwarding the given timeout.
     *
     * @param timeout timeout value passed to the underlying producer
     * @param unit unit of {@code timeout}
     */
    @Override
    public void close(long timeout, TimeUnit unit) {
        this.producer.close(timeout, unit);
    }
}

