/*
 * Decompiled with CFR 0.152.
 */
package flipkart.cp.convert.reservation.scheduler.worker.task;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import flipkart.cp.convert.chronosQ.core.SchedulerCheckpointer;
import flipkart.cp.convert.chronosQ.core.SchedulerEntry;
import flipkart.cp.convert.chronosQ.core.SchedulerSink;
import flipkart.cp.convert.chronosQ.core.SchedulerStore;
import flipkart.cp.convert.chronosQ.core.TimeBucket;
import flipkart.cp.convert.chronosQ.exceptions.SchedulerException;
import flipkart.cp.convert.reservation.scheduler.worker.task.WorkerTask;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerTaskImpl
extends WorkerTask {
    private final SchedulerCheckpointer checkpointer;
    private final SchedulerStore schedulerStore;
    private final TimeBucket timeBucket;
    private final SchedulerSink schedulerSink;
    private final MetricRegistry metricRegistry;
    private final String metricGaugeName;
    private final Timer sinkPushingTime;
    private final int batchSize;
    private boolean interrupt = false;
    static Logger log = LoggerFactory.getLogger((String)WorkerTaskImpl.class.getName());

    public WorkerTaskImpl(SchedulerCheckpointer checkpointer, SchedulerStore schedulerStore, TimeBucket timeBucket, SchedulerSink schedulerSink, String taskName, MetricRegistry metricRegistry, int batchSize) {
        super(taskName);
        this.checkpointer = checkpointer;
        this.schedulerStore = schedulerStore;
        this.timeBucket = timeBucket;
        this.schedulerSink = schedulerSink;
        this.metricRegistry = metricRegistry;
        this.batchSize = batchSize;
        log.info("WorkerTaskImpl: " + taskName);
        this.sinkPushingTime = metricRegistry.timer("sinkPushingTime-Partition " + this.getPartitionNum());
        this.metricGaugeName = "ElapsedTimeWorkerToProcess TimeDiffInMilliSec -Partition " + this.getPartitionNum();
        try {
            metricRegistry.register(MetricRegistry.name(WorkerTaskImpl.class, (String[])new String[]{this.metricGaugeName}), (Metric)new Gauge<Long>(){

                public Long getValue() {
                    try {
                        return WorkerTaskImpl.this.getCurrentDateTimeInMilliSecs() - WorkerTaskImpl.this.calculateNextIntervalForProcess(WorkerTaskImpl.this.getPartitionNum());
                    }
                    catch (SchedulerException e) {
                        log.error("Exception in initializing WorkerTaskImpl: ", (Throwable)e);
                        return Long.MIN_VALUE;
                    }
                }
            });
        }
        catch (IllegalArgumentException ex) {
            log.warn("Metrics already registered for this partition.Calling in restart gives failure.Ignoring");
        }
    }

    @Override
    public void process() {
        while (!this.interrupt && !Thread.currentThread().isInterrupted()) {
            try {
                long currentDateTimeInMilliSec = this.getCurrentDateTimeInMilliSecs();
                long nextIntervalForProcess = this.calculateNextIntervalForProcess(this.getPartitionNum());
                while (!this.interrupt && nextIntervalForProcess <= currentDateTimeInMilliSec) {
                    List values = this.schedulerStore.getNextN(nextIntervalForProcess, this.getPartitionNum().intValue(), this.batchSize);
                    while (!this.interrupt && !values.isEmpty()) {
                        Timer.Context context = this.sinkPushingTime.time();
                        this.schedulerSink.giveExpiredListForProcessing(values);
                        context.stop();
                        this.schedulerStore.removeBulk(nextIntervalForProcess, this.getPartitionNum().intValue(), values.stream().map(SchedulerEntry::getKey).collect(Collectors.toList()));
                        values = this.schedulerStore.getNextN(nextIntervalForProcess, this.getPartitionNum().intValue(), this.batchSize);
                    }
                    this.checkpointer.set(String.valueOf(nextIntervalForProcess), this.getPartitionNum().intValue());
                    log.info("Processed for " + nextIntervalForProcess + " in partition " + this.getPartitionNum());
                    nextIntervalForProcess = this.timeBucket.next(nextIntervalForProcess);
                    currentDateTimeInMilliSec = this.getCurrentDateTimeInMilliSecs();
                }
                log.info("sleep for " + (nextIntervalForProcess - currentDateTimeInMilliSec));
                Thread.sleep(nextIntervalForProcess - currentDateTimeInMilliSec);
            }
            catch (InterruptedException e) {
                log.error("Thread interrupted. Breaking and Restarting. ", (Throwable)e);
                break;
            }
            catch (Exception ex) {
                log.error("Exception in processing WorkerTaskImpl. Restarting ", (Throwable)ex);
            }
        }
    }

    public void interrupt() {
        this.interrupt = true;
    }

    public void stopGraceFully() {
        this.interrupt();
    }

    public void stopPoisonPill() {
        Thread.currentThread().stop();
    }

    private long calculateNextIntervalForProcess(int partitionNum) throws SchedulerException {
        long timerKeyConverted;
        try {
            String timerKey = this.checkpointer.peek(partitionNum);
            timerKeyConverted = Long.valueOf(timerKey);
        }
        catch (Exception ex) {
            timerKeyConverted = new Date().getTime() - 10000L;
            log.error("Checkpointer key is null, Creating key from current time -" + timerKeyConverted);
            this.checkpointer.set(String.valueOf(timerKeyConverted), partitionNum);
        }
        return this.timeBucket.toBucket(timerKeyConverted);
    }

    private long getCurrentDateTimeInMilliSecs() {
        return new Date().getTime();
    }
}

