/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.processor.kafka.processor;

import com.flipkart.aesop.event.AbstractEvent;
import com.flipkart.aesop.processor.DestinationEventProcessor;
import com.flipkart.aesop.processor.kafka.client.KafkaClient;
import com.flipkart.aesop.processor.kafka.preprocessor.KafkaEventDefaultPreprocessor;
import com.linkedin.databus.client.pub.ConsumerCallbackResult;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

public class SyncKafkaEventProcessor
implements DestinationEventProcessor {
    private static final Logger LOGGER = LogFactory.getLogger(SyncKafkaEventProcessor.class);
    private KafkaEventDefaultPreprocessor kafkaEventDefaultPreprocessor;

    public ConsumerCallbackResult processDestinationEvent(AbstractEvent event) {
        LOGGER.info("Received Abstract Event. Event is " + event);
        try {
            ProducerRecord record = this.kafkaEventDefaultPreprocessor.createProducerRecord(event);
            KafkaClient kafkaClient = this.kafkaEventDefaultPreprocessor.getKafkaClient();
            RecordMetadata response = (RecordMetadata)kafkaClient.getClient().send(record).get();
            if (response == null) {
                LOGGER.error("Kafka Send Error : NULL");
                return ConsumerCallbackResult.ERROR;
            }
            LOGGER.info("Send successful :  topic :: partition - " + response.topic() + "::" + response.partition());
            return ConsumerCallbackResult.SUCCESS;
        }
        catch (Exception e) {
            LOGGER.error("Kafka Server Connection Lost/Send Error" + e);
            return ConsumerCallbackResult.ERROR;
        }
    }

    public KafkaEventDefaultPreprocessor getKafkaEventDefaultPreprocessor() {
        return this.kafkaEventDefaultPreprocessor;
    }

    public void setKafkaEventDefaultPreprocessor(KafkaEventDefaultPreprocessor kafkaEventDefaultPreprocessor) {
        this.kafkaEventDefaultPreprocessor = kafkaEventDefaultPreprocessor;
    }
}

