/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.bootstrap.consumer;

import com.flipkart.aesop.event.AbstractEvent;
import com.flipkart.aesop.eventconsumer.AbstractEventConsumer;
import com.flipkart.aesop.runtime.bootstrap.consumer.SourceEventConsumer;
import com.flipkart.aesop.runtime.bootstrap.consumer.SourceEventProcessor;
import com.google.common.base.Joiner;
import com.linkedin.databus2.core.BackoffTimer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.InitializingBean;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

/**
 * {@link SourceEventConsumer} that spreads incoming events over a fixed set of
 * single-threaded executors ("partitions").
 *
 * <p>The partition for an event is derived from the hash of its primary key values
 * joined with {@link #PRIMARY_KEY_SEPERATOR}, so events carrying the same primary key
 * values are always handed to the same executor. Each executor is created in
 * {@link #afterPropertiesSet()} with exactly one worker thread and a bounded
 * {@link ArrayBlockingQueue}; what happens when that queue is full is decided by the
 * configured {@link RejectedExecutionHandler}.
 *
 * <p>All properties are expected to be injected through the setters before
 * {@link #afterPropertiesSet()} is invoked.
 */
public class DefaultBlockingEventConsumer
implements SourceEventConsumer,
InitializingBean {
    public static final Logger LOGGER = LogFactory.getLogger(DefaultBlockingEventConsumer.class);

    /** Default time, in seconds, to wait for each executor to terminate on shutdown. */
    private static final long DEFAULT_THREAD_AWAIT_TERMINATION_TIME_IN_SECS = 60L;

    /** Separator placed between primary key values when building the partitioning key. */
    private static final String PRIMARY_KEY_SEPERATOR = ";";

    /** One single-threaded executor per partition, populated by {@link #afterPropertiesSet()}. */
    private List<ThreadPoolExecutor> executors = new ArrayList<ThreadPoolExecutor>();
    private int numberOfPartition;
    private AbstractEventConsumer eventConsumer;
    private BackoffTimer backoffTimer;

    /** Per-executor termination timeout, in seconds. */
    private long threadTerminationMaxTime = DEFAULT_THREAD_AWAIT_TERMINATION_TIME_IN_SECS;
    private int executorQueueSize;
    private RejectedExecutionHandler rejectedExecutionHandler;

    /**
     * Routes the event to the executor that owns its partition.
     *
     * @param sourceEvent event to process; its primary key values determine the partition
     */
    @Override
    public void onEvent(AbstractEvent sourceEvent) {
        String primaryKeyValues = Joiner.on(PRIMARY_KEY_SEPERATOR).join((Iterable<?>) sourceEvent.getPrimaryKeyValues());
        int keyHash = primaryKeyValues.hashCode();
        // Mask the sign bit so the modulo result is never negative.
        int partitionNumber = (keyHash & Integer.MAX_VALUE) % this.numberOfPartition;
        LOGGER.debug("Partition:" + keyHash + ":" + partitionNumber);
        this.executors.get(partitionNumber).execute(new SourceEventProcessor(sourceEvent, this.eventConsumer, this.backoffTimer));
    }

    /**
     * Initiates an orderly shutdown of every executor and then waits for each one to
     * terminate, for at most {@link #getThreadTerminationMaxTime()} seconds per executor.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the
     * error is logged and the thread's interrupt status is restored.
     */
    @Override
    public void shutdown() {
        // Signal all executors first so they wind down concurrently, then wait on each.
        for (ThreadPoolExecutor executor : this.executors) {
            executor.shutdown();
        }
        try {
            for (ThreadPoolExecutor executor : this.executors) {
                executor.awaitTermination(this.threadTerminationMaxTime, TimeUnit.SECONDS);
            }
        }
        catch (InterruptedException e) {
            LOGGER.error("Error while stopping bootstrap consumer", (Throwable)e);
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Creates the partition executors once all properties have been injected.
     *
     * <p>The configured partition count is capped at the number of available processors.
     *
     * @throws IllegalArgumentException if the configured partition count is not positive
     * @throws Exception declared by {@link InitializingBean}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.numberOfPartition <= 0) {
            // Without this check onEvent would later fail with a division by zero.
            throw new IllegalArgumentException("numberOfPartition must be greater than zero but was " + this.numberOfPartition);
        }
        this.numberOfPartition = Math.min(this.numberOfPartition, Runtime.getRuntime().availableProcessors());
        LOGGER.info("numberOfPartition used: " + this.numberOfPartition);
        for (int i = 0; i < this.numberOfPartition; i++) {
            ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(this.executorQueueSize);
            // Core and max pool size of 1: tasks of a partition run one at a time on a single thread.
            this.executors.add(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue, this.rejectedExecutionHandler));
        }
    }

    /** @return the consumer handed to every {@link SourceEventProcessor} */
    public AbstractEventConsumer getEventConsumer() {
        return this.eventConsumer;
    }

    /** @param eventConsumer the consumer handed to every {@link SourceEventProcessor} */
    public void setEventConsumer(AbstractEventConsumer eventConsumer) {
        this.eventConsumer = eventConsumer;
    }

    /** @return the backoff timer handed to every {@link SourceEventProcessor} */
    public BackoffTimer getBackoffTimer() {
        return this.backoffTimer;
    }

    /** @param backoffTimer the backoff timer handed to every {@link SourceEventProcessor} */
    public void setBackoffTimer(BackoffTimer backoffTimer) {
        this.backoffTimer = backoffTimer;
    }

    /** @return the per-executor termination timeout used by {@link #shutdown()}, in seconds */
    public long getThreadTerminationMaxTime() {
        return this.threadTerminationMaxTime;
    }

    /** @param threadTerminationMaxTime per-executor termination timeout used by {@link #shutdown()}, in seconds */
    public void setThreadTerminationMaxTime(long threadTerminationMaxTime) {
        this.threadTerminationMaxTime = threadTerminationMaxTime;
    }

    /** @return the capacity of each executor's task queue */
    public int getExecutorQueueSize() {
        return this.executorQueueSize;
    }

    /** @param executorQueueSize the capacity of each executor's task queue */
    public void setExecutorQueueSize(int executorQueueSize) {
        this.executorQueueSize = executorQueueSize;
    }

    /** @return the handler passed to each executor for tasks it cannot accept */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return this.rejectedExecutionHandler;
    }

    /** @param rejectedExecutionHandler the handler passed to each executor for tasks it cannot accept */
    public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
        this.rejectedExecutionHandler = rejectedExecutionHandler;
    }

    /** @return the partition count; after {@link #afterPropertiesSet()} this is the capped value */
    public int getNumberOfPartition() {
        return this.numberOfPartition;
    }

    /** @param numberOfPartition the requested partition count; must be positive */
    public void setNumberOfPartition(int numberOfPartition) {
        this.numberOfPartition = numberOfPartition;
    }
}

