/*
 * Decompiled with CFR 0.152.
 */
package flipkart.cp.convert.chronosQ.impl.kafka;

import flipkart.cp.convert.chronosQ.core.SchedulerEntry;
import flipkart.cp.convert.chronosQ.core.SchedulerSink;
import flipkart.cp.convert.chronosQ.exceptions.ErrorCode;
import flipkart.cp.convert.chronosQ.exceptions.SchedulerException;
import flipkart.cp.convert.chronosQ.impl.kafka.KafkaMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SchedulerSink} implementation that publishes scheduler entries to a Kafka topic.
 *
 * <p>Each entry is converted to a producer record by the configured {@link KafkaMessage} and
 * handed to a Kafka {@link Producer}. The outcome reported to the producer callback is exposed
 * to the caller as a {@link CompletableFuture}.
 *
 * <p>NOTE(review): the producer created in the constructor is never closed by this class;
 * confirm whether the owner is expected to manage its lifecycle elsewhere.
 */
public class KafkaSchedulerSink implements SchedulerSink {
    private static final Logger log = LoggerFactory.getLogger(KafkaSchedulerSink.class);
    private final String topic;
    private final KafkaMessage kafkaMessage;
    private final Producer<byte[], byte[]> producer;

    /**
     * Creates a sink that publishes to {@code topic} through a new {@link KafkaProducer}.
     *
     * @param properties   configuration passed unchanged to the {@link KafkaProducer} constructor
     * @param topic        topic name given to {@code kafkaMessage} when building records
     * @param kafkaMessage converter from {@link SchedulerEntry} to a producer record
     */
    public KafkaSchedulerSink(Properties properties, String topic, KafkaMessage kafkaMessage) {
        this.topic = topic;
        this.producer = new KafkaProducer<>(properties);
        this.kafkaMessage = kafkaMessage;
    }

    /**
     * Publishes a single entry.
     *
     * @param schedulerEntry entry to publish; may be {@code null}
     * @return a future completed with the record metadata, or completed exceptionally with the
     *         exception reported to the producer callback; for a {@code null} entry the future
     *         is already completed exceptionally and nothing is sent
     * @throws SchedulerException with {@link ErrorCode#SCHEDULER_SINK_ERROR} if building the
     *         record or handing it to the producer throws
     */
    public CompletableFuture<RecordMetadata> giveExpiredForProcessing(SchedulerEntry schedulerEntry) throws SchedulerException {
        if (schedulerEntry == null) {
            CompletableFuture<RecordMetadata> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new IllegalArgumentException("Value is null"));
            return rejected;
        }
        try {
            log.debug("Pushing to kafka message: {}", schedulerEntry);
            return sendAsync(this.kafkaMessage.getKeyedMessage(this.topic, schedulerEntry));
        } catch (Exception e) {
            // Pass the exception as the trailing argument so the logger prints its original
            // stack trace; calling fillInStackTrace() here would overwrite that trace.
            log.error("Exception occurred for value {}", schedulerEntry, e);
            throw new SchedulerException((Throwable) e, ErrorCode.SCHEDULER_SINK_ERROR);
        }
    }

    /**
     * Publishes every non-null entry of the list.
     *
     * <p>All records are built before the first one is sent, so a conversion failure prevents
     * any send from being attempted.
     *
     * @param schedulerEntries entries to publish; {@code null} elements are skipped
     * @return a future holding the metadata of all sent records, in list order; it completes
     *         exceptionally if any individual send fails
     * @throws SchedulerException with {@link ErrorCode#SCHEDULER_SINK_ERROR} if building the
     *         records or handing them to the producer throws (including a {@code null} list)
     */
    public Future<List<RecordMetadata>> giveExpiredListForProcessing(List<SchedulerEntry> schedulerEntries) throws SchedulerException {
        try {
            // Phase 1: convert everything first, so a bad entry fails the batch before any send.
            List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
            for (SchedulerEntry entry : schedulerEntries) {
                if (entry == null) {
                    continue;
                }
                records.add(this.kafkaMessage.getKeyedMessage(this.topic, entry));
            }

            // Phase 2: hand each record to the producer and collect the per-record futures.
            List<CompletableFuture<RecordMetadata>> pending = new ArrayList<>(records.size());
            for (ProducerRecord<byte[], byte[]> record : records) {
                pending.add(sendAsync(record));
            }
            return KafkaSchedulerSink.sequence(pending);
        } catch (Throwable t) {
            // Throwable (not just Exception) is kept so callers still see every failure as a
            // SchedulerException, as before.
            log.error("Exception in giveExpiredListForProcessing", t);
            throw new SchedulerException(t, ErrorCode.SCHEDULER_SINK_ERROR);
        }
    }

    /**
     * Hands one record to the producer and bridges the producer callback to a future.
     *
     * @param record record to send
     * @return a future completed from the callback: exceptionally when the callback receives a
     *         non-null exception, otherwise with the reported metadata
     */
    private CompletableFuture<RecordMetadata> sendAsync(ProducerRecord<byte[], byte[]> record) {
        CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
        this.producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(metadata);
            }
        });
        return result;
    }

    /**
     * Turns a list of futures into one future of a list, preserving order.
     *
     * @param com futures to combine
     * @param <T> element type
     * @return a future that completes with all results once every input has completed, or
     *         completes exceptionally if any input does
     */
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        CompletableFuture<Void> allDone = CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]));
        // join() does not block here: this stage only runs after allOf completed normally,
        // which means every input future is already complete.
        return allDone.thenApply(v -> com.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }
}

