/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Flink sink functions that write records into Kafka through a
 * {@link KafkaProducer} configured with byte-array keys and values.
 *
 * <p>Each incoming element is serialized with the configured
 * {@link KeyedSerializationSchema}, optionally assigned to a partition by a
 * {@link FlinkKafkaPartitioner}, and handed to the producer together with a completion
 * callback. Send failures reported through that callback are either only logged (see
 * {@link #setLogFailuresOnly(boolean)}) or remembered and rethrown by
 * {@link #checkErroneous()} on the next {@code invoke}, {@code snapshotState} or
 * {@code close} call.
 *
 * <p>When flushing on checkpoint is enabled, the number of in-flight records is tracked in
 * {@link #pendingRecords} (guarded by {@link #pendingRecordsLock}) and
 * {@link #snapshotState(FunctionSnapshotContext)} requires it to be zero after
 * {@link #flush()} returns.
 *
 * @param <IN> type of the elements written to Kafka
 */
@Internal
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
        implements CheckpointedFunction {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);

    private static final long serialVersionUID = 1L;

    /**
     * Producer-config key; when its value parses as {@code true}, {@link #open(Configuration)}
     * does not register the producer's metrics with Flink.
     */
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

    /** Properties handed to the Kafka producer. This is the caller's instance, not a copy. */
    protected final Properties producerConfig;

    /** Topic used when the serialization schema returns no target topic for an element. */
    protected final String defaultTopicId;

    /** Turns elements into key bytes, value bytes and (optionally) a target topic. */
    protected final KeyedSerializationSchema<IN> schema;

    /** User-supplied partitioner; {@code null} leaves partition selection to the record. */
    protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;

    /** Cache of the sorted partition ids per topic, filled lazily in {@code invoke}. */
    protected final Map<String, int[]> topicPartitionsMap;

    /** If {@code true}, send failures are logged instead of being stored for rethrowing. */
    protected boolean logFailuresOnly;

    /** If {@code true}, pending records are counted and checkpoints flush the producer. */
    protected boolean flushOnCheckpoint = true;

    /** Kafka producer, created in {@link #open(Configuration)}. */
    protected transient KafkaProducer<byte[], byte[]> producer;

    /** Completion callback passed to every {@code send}; created in {@link #open(Configuration)}. */
    protected transient Callback callback;

    /** First send failure reported by the callback that has not been rethrown yet. */
    protected transient volatile Exception asyncException;

    /** Monitor guarding {@link #pendingRecords}; notified when the count drops to zero. */
    protected final SerializableObject pendingRecordsLock = new SerializableObject();

    /** Number of records sent but not yet acknowledged (only maintained when flushing on checkpoint). */
    protected long pendingRecords;

    /**
     * Creates the sink and completes the producer configuration.
     *
     * <p>Note that {@code producerConfig} is stored by reference and modified in place:
     * missing {@code key.serializer} / {@code value.serializer} entries are set to
     * {@link ByteArraySerializer}.
     *
     * @param defaultTopicId topic to write to when the schema provides none; must not be null
     * @param serializationSchema schema used to serialize elements; must not be null
     * @param producerConfig Kafka producer properties; must not be null and must contain
     *     {@code bootstrap.servers}
     * @param customPartitioner partitioner to use, or {@code null} for none
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if {@code bootstrap.servers} is not configured
     */
    public FlinkKafkaProducerBase(
            String defaultTopicId,
            KeyedSerializationSchema<IN> serializationSchema,
            Properties producerConfig,
            FlinkKafkaPartitioner<IN> customPartitioner) {
        Objects.requireNonNull(defaultTopicId, "TopicID not set");
        Objects.requireNonNull(serializationSchema, "serializationSchema not set");
        Objects.requireNonNull(producerConfig, "producerConfig not set");
        ClosureCleaner.clean(customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
        ClosureCleaner.ensureSerializable(serializationSchema);

        this.defaultTopicId = defaultTopicId;
        this.schema = serializationSchema;
        this.producerConfig = producerConfig;
        this.flinkKafkaPartitioner = customPartitioner;

        // Default both serializers to byte arrays; a user-provided value is kept but warned about.
        if (!producerConfig.containsKey("key.serializer")) {
            this.producerConfig.put("key.serializer", ByteArraySerializer.class.getName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "key.serializer");
        }
        if (!producerConfig.containsKey("value.serializer")) {
            this.producerConfig.put("value.serializer", ByteArraySerializer.class.getName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "value.serializer");
        }

        if (!this.producerConfig.containsKey("bootstrap.servers")) {
            throw new IllegalArgumentException("bootstrap.servers must be supplied in the producer config properties.");
        }

        this.topicPartitionsMap = new HashMap<>();
    }

    /**
     * Selects how asynchronous send failures are handled. The flag is read when the callback
     * is created in {@link #open(Configuration)}.
     *
     * @param logFailuresOnly {@code true} to only log failures, {@code false} to store the
     *     first failure so that {@link #checkErroneous()} rethrows it
     */
    public void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    /**
     * Enables or disables flushing of the producer on checkpoints.
     *
     * @param flush {@code true} to track pending records and flush in
     *     {@link #snapshotState(FunctionSnapshotContext)}
     */
    public void setFlushOnCheckpoint(boolean flush) {
        this.flushOnCheckpoint = flush;
    }

    /**
     * Factory hook for the Kafka producer, overridable in tests.
     *
     * @param props producer configuration
     * @param <K> producer key type
     * @param <V> producer value type
     * @return a new producer built from {@code props}
     */
    @VisibleForTesting
    protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
        return new KafkaProducer<>(props);
    }

    /**
     * Creates the producer, opens the partitioner, registers producer metrics (unless
     * disabled) and installs the send callback.
     *
     * @param configuration not used by this method
     */
    @Override
    public void open(Configuration configuration) {
        this.producer = getKafkaProducer(this.producerConfig);

        RuntimeContext ctx = getRuntimeContext();

        if (this.flinkKafkaPartitioner != null) {
            // The delegate partitioner needs the default topic's partitions before it is opened.
            if (this.flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
                ((FlinkKafkaDelegatePartitioner) this.flinkKafkaPartitioner)
                        .setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
            }
            this.flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }

        LOG.info(
                "Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
                ctx.getIndexOfThisSubtask() + 1,
                ctx.getNumberOfParallelSubtasks(),
                this.defaultTopicId);

        // Expose the producer's metrics as Flink gauges unless explicitly disabled.
        if (!Boolean.parseBoolean(this.producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
            Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
            if (metrics == null) {
                LOG.info("Producer implementation does not support metrics");
            } else {
                MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
                for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
                    kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
                }
            }
        }

        // Flushing on checkpoint is pointless when checkpointing itself is off.
        if (this.flushOnCheckpoint
                && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }

        if (this.logFailuresOnly) {
            // Failures are logged and otherwise ignored.
            this.callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                    }
                    acknowledgeMessage();
                }
            };
        } else {
            // Only the first failure is kept; it is rethrown by checkErroneous().
            this.callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null && asyncException == null) {
                        asyncException = exception;
                    }
                    acknowledgeMessage();
                }
            };
        }
    }

    /**
     * Serializes the element and sends it asynchronously to its target topic.
     *
     * @param next element to write
     * @param context sink context; not used by this method
     * @throws Exception if an earlier asynchronous send failed (see {@link #checkErroneous()})
     */
    @Override
    public void invoke(IN next, SinkFunction.Context context) throws Exception {
        // Surface failures of earlier sends before producing more data.
        checkErroneous();

        byte[] serializedKey = this.schema.serializeKey(next);
        byte[] serializedValue = this.schema.serializeValue(next);
        String targetTopic = this.schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = this.defaultTopicId;
        }

        // Partition ids are looked up once per topic and cached afterwards.
        int[] partitions = this.topicPartitionsMap.get(targetTopic);
        if (partitions == null) {
            partitions = getPartitionsByTopic(targetTopic, this.producer);
            this.topicPartitionsMap.put(targetTopic, partitions);
        }

        ProducerRecord<byte[], byte[]> record;
        if (this.flinkKafkaPartitioner == null) {
            record = new ProducerRecord<byte[], byte[]>(targetTopic, serializedKey, serializedValue);
        } else {
            record = new ProducerRecord<byte[], byte[]>(
                    targetTopic,
                    this.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
                    serializedKey,
                    serializedValue);
        }

        // Count the record before sending so the callback can never decrement first.
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                ++this.pendingRecords;
            }
        }
        this.producer.send(record, this.callback);
    }

    /**
     * Closes the producer (if one was created) and then rethrows any stored send failure.
     *
     * @throws Exception if closing fails or an asynchronous send failure is pending
     */
    @Override
    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        // Make sure a failure reported during shutdown is not silently dropped.
        checkErroneous();
    }

    /**
     * Records the completion of one send: decrements the pending counter and wakes up all
     * threads waiting on {@link #pendingRecordsLock} once it reaches zero. Does nothing when
     * flushing on checkpoint is disabled.
     */
    private void acknowledgeMessage() {
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                --this.pendingRecords;
                if (this.pendingRecords == 0L) {
                    this.pendingRecordsLock.notifyAll();
                }
            }
        }
    }

    /**
     * Flushes the producer. Called from {@link #snapshotState(FunctionSnapshotContext)}, which
     * fails the snapshot unless {@link #pendingRecords} is zero when this method returns.
     */
    protected abstract void flush();

    /**
     * No-op: this class restores no state.
     *
     * @param context initialization context; not used
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Fails the snapshot if a send failure is pending and, when flushing on checkpoint is
     * enabled, flushes the producer and verifies that no records remain in flight.
     *
     * @param ctx snapshot context; not used by this method
     * @throws IllegalStateException if records are still pending after {@link #flush()}
     * @throws Exception if an asynchronous send failed (see {@link #checkErroneous()})
     */
    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkErroneous();

        if (this.flushOnCheckpoint) {
            flush();
            synchronized (this.pendingRecordsLock) {
                if (this.pendingRecords != 0L) {
                    throw new IllegalStateException("Pending record count must be zero at this point: " + this.pendingRecords);
                }
                // Sends completed by the flush may have failed; report those as well.
                checkErroneous();
            }
        }
    }

    /**
     * Rethrows the stored asynchronous send failure, if any, wrapped in a new exception, and
     * clears it so that it is reported only once.
     *
     * @throws Exception wrapping the stored failure as its cause
     */
    protected void checkErroneous() throws Exception {
        Exception e = this.asyncException;
        if (e != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    /**
     * Builds producer properties containing only {@code bootstrap.servers}.
     *
     * <p>Every comma-separated element of {@code brokerList} is first passed to
     * {@link NetUtils#getCorrectHostnamePort(String)}; the result is discarded, so the call
     * presumably serves as validation that rejects malformed addresses.
     *
     * @param brokerList comma-separated list of broker addresses
     * @return new properties with {@code bootstrap.servers} set to {@code brokerList}
     */
    public static Properties getPropertiesFromBrokerList(String brokerList) {
        for (String broker : brokerList.split(",")) {
            NetUtils.getCorrectHostnamePort(broker);
        }

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }

    /**
     * Asks the producer for the partitions of a topic and returns their ids in ascending order.
     *
     * @param topic topic to look up
     * @param producer producer used for the metadata request
     * @return the topic's partition ids, sorted ascending
     */
    protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
        // Copy first: the list returned by the producer is sorted in place below.
        ArrayList<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));

        Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
            @Override
            public int compare(PartitionInfo o1, PartitionInfo o2) {
                return Integer.compare(o1.partition(), o2.partition());
            }
        });

        int[] partitions = new int[partitionsList.size()];
        for (int i = 0; i < partitions.length; ++i) {
            partitions[i] = partitionsList.get(i).partition();
        }
        return partitions;
    }

    /**
     * Returns the current number of pending records, read under {@link #pendingRecordsLock}.
     *
     * @return number of records sent but not yet acknowledged
     */
    @VisibleForTesting
    protected long numPendingRecords() {
        synchronized (this.pendingRecordsLock) {
            return this.pendingRecords;
        }
    }
}

