/*
 * Decompiled with CFR 0.152.
 */
package com.flipkart.flux.dao;

import com.flipkart.flux.dao.StatesDAOImpl;
import com.flipkart.flux.dao.iface.StateMachinesDAO;
import com.flipkart.flux.dao.iface.StatesDAO;
import com.flipkart.flux.domain.StateMachine;
import com.flipkart.flux.domain.Status;
import com.flipkart.flux.shard.ShardId;
import com.flipkart.flux.shard.ShardPairModel;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Singleton
public class ParallelScatterGatherQueryHelper {
    private final StatesDAO statesDAO;
    private final StateMachinesDAO stateMachinesDAO;
    private final Map<ShardId, ShardPairModel> fluxShardIdToShardPairModelMap;
    private final ExecutorService executorService;
    private static final Logger logger = LogManager.getLogger(StatesDAOImpl.class);

    @Inject
    public ParallelScatterGatherQueryHelper(StatesDAO statesDAO, StateMachinesDAO stateMachinesDAO, @Named(value="fluxShardIdToShardPairMap") Map<ShardId, ShardPairModel> fluxShardKeyToShardIdMap) {
        this.statesDAO = statesDAO;
        this.stateMachinesDAO = stateMachinesDAO;
        this.fluxShardIdToShardPairModelMap = fluxShardKeyToShardIdMap;
        this.executorService = Executors.newFixedThreadPool(10);
    }

    public List findErroredStates(String stateMachineName, Timestamp fromTime, Timestamp toTime) {
        List result = Collections.synchronizedList(new ArrayList());
        this.scatterGatherQueryHelper(shardId -> this.statesDAO.findErroredStates((ShardId)shardId, stateMachineName, fromTime, toTime), result, "errored states");
        return result;
    }

    public List findStatesByStatus(String stateMachineName, Timestamp fromTime, Timestamp toTime, String taskName, List<Status> statuses) {
        List result = Collections.synchronizedList(new ArrayList());
        this.scatterGatherQueryHelper(shardId -> this.statesDAO.findStatesByStatus((ShardId)shardId, stateMachineName, fromTime, toTime, taskName, statuses), result, "states by status");
        return result;
    }

    public Set<StateMachine> findStateMachinesByName(String stateMachineName) {
        Set<StateMachine> result = Collections.synchronizedSet(new HashSet());
        this.scatterGatherQueryHelper(shardId -> this.stateMachinesDAO.findByName((ShardId)shardId, stateMachineName), result, "stateMachines by name");
        return result;
    }

    public Set<StateMachine> findStateMachinesByNameAndVersion(String stateMachineName, Long Version) {
        Set<StateMachine> result = Collections.synchronizedSet(new HashSet());
        this.scatterGatherQueryHelper(shardId -> this.stateMachinesDAO.findByNameAndVersion((ShardId)shardId, stateMachineName, Version), result, "stateMachines by name and version");
        return result;
    }

    private void scatterGatherQueryHelper(Function<ShardId, Collection> reader, Collection result, String fetchOperation) {
        Future[] tasksCompleted = new Future[this.fluxShardIdToShardPairModelMap.size()];
        AtomicInteger tasks = new AtomicInteger(0);
        this.fluxShardIdToShardPairModelMap.entrySet().forEach(entry -> {
            tasksCompleted[tasks.get()] = this.executorService.submit(() -> {
                try {
                    result.addAll((Collection)reader.apply((ShardId)entry.getKey()));
                }
                catch (Exception ex) {
                    logger.error("Error in fetching {} from Slave with key {} , id {} {}", (Object)fetchOperation, entry.getKey(), entry.getValue(), (Object)ex.getStackTrace());
                }
            });
            tasks.getAndIncrement();
        });
        try {
            boolean allDone = false;
            block2: while (!allDone) {
                allDone = true;
                for (int i = 0; i < this.fluxShardIdToShardPairModelMap.size(); ++i) {
                    if (tasksCompleted[i].isDone() || tasksCompleted[i].isCancelled()) continue;
                    allDone = false;
                    continue block2;
                }
            }
        }
        catch (Exception e) {
            logger.error("Exception occured while getting {} from Slaves : {}", (Object)fetchOperation, (Object)e.getStackTrace());
        }
    }
}

