/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.aesop.runtime.bootstrap.consumer;

import com.flipkart.aesop.event.AbstractEvent;
import com.flipkart.aesop.eventconsumer.AbstractEventConsumer;
import com.flipkart.aesop.runtime.bootstrap.consumer.SourceEventConsumer;
import com.flipkart.aesop.runtime.bootstrap.consumer.SourceEventProcessor;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.trpr.platform.core.impl.logging.LogFactory;
import org.trpr.platform.core.spi.logging.Logger;

/**
 * {@link SourceEventConsumer} that spreads incoming events over a fixed set of
 * single-threaded executors ("partitions").
 *
 * <p>The partition for an event is derived from the hash of its primary key values joined
 * with {@code ";"}, so events carrying the same primary key values are always handed to the
 * same executor. Every executor has exactly one worker thread and its own bounded queue,
 * which means tasks submitted to one partition are only ever run by that partition's thread,
 * in submission order.
 *
 * <p>What happens when a partition's queue is full is decided by the
 * {@link RejectedExecutionHandler} supplied to the constructor.
 */
public class DefaultBlockingEventConsumer
implements SourceEventConsumer {
    public static final Logger LOGGER = LogFactory.getLogger(DefaultBlockingEventConsumer.class);
    /** Separator placed between primary key values when building the partitioning key. */
    private static final String PRIMARY_KEY_SEPARATOR = ";";
    /** One single-threaded executor per partition; the list index is the partition number. */
    private final List<ThreadPoolExecutor> executors = new ArrayList<ThreadPoolExecutor>();
    /** Effective partition count (requested count capped at the number of available processors). */
    private final int numberOfPartition;
    private final AbstractEventConsumer eventConsumer;

    /**
     * Creates the consumer and its partition executors.
     *
     * @param numberOfPartition requested number of partitions; must be at least 1. The effective
     *        count is capped at {@code Runtime.getRuntime().availableProcessors()}.
     * @param executorQueueSize capacity of the bounded task queue of <em>each</em> partition
     * @param eventConsumer consumer passed to every {@link SourceEventProcessor} created by
     *        {@link #onEvent(AbstractEvent)}
     * @param rejectedExecutionHandler handler invoked by an executor when it cannot accept a task
     * @throws IllegalArgumentException if {@code numberOfPartition} is less than 1 (or, from
     *         {@link ArrayBlockingQueue}, if {@code executorQueueSize} is less than 1)
     */
    public DefaultBlockingEventConsumer(int numberOfPartition, int executorQueueSize, AbstractEventConsumer eventConsumer, RejectedExecutionHandler rejectedExecutionHandler) {
        // Fail fast: a non-positive count would otherwise only surface later in onEvent()
        // as a division by zero or an out-of-range partition index.
        if (numberOfPartition < 1) {
            throw new IllegalArgumentException("numberOfPartition must be at least 1, got: " + numberOfPartition);
        }
        this.eventConsumer = eventConsumer;
        this.numberOfPartition = Math.min(numberOfPartition, Runtime.getRuntime().availableProcessors());
        LOGGER.info("numberOfPartition used: " + this.numberOfPartition);
        // Build exactly as many executors as onEvent() can address and shutdown() will stop.
        for (int i = 0; i < this.numberOfPartition; i++) {
            // Each executor gets its own queue. Sharing one queue instance between executors
            // would let any partition's worker thread pick up any queued task, defeating the
            // key-to-partition affinity computed in onEvent().
            ArrayBlockingQueue<Runnable> partitionQueue = new ArrayBlockingQueue<Runnable>(executorQueueSize);
            this.executors.add(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, partitionQueue, rejectedExecutionHandler));
        }
    }

    /**
     * Routes the event to the partition selected by the hash of its joined primary key values
     * and submits a {@link SourceEventProcessor} for it to that partition's executor.
     *
     * @param sourceEvent the event to process
     */
    @Override
    public void onEvent(AbstractEvent sourceEvent) {
        String primaryKeyValues = Joiner.on(PRIMARY_KEY_SEPARATOR).join((Iterable<?>) sourceEvent.getPrimaryKeyValues());
        int keyHash = primaryKeyValues.hashCode();
        // Clear the sign bit so the modulo result is always a valid, non-negative list index.
        int partitionNumber = (keyHash & Integer.MAX_VALUE) % this.numberOfPartition;
        LOGGER.debug("Partition:" + keyHash + ":" + partitionNumber);
        this.executors.get(partitionNumber).execute(new SourceEventProcessor(sourceEvent, this.eventConsumer));
    }

    /**
     * Initiates an orderly shutdown of every partition executor and then waits, without a
     * practical time limit, for each of them to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the error
     * is logged and the thread's interrupt status is restored for the caller to observe.
     */
    @Override
    public void shutdown() {
        // Signal all executors first so they wind down in parallel rather than one at a time.
        for (ThreadPoolExecutor executor : this.executors) {
            executor.shutdown();
        }
        try {
            for (ThreadPoolExecutor executor : this.executors) {
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can react to it.
            Thread.currentThread().interrupt();
            LOGGER.error("Error while stopping bootstrap consumer", e);
        }
    }
}

