/*
 * Decompiled with CFR 0.152.
 */
package com.olacabs.fabric.compute.pipeline;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.codahale.metrics.annotation.Timed;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Joiner;
import com.olacabs.fabric.common.util.metrics.MetricConstants;
import com.olacabs.fabric.common.util.metrics.MetricFactory;
import com.olacabs.fabric.compute.EventCollector;
import com.olacabs.fabric.compute.ProcessingContext;
import com.olacabs.fabric.compute.comms.CommsMessageHandler;
import com.olacabs.fabric.compute.pipeline.ClockPulseGenerator;
import com.olacabs.fabric.compute.pipeline.CommsIdGenerator;
import com.olacabs.fabric.compute.pipeline.MessageSource;
import com.olacabs.fabric.compute.pipeline.NotificationBus;
import com.olacabs.fabric.compute.pipeline.PipelineMessage;
import com.olacabs.fabric.compute.pipeline.SourceIdBasedTransactionIdGenerator;
import com.olacabs.fabric.compute.pipeline.TransactionIdGenerator;
import com.olacabs.fabric.compute.processor.InitializationException;
import com.olacabs.fabric.compute.processor.ProcessingException;
import com.olacabs.fabric.compute.processor.ProcessorBase;
import com.olacabs.fabric.compute.util.ComponentPropertyReader;
import com.olacabs.fabric.model.common.ComponentMetadata;
import com.olacabs.fabric.model.event.Event;
import com.olacabs.fabric.model.event.EventSet;
import io.astefanutti.metrics.aspectj.AnnotatedMetric;
import io.astefanutti.metrics.aspectj.MetricAspect;
import io.astefanutti.metrics.aspectj.MetricStaticAspect;
import io.astefanutti.metrics.aspectj.Metrics;
import io.astefanutti.metrics.aspectj.Profiled;
import io.astefanutti.metrics.aspectj.TimedAspect;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.reflect.MethodSignature;
import org.aspectj.runtime.internal.AroundClosure;
import org.aspectj.runtime.reflect.Factory;
import org.jboss.logging.MDC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Metrics
public class PipelineStage
implements CommsMessageHandler<PipelineMessage>,
MessageSource,
Profiled {
    private static final Logger log;
    private final String instanceId;
    private final Properties properties;
    private final int id;
    private final TransactionIdGenerator idGenerator;
    private final ComponentMetadata processorMetadata;
    private final ProcessorBase processor;
    private final NotificationBus notificationBus;
    private final ProcessingContext context;
    private final Retryer<PipelineMessage> retryer;
    private ClockPulseGenerator clockPulseGenerator;
    public Map<String, AnnotatedMetric<Gauge>> gauges;
    public Map<String, AnnotatedMetric<Meter>> meters;
    public Map<String, AnnotatedMetric<Timer>> timers;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;

    public PipelineStage(String instanceId, Properties properties, ComponentMetadata processorMetadata, ProcessorBase processor, NotificationBus notificationBus, ProcessingContext context) {
        MetricAspect.ajc$interFieldInit$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$gauges((Profiled)this);
        MetricAspect.ajc$interFieldInit$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$meters((Profiled)this);
        MetricAspect.ajc$interFieldInit$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers((Profiled)this);
        try {
            this.id = CommsIdGenerator.nextId();
            this.idGenerator = new SourceIdBasedTransactionIdGenerator(this);
            this.retryer = RetryerBuilder.newBuilder().retryIfException().retryIfRuntimeException().withWaitStrategy(WaitStrategies.fibonacciWait((long)30L, (TimeUnit)TimeUnit.SECONDS)).build();
            this.instanceId = instanceId;
            this.properties = properties;
            this.processorMetadata = processorMetadata;
            this.processor = processor;
            this.notificationBus = notificationBus;
            this.context = context;
        }
        finally {
            MetricAspect.aspectOf().ajc$after$io_astefanutti_metrics_aspectj_MetricAspect$1$c735687d((Profiled)this);
        }
    }

    @Override
    public int communicationId() {
        return this.id;
    }

    public void initialize(Properties globalProperties) throws InitializationException {
        Long triggeringFrequency = ComponentPropertyReader.readLong(this.properties, globalProperties, "triggering_frequency", this.instanceId, this.getProcessorMetadata());
        if (null != triggeringFrequency) {
            this.clockPulseGenerator = ClockPulseGenerator.builder().id(this.id).notificationBus(this.notificationBus).notificationPeriod(triggeringFrequency).build();
        }
        this.processor.initialize(this.instanceId, globalProperties, this.properties, this.processorMetadata);
    }

    @Override
    public boolean sendsNormalMessage() {
        return !this.processor.isScheduled();
    }

    public void start() {
        if (null != this.clockPulseGenerator) {
            this.clockPulseGenerator.start();
        }
    }

    public void stop() {
        if (null != this.clockPulseGenerator) {
            this.clockPulseGenerator.stop();
        }
        this.processor.destroy();
    }

    @Override
    public String name() {
        return this.processorMetadata.getName();
    }

    @Override
    public void handlePipelineMessage(PipelineMessage pipelineMessage) throws Exception {
        switch (pipelineMessage.getMessageType()) {
            case TIMER: {
                this.handleTimerMessage(pipelineMessage);
                break;
            }
            case USERSPACE: {
                this.handleUserMessage(pipelineMessage);
                break;
            }
        }
    }

    private void handleTimerMessage(PipelineMessage pipelineMessage) throws ProcessingException {
        try {
            MDC.put((String)"componentId", (Object)this.processor.getId());
            this.retryer.call(() -> {
                try {
                    List<Event> events = this.processor.timeTriggerHandler(this.context);
                    EventSet eventSet = EventSet.eventFromEventBuilder().isAggregate(true).events(events).build();
                    eventSet.setId(this.idGenerator.transactionId());
                    this.notificationBus.publish(PipelineMessage.userspaceMessageBuilder().messages(eventSet).build(), this.id);
                    log.debug("Scheduled processing completed.");
                    return null;
                }
                catch (Throwable t) {
                    log.error("<timeTriggerHandler()> threw exception: ", t);
                    throw t;
                }
            });
        }
        catch (Exception e) {
            if (e.getCause() != null) {
                log.error("Error executing <timeTriggerHandler()>", e.getCause());
            } else {
                log.error("Error executing <timeTriggerHandler()>", (Throwable)e);
            }
        }
        finally {
            MDC.remove((String)"componentId");
        }
    }

    @Timed(name="${this.context.topologyName}.${this.processorMetadata.id}")
    private void handleUserMessage(PipelineMessage pipelineMessage) throws ProcessingException {
        PipelineMessage pipelineMessage2 = pipelineMessage;
        PipelineStage.handleUserMessage_aroundBody1$advice(this, pipelineMessage2, TimedAspect.aspectOf(), this, null, ajc$tjp_0);
    }

    public boolean healthcheck() {
        return this.processor.healthcheck();
    }

    public static PipelineStageBuilder builder() {
        return new PipelineStageBuilder();
    }

    public String getInstanceId() {
        return this.instanceId;
    }

    public Properties getProperties() {
        return this.properties;
    }

    public ComponentMetadata getProcessorMetadata() {
        return this.processorMetadata;
    }

    public ProcessingContext getContext() {
        return this.context;
    }

    static {
        PipelineStage.ajc$preClinit();
        try {
            log = LoggerFactory.getLogger(PipelineStage.class);
        }
        catch (Throwable throwable) {
            if (throwable instanceof ExceptionInInitializerError) {
                throw (ExceptionInInitializerError)throwable;
            }
            MetricStaticAspect.aspectOf().ajc$after$io_astefanutti_metrics_aspectj_MetricStaticAspect$1$be47261c(ajc$tjp_1);
            throw throwable;
        }
        MetricStaticAspect.aspectOf().ajc$after$io_astefanutti_metrics_aspectj_MetricStaticAspect$1$be47261c(ajc$tjp_1);
    }

    public /* synthetic */ Map ajc$interFieldGet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$gauges() {
        return this.gauges;
    }

    public /* synthetic */ void ajc$interFieldSet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$gauges(Map map) {
        this.gauges = map;
    }

    public /* synthetic */ Map ajc$interFieldGet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$meters() {
        return this.meters;
    }

    public /* synthetic */ void ajc$interFieldSet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$meters(Map map) {
        this.meters = map;
    }

    public /* synthetic */ Map ajc$interFieldGet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers() {
        return this.timers;
    }

    public /* synthetic */ void ajc$interFieldSet$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers(Map map) {
        this.timers = map;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static final /* synthetic */ void handleUserMessage_aroundBody0(PipelineStage ajc$this, PipelineMessage pipelineMessage) {
        PipelineMessage messageToSend = pipelineMessage;
        EventCollector eventCollector = new EventCollector();
        try {
            MDC.put((String)"componentId", (Object)ajc$this.processor.getId());
            PipelineMessage generatedMessage = (PipelineMessage)ajc$this.retryer.call(() -> {
                try {
                    this.processor.process(this.context, eventCollector, pipelineMessage.getMessages());
                }
                catch (Throwable t) {
                    log.error("<consume()> threw exception: ", t);
                    MetricFactory.getMetricRegistry().meter(Joiner.on((String)".").join((Object)MetricConstants.getProcessorExceptionPrefix(), (Object)this.context.getTopologyName(), new Object[]{this.processor.getId()})).mark();
                    throw t;
                }
                log.debug("[{}] Processing completed for message.", (Object)pipelineMessage.getMessages().getId());
                if (null != eventCollector.getEvents() && pipelineMessage.getMessages().getId() != eventCollector.getEvents().getId()) {
                    eventCollector.getEvents().setId(this.idGenerator.transactionId());
                    eventCollector.getEvents().setTransactionId(pipelineMessage.getMessages().getTransactionId());
                    return PipelineMessage.userspaceMessageBuilder().messages(eventCollector.getEvents()).parent(pipelineMessage).build();
                }
                return null;
            });
            if (null != generatedMessage) {
                messageToSend = generatedMessage;
                log.debug("[{}] Setting message to newly generated message: {}", (Object)pipelineMessage.getMessages().getId(), (Object)generatedMessage.getMessages().getId());
            }
            ajc$this.notificationBus.publish(messageToSend, ajc$this.id, !ajc$this.processor.isScheduled());
        }
        catch (Exception e) {
            try {
                if (e.getCause() != null) {
                    log.error(String.format("[%d] error executing handleUserMessage()", pipelineMessage.getMessages().getId()), e.getCause());
                } else {
                    log.error(String.format("[%d] error executing handleUserMessage()", pipelineMessage.getMessages().getId()), (Throwable)e);
                }
                ajc$this.notificationBus.publish(messageToSend, ajc$this.id, !ajc$this.processor.isScheduled());
            }
            catch (Throwable throwable) {
                ajc$this.notificationBus.publish(messageToSend, ajc$this.id, !ajc$this.processor.isScheduled());
                MDC.remove((String)"componentId");
                throw throwable;
            }
            MDC.remove((String)"componentId");
        }
        MDC.remove((String)"componentId");
    }

    private static final /* synthetic */ Object handleUserMessage_aroundBody1$advice(PipelineStage ajc$this, PipelineMessage pipelineMessage, TimedAspect ajc$aspectInstance, Profiled object, AroundClosure ajc$aroundClosure, JoinPoint.StaticPart thisJoinPointStaticPart) {
        String methodSignature = ((MethodSignature)thisJoinPointStaticPart.getSignature()).getMethod().toString();
        Timer timer = (Timer)((AnnotatedMetric)MetricAspect.ajc$interFieldGetDispatch$io_astefanutti_metrics_aspectj_MetricAspect$io_astefanutti_metrics_aspectj_Profiled$timers((Profiled)object).get(methodSignature)).getMetric();
        Timer.Context context = timer.time();
        try {
            AroundClosure aroundClosure = ajc$aroundClosure;
            Profiled profiled = object;
            PipelineStage.handleUserMessage_aroundBody0((PipelineStage)profiled, pipelineMessage);
            Object var10_11 = null;
            return var10_11;
        }
        finally {
            context.stop();
        }
    }

    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("PipelineStage.java", PipelineStage.class);
        ajc$tjp_0 = factory.makeSJP("method-execution", (Signature)factory.makeMethodSig("2", "handleUserMessage", "com.olacabs.fabric.compute.pipeline.PipelineStage", "com.olacabs.fabric.compute.pipeline.PipelineMessage", "pipelineMessage", "com.olacabs.fabric.compute.processor.ProcessingException", "void"), 184);
        ajc$tjp_1 = factory.makeSJP("staticinitialization", (Signature)factory.makeInitializerSig("8", "com.olacabs.fabric.compute.pipeline.PipelineStage"), 50);
    }

    public static class PipelineStageBuilder {
        private String instanceId;
        private Properties properties;
        private ComponentMetadata processorMetadata;
        private ProcessorBase processor;
        private NotificationBus notificationBus;
        private ProcessingContext context;

        PipelineStageBuilder() {
        }

        public PipelineStageBuilder instanceId(String instanceId) {
            this.instanceId = instanceId;
            return this;
        }

        public PipelineStageBuilder properties(Properties properties) {
            this.properties = properties;
            return this;
        }

        public PipelineStageBuilder processorMetadata(ComponentMetadata processorMetadata) {
            this.processorMetadata = processorMetadata;
            return this;
        }

        public PipelineStageBuilder processor(ProcessorBase processor) {
            this.processor = processor;
            return this;
        }

        public PipelineStageBuilder notificationBus(NotificationBus notificationBus) {
            this.notificationBus = notificationBus;
            return this;
        }

        public PipelineStageBuilder context(ProcessingContext context) {
            this.context = context;
            return this;
        }

        public PipelineStage build() {
            return new PipelineStage(this.instanceId, this.properties, this.processorMetadata, this.processor, this.notificationBus, this.context);
        }

        public String toString() {
            return "PipelineStage.PipelineStageBuilder(instanceId=" + this.instanceId + ", properties=" + this.properties + ", processorMetadata=" + this.processorMetadata + ", processor=" + this.processor + ", notificationBus=" + this.notificationBus + ", context=" + this.context + ")";
        }
    }
}

