/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.redriver.service;

import com.codahale.metrics.InstrumentedScheduledExecutorService;
import com.codahale.metrics.SharedMetricRegistries;
import com.flipkart.flux.redriver.model.ScheduledMessage;
import com.flipkart.flux.redriver.service.MessageManagerService;
import com.flipkart.flux.task.redriver.RedriverRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Periodically reads batches of {@link ScheduledMessage}s from the {@link MessageManagerService}
 * and hands each one to the {@link RedriverRegistry} for redriving.
 *
 * <p>A single-threaded scheduled executor triggers a redrive run with a fixed delay between runs;
 * the individual redrive calls of a batch are fanned out to a fixed-size worker pool and the run
 * waits for the whole batch to finish before fetching the next one.
 *
 * <p>{@link #start()} and {@link #stop()} are mutually exclusive (both lock on this instance).
 * {@link #isRunning()} and {@link #setInitialDelay(Long)} do not take that lock.
 */
@Singleton
public class RedriverService {
    private static final Logger logger = LogManager.getLogger(RedriverService.class);
    private static final String SCHEDULED_EXECUTOR_SVC_NAME = "redriver-batch-read-executor-svc";
    /** Number of worker threads used to redrive the messages of one batch concurrently. */
    private static final int ASYNC_REDRIVE_THREADS = 10;
    /** Upper bound, in milliseconds, that {@link #stop()} waits on the worker pool. */
    private static final long STOP_WAIT_MS = 10000L;

    private final Integer batchReadInterval;
    private final Integer batchSize;
    /** Delay in milliseconds before the first redrive run after {@link #start()}. */
    private Long initialDelay = 10000L;
    private final MessageManagerService messageService;
    private ScheduledFuture<?> scheduledFuture;
    private final RedriverRegistry redriverRegistry;
    private final InstrumentedScheduledExecutorService scheduledExecutorService;
    private final ExecutorService asyncRedriveService;

    /**
     * Creates the service together with its two executors; nothing is scheduled until
     * {@link #start()} is called.
     *
     * @param messageService    source of the scheduled messages to redrive
     * @param redriverRegistry  target that performs the actual redrive of a task
     * @param batchReadInterval delay in milliseconds between the end of one redrive run and the
     *                          start of the next
     * @param batchSize         number of messages requested per read; also the offset increment
     */
    @Inject
    public RedriverService(MessageManagerService messageService, RedriverRegistry redriverRegistry, @Named(value="redriver.batchRead.intervalms") Integer batchReadInterval, @Named(value="redriver.batchRead.batchSize") Integer batchSize) {
        this.redriverRegistry = redriverRegistry;
        this.batchReadInterval = batchReadInterval;
        this.batchSize = batchSize;
        this.messageService = messageService;
        this.asyncRedriveService = Executors.newFixedThreadPool(ASYNC_REDRIVE_THREADS);
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Drop cancelled periodic tasks from the work queue immediately instead of at their next due time.
        executor.setRemoveOnCancelPolicy(true);
        this.scheduledExecutorService = new InstrumentedScheduledExecutorService(
                Executors.unconfigurableScheduledExecutorService(executor),
                SharedMetricRegistries.getOrCreate("mainMetricRegistry"),
                SCHEDULED_EXECUTOR_SVC_NAME);
    }

    /**
     * Schedules the periodic redrive run if it is not already scheduled. Calling this while the
     * service is running is a no-op; calling it after {@link #stop()} schedules a new run.
     */
    public void start() {
        synchronized (this) {
            if (this.scheduledFuture != null && !this.scheduledFuture.isDone()) {
                return;
            }
            this.scheduledFuture = this.scheduledExecutorService.scheduleWithFixedDelay(
                    this::redriveSafely,
                    this.initialDelay.longValue(),
                    this.batchReadInterval.longValue(),
                    TimeUnit.MILLISECONDS);
            logger.info("RedriverService started");
        }
    }

    /**
     * Cancels the periodic redrive run without interrupting a run that is in progress, then waits
     * on the worker pool for up to {@value #STOP_WAIT_MS} ms. Does nothing if the service was
     * never started.
     */
    public void stop() {
        synchronized (this) {
            if (this.scheduledFuture == null) {
                // Never started: nothing to cancel and no redrive jobs can be in flight.
                return;
            }
            this.scheduledFuture.cancel(false);
            try {
                // NOTE: the worker pool is never shut down (it is reused if start() is called
                // again), so this call cannot observe termination and acts as a bounded wait.
                this.asyncRedriveService.awaitTermination(STOP_WAIT_MS, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                logger.error("Forcefully terminated redrive jobs", e);
            }
            logger.info("RedriverService stopped");
        }
    }

    /**
     * @return {@code true} if a periodic redrive run has been scheduled and is neither cancelled
     *         nor otherwise completed
     */
    public Boolean isRunning() {
        return this.scheduledFuture != null && !this.scheduledFuture.isDone();
    }

    /**
     * Sets the delay applied before the first redrive run; only takes effect on the next
     * {@link #start()}.
     *
     * @param initialDelay delay in milliseconds
     */
    public void setInitialDelay(Long initialDelay) {
        this.initialDelay = initialDelay;
    }

    /**
     * Entry point executed by the scheduler. Any failure is logged and suppressed, because an
     * exception escaping a periodic task would stop all subsequent executions.
     */
    private void redriveSafely() {
        try {
            this.redrive();
        }
        catch (Throwable e) {
            logger.error("Error while running redrive", e);
        }
    }

    /**
     * Reads messages batch by batch, advancing the offset by the batch size each time, and
     * redrives every message of a batch on the worker pool. The next batch is only read once all
     * redrive calls of the current one have finished. The run ends when an empty batch is
     * returned or the waiting thread is interrupted.
     */
    private void redrive() {
        int offset = 0;
        List<ScheduledMessage> messages;
        do {
            messages = this.messageService.retrieveOldest(offset, this.batchSize);
            logger.info("Retrieved {} messages to redrive", messages.size());
            List<Future<?>> pending = new ArrayList<>(messages.size());
            for (ScheduledMessage message : messages) {
                pending.add(this.asyncRedriveService.submit(() -> this.redriveMessage(message)));
            }
            offset += this.batchSize.intValue();
            if (!this.awaitAll(pending)) {
                return;
            }
        } while (!messages.isEmpty());
    }

    /** Redrives a single message; failures are logged so that one bad message does not affect the rest of the batch. */
    private void redriveMessage(ScheduledMessage message) {
        try {
            this.redriverRegistry.redriveTask(message.getStateMachineId(), message.getTaskId(), Long.valueOf(message.getExecutionVersion()));
        }
        catch (Exception ex) {
            // The trailing throwable has no placeholder, so the logger records it with its full stack trace.
            logger.error("Something went wrong in redriving task:{} smId:{} with execution Version:{}, Error: {}",
                    message.getTaskId(), message.getStateMachineId(), message.getExecutionVersion(), ex.getMessage(), ex);
        }
    }

    /**
     * Blocks until every given future has completed.
     *
     * @return {@code true} if all futures were waited for, {@code false} if the calling thread
     *         was interrupted while waiting (the interrupt flag is restored in that case)
     */
    private boolean awaitAll(List<Future<?>> pending) {
        for (Future<?> future : pending) {
            try {
                future.get();
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                logger.warn("Interrupted while waiting for async redrive callbacks, aborting this redrive run", ex);
                return false;
            }
            catch (ExecutionException | CancellationException ex) {
                // Exceptions are already handled inside the task; this covers Errors and cancellation.
                logger.error("Async redrive task did not complete normally", ex);
            }
        }
        return true;
    }
}

