/*
 * Decompiled with CFR 0.152.
 */
package org.commoncrawl.rpc;

import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.commoncrawl.async.EventLoop;
import org.commoncrawl.rpc.Channel;
import org.commoncrawl.rpc.IncomingMessageContext;
import org.commoncrawl.rpc.MessageData;
import org.commoncrawl.rpc.OutgoingMessageContext;
import org.commoncrawl.rpc.RPCException;
import org.commoncrawl.rpc.RPCStruct;
import org.commoncrawl.util.shared.CCStringUtils;

public abstract class InProcessActor {
    public static final Log LOG = LogFactory.getLog(InProcessActor.class);
    EventLoop _eventLoop;
    boolean _ownsEventLoop;
    ThreadPoolExecutor _executor;
    Events _eventListener;

    public InProcessActor(ThreadPoolExecutor optionalExecutor, Events optionalEventListener) throws IOException {
        if (optionalExecutor == null) {
            this._eventLoop = new EventLoop();
            this._ownsEventLoop = true;
        } else {
            this._executor = optionalExecutor;
        }
        this._eventListener = optionalEventListener;
    }

    public void start() {
        if (this._ownsEventLoop) {
            this._eventLoop.start();
        }
        if (this._eventListener != null) {
            Runnable callback = new Runnable(){

                @Override
                public void run() {
                    InProcessActor.this._eventListener.onStartup(InProcessActor.this);
                }
            };
            if (this._eventLoop != null && this._eventLoop.getEventThread() == Thread.currentThread()) {
                callback.run();
            } else if (this._eventLoop != null) {
                this._eventLoop.queueAsyncRunnable(callback);
            } else {
                this._executor.submit(callback);
            }
        }
    }

    public void stop() {
        Runnable callback = new Runnable(){

            @Override
            public void run() {
                if (InProcessActor.this._eventListener != null) {
                    InProcessActor.this._eventListener.onShutdown(InProcessActor.this);
                }
                if (InProcessActor.this._ownsEventLoop) {
                    InProcessActor.this._eventLoop.stop();
                }
            }
        };
        if (this._eventLoop != null && this._eventLoop.getEventThread() == Thread.currentThread()) {
            callback.run();
        } else if (this._eventLoop != null) {
            this._eventLoop.queueAsyncRunnable(callback);
        } else {
            this._executor.submit(callback);
        }
    }

    public final Channel createChannel(InProcessActor sourceActor) {
        return this.createChannel(sourceActor, null, null);
    }

    public final Channel createChannel(EventLoop sourceEventLoop) {
        return this.createChannel(null, sourceEventLoop, null);
    }

    public final Channel createChannel(ThreadPoolExecutor sourceExecutor) {
        return this.createChannel(null, null, sourceExecutor);
    }

    public final Channel createChannel() {
        return this.createChannel(null, null, null);
    }

    private final Channel createChannel(final InProcessActor optionalSourceActor, final EventLoop optionalSourceEventLoop, final ThreadPoolExecutor optionalSourceExecutor) {
        return new InProcessChannel(){
            InProcessActor _optionalSourceActor;
            EventLoop _optionalSourceEventLoop;
            ThreadPoolExecutor _optionalSourceExecutor;
            {
                this._optionalSourceActor = optionalSourceActor;
                this._optionalSourceEventLoop = optionalSourceEventLoop;
                this._optionalSourceExecutor = optionalSourceExecutor;
            }

            @Override
            public void sendRequest(OutgoingMessageContext<? extends RPCStruct, ? extends RPCStruct> originalMessage) throws RPCException {
                final IncomingMessage incomingMessage = new IncomingMessage(originalMessage, this, originalMessage.getRequestId(), (RPCStruct)originalMessage.getInput(), (RPCStruct)originalMessage.getOutput());
                final 3 channel = this;
                Runnable callback = new Runnable(){

                    @Override
                    public void run() {
                        try {
                            InProcessActor.this.dispatch(channel, incomingMessage);
                        }
                        catch (RPCException e) {
                            LOG.error((Object)CCStringUtils.stringifyException((Throwable)e));
                            incomingMessage.setStatus(MessageData.Status.Error_RPCFailed);
                            incomingMessage.setErrorDesc(CCStringUtils.stringifyException((Throwable)e));
                            try {
                                this.sendResponse(incomingMessage);
                            }
                            catch (RPCException e1) {
                                LOG.error((Object)CCStringUtils.stringifyException((Throwable)e1));
                            }
                        }
                    }
                };
                if (InProcessActor.this._executor != null) {
                    InProcessActor.this._executor.submit(callback);
                } else {
                    InProcessActor.this._eventLoop.queueAsyncRunnable(callback);
                }
            }

            @Override
            public void sendResponse(final IncomingMessageContext<? extends RPCStruct, ? extends RPCStruct> message) throws RPCException {
                Runnable closure = new Runnable(){

                    @Override
                    public void run() {
                        IncomingMessage localMessage = (IncomingMessage)message;
                        if (localMessage._source.getCallback() != null) {
                            localMessage._source.setOutput(message.getOutput());
                            localMessage._source.setStatus(message.getStatus());
                            localMessage._source.setErrorDesc(message.getErrorDesc());
                            localMessage._source.getCallback().requestComplete(localMessage._source);
                        }
                    }
                };
                if (this._optionalSourceActor != null) {
                    if (this._optionalSourceActor._executor != null) {
                        this._optionalSourceActor._executor.submit(closure);
                    } else {
                        this._optionalSourceActor._eventLoop.queueAsyncRunnable(closure);
                    }
                } else if (this._optionalSourceEventLoop != null) {
                    this._optionalSourceEventLoop.queueAsyncRunnable(closure);
                } else if (this._optionalSourceExecutor != null) {
                    this._optionalSourceExecutor.submit(closure);
                } else {
                    closure.run();
                }
            }

            @Override
            public InProcessActor getActor() {
                return InProcessActor.this;
            }
        };
    }

    public abstract void dispatch(Channel var1, IncomingMessage var2) throws RPCException;

    public static interface InProcessChannel
    extends Channel {
        public InProcessActor getActor();
    }

    public static class IncomingMessage
    extends IncomingMessageContext {
        OutgoingMessageContext _source;

        public IncomingMessage(OutgoingMessageContext source, Channel channel, int requestId, RPCStruct input, RPCStruct output) {
            super(channel, requestId, input, output);
            this._source = source;
        }

        public String getServiceName() {
            return this._source._serviceName;
        }

        public String getMethodName() {
            return this._source._methodName;
        }
    }

    public static interface Events {
        public void onStartup(InProcessActor var1);

        public void onShutdown(InProcessActor var1);
    }
}

