/*
 * Decompiled with CFR 0.152.
 */
package org.commoncrawl.rpc;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.commoncrawl.rpc.ActorInterface;
import org.commoncrawl.rpc.IncomingMessageContext;
import org.commoncrawl.rpc.MessageData;
import org.commoncrawl.rpc.RPCChannel;
import org.commoncrawl.rpc.RPCException;
import org.commoncrawl.rpc.RPCFrame;
import org.commoncrawl.rpc.RPCMessageDispatcher;
import org.commoncrawl.rpc.RPCServerChannel;

public class RPCActorService {
    public static final Log LOG = LogFactory.getLog(RPCActorService.class);
    private final Map<RPCServerChannel, ServerChannelMapItem> _channelMap = new HashMap<RPCServerChannel, ServerChannelMapItem>();
    ThreadPoolExecutor _threadPool;

    public RPCActorService(ThreadPoolExecutor optionalThreadPool) {
        this._threadPool = optionalThreadPool;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void bindActor(RPCServerChannel channel, ActorInterface.RPCSpecification specification, ActorInterface actorInstance, ThreadPoolExecutor optionalThreadPool) {
        Map<RPCServerChannel, ServerChannelMapItem> map = this._channelMap;
        synchronized (map) {
            ServerChannelMapItem item = this._channelMap.get(channel);
            if (item == null) {
                item = new ServerChannelMapItem();
                this._channelMap.put(channel, item);
            } else if (item._serviceMap.get(specification._name) != null) {
                throw new RuntimeException("Invalid call to register Service. The specified channel - service specification association already exists!");
            }
            item._serviceMap.put(specification._name, new RPCActorInstance(specification, actorInstance, optionalThreadPool));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws IOException {
        Map<RPCServerChannel, ServerChannelMapItem> map = this._channelMap;
        synchronized (map) {
            for (RPCServerChannel channel : this._channelMap.keySet()) {
                channel.open();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        Map<RPCServerChannel, ServerChannelMapItem> map = this._channelMap;
        synchronized (map) {
            for (RPCServerChannel channel : this._channelMap.keySet()) {
                channel.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void dispatchWireRequest(final RPCServerChannel serverChannel, final RPCChannel source, final RPCFrame.IncomingFrame frame) throws RPCException {
        RPCActorInstance targetInstance = null;
        Map<RPCServerChannel, ServerChannelMapItem> map = this._channelMap;
        synchronized (map) {
            ServerChannelMapItem mapItem = this._channelMap.get(serverChannel);
            if (mapItem != null) {
                targetInstance = mapItem._serviceMap.get(frame._service);
            }
        }
        if (targetInstance == null) {
            throw new RPCException("Uknown Service Name:" + frame._service);
        }
        final ActorInterface dispatchInstance = targetInstance._instance;
        final DataInputStream payload = new DataInputStream(frame._payload);
        final RPCMessageDispatcher dispatcher = targetInstance._specification._dispatcher;
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    dispatcher.dispatch(dispatchInstance, serverChannel, frame._service, frame._method, payload, frame._requestId, source);
                }
                catch (RPCException e) {
                    IncomingMessageContext<Object, Object> dummyContext = new IncomingMessageContext<Object, Object>(source, frame._requestId, null, null);
                    dummyContext.setStatus(MessageData.Status.Error_RPCFailed);
                    dummyContext.setErrorDesc(e.toString());
                    try {
                        source.sendResponse(dummyContext);
                    }
                    catch (RPCException e1) {
                        LOG.error((Object)("RPCException while sending <FAILED> response::" + e.toString()));
                    }
                }
            }
        };
        if (targetInstance._threadPool != null) {
            targetInstance._threadPool.submit(runnable);
        } else if (this._threadPool != null) {
            this._threadPool.submit(runnable);
        } else {
            runnable.run();
        }
    }

    private static class ServerChannelMapItem {
        public Map<String, RPCActorInstance> _serviceMap = new HashMap<String, RPCActorInstance>();

        private ServerChannelMapItem() {
        }
    }

    private static class RPCActorInstance {
        public ActorInterface.RPCSpecification _specification;
        public ActorInterface _instance;
        public ThreadPoolExecutor _threadPool;

        RPCActorInstance(ActorInterface.RPCSpecification spec, ActorInterface actorInterface, ThreadPoolExecutor executor) {
            this._specification = spec;
            this._instance = actorInterface;
            this._threadPool = executor;
        }
    }
}

