/*
 * Decompiled with CFR 0.152.
 */
package org.fusesource.stomp.jms;

import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import org.fusesource.hawtbuf.AsciiBuffer;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.hawtdispatch.Task;
import org.fusesource.stomp.client.Callback;
import org.fusesource.stomp.client.CallbackConnection;
import org.fusesource.stomp.client.Constants;
import org.fusesource.stomp.client.Promise;
import org.fusesource.stomp.client.ProtocolException;
import org.fusesource.stomp.client.Stomp;
import org.fusesource.stomp.codec.StompFrame;
import org.fusesource.stomp.jms.ActiveMQServerAdaptor;
import org.fusesource.stomp.jms.ApolloServerAdaptor;
import org.fusesource.stomp.jms.RabbitMQServerAdaptor;
import org.fusesource.stomp.jms.StompJmsDestination;
import org.fusesource.stomp.jms.StompJmsExceptionSupport;
import org.fusesource.stomp.jms.StompJmsMessageListener;
import org.fusesource.stomp.jms.StompJmsPrefetch;
import org.fusesource.stomp.jms.StompServerAdaptor;
import org.fusesource.stomp.jms.message.StompJmsMessage;
import org.fusesource.stomp.jms.message.StompJmsTextMessage;
import org.fusesource.stomp.jms.util.StompTranslator;

/**
 * A single STOMP connection as used by the JMS layer of this package.
 *
 * <p>The channel wraps a {@link CallbackConnection}: it establishes the connection
 * ({@link #connect()}), builds and sends the STOMP frames for sending, acknowledging,
 * subscribing and transactions, and routes inbound {@code MESSAGE} frames to the
 * registered {@link StompJmsMessageListener}. All interaction with the underlying
 * connection is executed on that connection's dispatch queue.
 *
 * <p>Configuration (broker URI, credentials, client id, ...) is expected to be set
 * through the setters before {@link #connect()} is called.
 */
public class StompChannel {
    /** Known server adaptors, probed in this order by {@link #connect()}. */
    private static final StompServerAdaptor[] STOMP_SERVER_ADAPTORS = new StompServerAdaptor[]{new ApolloServerAdaptor(), new ActiveMQServerAdaptor(), new RabbitMQServerAdaptor(), new StompServerAdaptor()};
    /** Not referenced inside this class; kept for other classes in the package. */
    static final long TIMEOUT = -1L;
    String channelId;
    String userName;
    String password;
    SSLContext sslContext;
    String ackMode;
    /** When {@code true}, the host configured on the STOMP client is cleared before connecting. */
    boolean omitHost;
    URI brokerURI;
    URI localURI;
    /** The live connection; assigned by {@link #connect()}. */
    CallbackConnection connection;
    /**
     * Receiver of inbound messages. Volatile because it is assigned through a setter
     * but read from the receive callback that runs on the connection's dispatch queue.
     */
    volatile StompJmsMessageListener listener;
    /**
     * Receiver of asynchronous failures. Volatile for the same reason as
     * {@link #listener}.
     */
    volatile ExceptionListener exceptionListener;
    /** Set once {@link #connect()} has obtained a session id; cleared by {@link #close()}. */
    AtomicBoolean started = new AtomicBoolean();
    /** Guards {@link #connect()} / {@link #close()} so each transition happens once. */
    AtomicBoolean connected = new AtomicBoolean();
    AsciiBuffer sessionId;
    /** Remaining write-buffer credit; {@link #sendFrame(StompFrame)} blocks once it is used up. */
    AtomicInteger writeBufferRemaining = new AtomicInteger();
    /**
     * Counter consulted before synchronous operations. It is not modified in this class;
     * judging by the error messages it presumably counts subscriptions using 'ack:auto'
     * (TODO confirm against the code that updates it).
     */
    AtomicInteger serverAckSubs = new AtomicInteger();
    /** Adaptor selected in {@link #connect()} from the server/version string. */
    StompServerAdaptor serverAdaptor;
    String clientId;
    /** Maximum time, in milliseconds, {@link #close()} waits for the DISCONNECT reply. */
    private long disconnectTimeout = 10000L;
    /** Released when a connect attempt finishes, whether it succeeded or failed. */
    CountDownLatch connectedLatch = new CountDownLatch(1);

    /**
     * @return the session id taken from the CONNECTED frame, or the locally generated
     *         one; {@code null} before {@link #connect()} has succeeded
     */
    public AsciiBuffer sessionId() {
        return this.sessionId;
    }

    /**
     * @return the underlying connection, or {@code null} if none has been established
     */
    public CallbackConnection connection() {
        return this.connection;
    }

    /**
     * Creates a new, unconnected channel carrying over the broker URI, local URI,
     * user name, password, ack mode, omit-host flag and SSL context of this one.
     * Other settings (channel id, client id, disconnect timeout, listeners) are
     * not copied.
     *
     * @return the new channel
     */
    public StompChannel copy() {
        StompChannel copy = new StompChannel();
        copy.brokerURI = this.brokerURI;
        copy.localURI = this.localURI;
        copy.userName = this.userName;
        copy.password = this.password;
        copy.ackMode = this.ackMode;
        copy.omitHost = this.omitHost;
        copy.sslContext = this.sslContext;
        return copy;
    }

    /**
     * Connects to the broker if this channel is not already connected.
     *
     * <p>The first caller performs the connect: it configures the STOMP client, waits
     * for the connection, installs the receive callback, determines the session id and
     * selects the server adaptor. Every caller then waits on the connect latch, which
     * is released when that attempt completes, including when it fails.
     *
     * @throws JMSException if the connect attempt made by this call fails, or if the
     *                      calling thread is interrupted while waiting
     */
    public void connect() throws JMSException {
        if (this.connected.compareAndSet(false, true)) {
            try {
                Promise<CallbackConnection> future = new Promise<CallbackConnection>();
                Stomp stomp = new Stomp(this.brokerURI);
                stomp.setLogin(this.userName);
                stomp.setPasscode(this.password);
                stomp.setLocalURI(this.localURI);
                stomp.setClientId(this.clientId);
                stomp.setSslContext(this.sslContext);
                // The host must be cleared BEFORE the asynchronous connect is started;
                // doing it afterwards races with the connect handshake.
                if (this.omitHost) {
                    stomp.setHost(null);
                }
                stomp.connectCallback(future);
                this.connection = future.await();
                this.writeBufferRemaining.set(this.connection.transport().getProtocolCodec().getWriteBufferSize());
                // Install the receive callback and resume the connection on its own dispatch queue.
                this.connection.getDispatchQueue().execute(new Task(){

                    public void run() {
                        StompChannel.this.connection.receive(new Callback<StompFrame>(){

                            @Override
                            public void onFailure(Throwable value) {
                                StompChannel.this.handleException(value);
                            }

                            @Override
                            public void onSuccess(StompFrame value) {
                                StompChannel.this.onFrame(value);
                            }
                        });
                        StompChannel.this.connection.resume();
                    }
                });
                this.sessionId = this.connection.connectedFrame().headerMap().get(Constants.SESSION);
                if (this.sessionId == null) {
                    // The server did not supply a session id; generate one locally.
                    this.sessionId = new AsciiBuffer("id-" + UUID.randomUUID().toString());
                }
                this.started.set(true);
                String sv = this.getServerAndVersion();
                for (StompServerAdaptor adaptor : STOMP_SERVER_ADAPTORS) {
                    if (adaptor.matchesServerAndVersion(sv)) {
                        this.serverAdaptor = adaptor;
                        break;
                    }
                }
                assert (this.serverAdaptor != null);
            }
            catch (Exception e) {
                this.connected.set(false);
                StompChannel.restoreInterrupt(e);
                throw StompJmsExceptionSupport.create(e);
            }
            finally {
                this.connectedLatch.countDown();
            }
        }
        try {
            this.connectedLatch.await();
        }
        catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * @return {@code true} between a successful {@link #connect()} and {@link #close()}
     */
    public boolean isStarted() {
        return this.started.get();
    }

    /**
     * Disconnects from the broker if this channel is connected.
     *
     * <p>A DISCONNECT request is issued and its reply (or failure) is awaited for at
     * most {@link #getDisconnectTimeout()} milliseconds; the underlying connection is
     * then closed regardless of the outcome.
     *
     * @throws JMSException if the calling thread is interrupted while waiting; the
     *                      interrupt status is restored and the connection is still closed
     */
    public void close() throws JMSException {
        if (this.connected.compareAndSet(true, false)) {
            final CountDownLatch cd = new CountDownLatch(1);
            this.started.set(false);
            this.connection.getDispatchQueue().execute(new Task(){

                public void run() {
                    StompFrame frame = new StompFrame(Constants.DISCONNECT);
                    StompChannel.this.connection.request(frame, new Callback<StompFrame>(){

                        @Override
                        public void onFailure(Throwable value) {
                            // A failed DISCONNECT still ends the wait.
                            this.onSuccess(null);
                        }

                        @Override
                        public void onSuccess(StompFrame value) {
                            cd.countDown();
                        }
                    });
                }
            });
            try {
                // Best effort: a timeout is not treated as an error.
                cd.await(this.getDisconnectTimeout(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                JMSException failure = new JMSException("Interrupted");
                failure.setLinkedException(e);
                failure.initCause(e);
                throw failure;
            }
            finally {
                this.connection.close((Runnable)Dispatch.NOOP);
            }
        }
    }

    /**
     * Sends a JMS message as a STOMP SEND frame.
     *
     * @param copy the message to send; {@code onSend()} is invoked on it and its frame is used
     * @param txid transaction id to attach, or {@code null} for none
     * @param sync {@code true} to send as a request and wait for it to complete,
     *             {@code false} to send without waiting for a reply
     * @throws JMSException if {@code sync} is requested while {@code serverAckSubs} is
     *                      positive, or if sending fails
     */
    public void sendMessage(StompJmsMessage copy, AsciiBuffer txid, boolean sync) throws JMSException {
        if (sync && this.serverAckSubs.get() > 0) {
            throw new JMSException("Sync message sends not allowed when a subscription is using 'ack:auto'.  Causes deadlocks.");
        }
        copy.onSend();
        StompFrame frame = copy.getFrame();
        frame.action(Constants.SEND);
        // Every message type except text messages gets an explicit content-length header.
        if (!(copy instanceof StompJmsTextMessage)) {
            frame.headerMap().put(Constants.CONTENT_LENGTH, new AsciiBuffer(Integer.toString(frame.content().length)));
        }
        if (txid != null) {
            frame.headerMap().put(Constants.TRANSACTION, txid);
        }
        try {
            if (sync) {
                this.sendRequest(frame);
            } else {
                this.sendFrame(frame);
            }
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * Acknowledges a message with a STOMP ACK frame.
     *
     * @param consumerId subscription id the message was received on
     * @param messageId  id of the message being acknowledged
     * @param txid       transaction id to attach, or {@code null} for none
     * @param callback   when non-null the ACK is sent as a request completing this promise;
     *                   when {@code null} the frame is sent without waiting for a reply
     * @throws JMSException if a callback is supplied while {@code serverAckSubs} is
     *                      positive, or if sending fails
     */
    public void ackMessage(AsciiBuffer consumerId, AsciiBuffer messageId, AsciiBuffer txid, Promise<StompFrame> callback) throws JMSException {
        if (callback != null && this.serverAckSubs.get() > 0) {
            throw new JMSException("Sync acks not allowed when a subscription is using 'ack:auto'.  Causes deadlocks.");
        }
        StompFrame frame = new StompFrame();
        frame.action(Constants.ACK);
        frame.headerMap().put(Constants.SUBSCRIPTION, consumerId);
        frame.headerMap().put(Constants.MESSAGE_ID, messageId);
        if (txid != null) {
            frame.headerMap().put(Constants.TRANSACTION, txid);
        }
        try {
            if (callback != null) {
                this.sendRequest(frame, callback);
            } else {
                this.sendFrame(frame);
            }
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * Subscribes to a destination with a STOMP SUBSCRIBE frame.
     *
     * @param destination destination to subscribe to
     * @param consumerId  id of the subscription
     * @param selector    message selector; omitted when {@code null} or blank
     * @param ackMode     value of the ack-mode header
     * @param noLocal     passed to the server adaptor when adding subscribe headers
     * @param persistent  passed to the server adaptor when adding subscribe headers
     * @param browser     passed to the server adaptor when adding subscribe headers
     * @param prefetch    prefetch settings; adaptor-specific headers are only added when non-null
     * @param headers     extra headers applied last (overriding earlier ones), or {@code null}
     * @throws JMSException if sending fails
     */
    public void subscribe(StompJmsDestination destination, AsciiBuffer consumerId, AsciiBuffer selector, AsciiBuffer ackMode, boolean noLocal, boolean persistent, boolean browser, StompJmsPrefetch prefetch, Map<AsciiBuffer, AsciiBuffer> headers) throws JMSException {
        StompFrame frame = new StompFrame();
        frame.action(Constants.SUBSCRIBE);
        Map<AsciiBuffer, AsciiBuffer> headerMap = frame.headerMap();
        headerMap.put(Constants.DESTINATION, destination.toBuffer());
        headerMap.put(Constants.ID, consumerId);
        if (selector != null && !selector.trim().isEmpty()) {
            headerMap.put(Constants.SELECTOR, selector);
        }
        headerMap.put(Constants.ACK_MODE, ackMode);
        if (prefetch != null) {
            this.serverAdaptor.addSubscribeHeaders(headerMap, persistent, browser, noLocal, prefetch);
        }
        if (headers != null) {
            headerMap.putAll(headers);
        }
        try {
            // Non-topic subscriptions are sent without waiting while serverAckSubs is
            // positive; every other case waits for the request to complete.
            if (!destination.isTopic() && this.serverAckSubs.get() > 0) {
                this.sendFrame(frame);
            } else {
                this.sendRequest(frame);
            }
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * Cancels a subscription using the frame produced by the server adaptor.
     *
     * @param consumerId id of the subscription to cancel
     * @param persistent passed to the server adaptor when building the frame
     * @throws JMSException if sending fails
     */
    public void unsubscribe(AsciiBuffer consumerId, boolean persistent) throws JMSException {
        StompFrame frame = this.serverAdaptor.createUnsubscribeFrame(consumerId, persistent);
        try {
            this.sendFrame(frame);
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * Begins a transaction with a STOMP BEGIN frame.
     *
     * @return the transaction id obtained from the connection (prefix {@code "TX-"})
     * @throws JMSException if sending fails
     */
    public AsciiBuffer startTransaction() throws JMSException {
        AsciiBuffer txid = this.connection.nextId("TX-");
        StompFrame frame = new StompFrame();
        frame.action(Constants.BEGIN);
        if (txid != null) {
            frame.headerMap().put(Constants.TRANSACTION, txid);
        }
        try {
            this.sendFrame(frame);
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
        return txid;
    }

    /**
     * Commits a transaction with a STOMP COMMIT request and waits for it to complete.
     *
     * @param txid transaction id; the header is omitted when {@code null}
     * @throws JMSException if {@code serverAckSubs} is positive, or if the request fails
     */
    public void commitTransaction(AsciiBuffer txid) throws JMSException {
        if (this.serverAckSubs.get() > 0) {
            throw new JMSException("transactions not allowed when a subscription is using 'ack:auto'.  Causes deadlocks.");
        }
        StompFrame frame = new StompFrame();
        frame.action(Constants.COMMIT);
        if (txid != null) {
            frame.headerMap().put(Constants.TRANSACTION, txid);
        }
        try {
            this.sendRequest(frame);
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * Rolls back a transaction with a STOMP ABORT request and waits for it to complete.
     *
     * @param txid transaction id; the header is omitted when {@code null}
     * @throws JMSException if {@code serverAckSubs} is positive, or if the request fails
     */
    public void rollbackTransaction(AsciiBuffer txid) throws JMSException {
        if (this.serverAckSubs.get() > 0) {
            throw new JMSException("transactions not allowed when a subscription is using 'ack:auto'.  Causes deadlocks.");
        }
        StompFrame frame = new StompFrame();
        frame.action(Constants.ABORT);
        if (txid != null) {
            frame.headerMap().put(Constants.TRANSACTION, txid);
        }
        try {
            this.sendRequest(frame);
        }
        catch (IOException e) {
            throw StompJmsExceptionSupport.create(e);
        }
    }

    /**
     * Sends a frame without waiting for a broker reply, applying write-buffer flow control.
     *
     * <p>The frame size is deducted from the remaining write-buffer credit. If credit was
     * still available before the deduction the frame is sent asynchronously and failures
     * are reported through {@code handleException}; otherwise the calling thread blocks
     * until this frame has been sent. The credit is returned when the send succeeds.
     *
     * @param frame the frame to send
     * @throws IOException if the blocking send fails or the wait is interrupted
     */
    public void sendFrame(final StompFrame frame) throws IOException {
        try {
            final int size = frame.size();
            if (this.writeBufferRemaining.getAndAdd(-size) > 0) {
                this.connection.getDispatchQueue().execute(new Task(){

                    public void run() {
                        StompChannel.this.connection.send(frame, new Callback<Void>(){

                            @Override
                            public void onFailure(Throwable value) {
                                StompChannel.this.handleException(value);
                            }

                            @Override
                            public void onSuccess(Void value) {
                                StompChannel.this.writeBufferRemaining.getAndAdd(size);
                            }
                        });
                    }
                });
            } else {
                // Out of credit: send and block the caller until the frame is written.
                final Promise<Void> future = new Promise<Void>(){

                    @Override
                    public void onSuccess(Void value) {
                        StompChannel.this.writeBufferRemaining.getAndAdd(size);
                        super.onSuccess(value);
                    }
                };
                this.connection.getDispatchQueue().execute(new Task(){

                    public void run() {
                        StompChannel.this.connection.send(frame, future);
                    }
                });
                future.await();
            }
        }
        catch (IOException e) {
            throw e;
        }
        catch (Exception e) {
            StompChannel.restoreInterrupt(e);
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Issues a request on the connection's dispatch queue and returns immediately.
     *
     * @param frame  the frame to send as a request
     * @param future promise handed to the connection for the outcome of the request
     */
    public void sendRequest(final StompFrame frame, final Promise<StompFrame> future) {
        this.connection.getDispatchQueue().execute(new Task(){

            public void run() {
                StompChannel.this.connection.request(frame, future);
            }
        });
    }

    /**
     * Issues a request and blocks until it completes.
     *
     * @param frame the frame to send as a request
     * @throws IOException if the request fails or the wait is interrupted
     */
    public void sendRequest(StompFrame frame) throws IOException {
        try {
            Promise<StompFrame> future = new Promise<StompFrame>();
            this.sendRequest(frame, future);
            future.await();
        }
        catch (IOException e) {
            throw e;
        }
        catch (Exception e) {
            StompChannel.restoreInterrupt(e);
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Handles a frame received from the broker.
     *
     * <p>Frames whose action starts with {@code MESSAGE} are converted to a read-only
     * {@link StompJmsMessage} and passed to the listener, if one is set. Any other
     * action is reported as a {@link ProtocolException} through the exception handling path.
     *
     * @param frame the received frame
     */
    public void onFrame(StompFrame frame) {
        AsciiBuffer action = frame.action();
        if (action.startsWith((Buffer)Constants.MESSAGE)) {
            try {
                StompJmsMessage msg = StompTranslator.convert(frame);
                msg.setFrame(frame);
                msg.setReadOnlyBody(true);
                msg.setReadOnlyProperties(true);
                // Read the field once so a concurrent setListener(null) cannot cause an NPE.
                StompJmsMessageListener l = this.listener;
                if (l != null) {
                    l.onMessage(msg);
                }
            }
            catch (JMSException e) {
                this.handleException(e);
            }
        } else {
            this.handleException(new ProtocolException("Unknown STOMP action: " + action));
        }
    }

    /** @return the channel id */
    public String getChannelId() {
        return this.channelId;
    }

    /** @param channelId the channel id */
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    /** @return the login used when connecting */
    public String getUserName() {
        return this.userName;
    }

    /** @param userName the login used when connecting */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the passcode used when connecting */
    public String getPassword() {
        return this.password;
    }

    /** @param password the passcode used when connecting */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the configured ack mode */
    public String getAckMode() {
        return this.ackMode;
    }

    /** @param ackMode the ack mode to store on this channel */
    public void setAckMode(String ackMode) {
        this.ackMode = ackMode;
    }

    /** @return the broker URI used when connecting */
    public URI getBrokerURI() {
        return this.brokerURI;
    }

    /** @param brokerURI the broker URI used when connecting */
    public void setBrokerURI(URI brokerURI) {
        this.brokerURI = brokerURI;
    }

    /** @return the local URI passed to the STOMP client when connecting */
    public URI getLocalURI() {
        return this.localURI;
    }

    /** @param localURI the local URI passed to the STOMP client when connecting */
    public void setLocalURI(URI localURI) {
        this.localURI = localURI;
    }

    /** @return whether the host is cleared on the STOMP client before connecting */
    public boolean isOmitHost() {
        return this.omitHost;
    }

    /** @param omitHost {@code true} to clear the host on the STOMP client before connecting */
    public void setOmitHost(boolean omitHost) {
        this.omitHost = omitHost;
    }

    /** @return the listener receiving inbound messages, or {@code null} */
    public StompJmsMessageListener getListener() {
        return this.listener;
    }

    /** @param listener the listener receiving inbound messages, or {@code null} for none */
    public void setListener(StompJmsMessageListener listener) {
        this.listener = listener;
    }

    /** @param listener the listener notified of asynchronous failures, or {@code null} for none */
    public void setExceptionListener(ExceptionListener listener) {
        this.exceptionListener = listener;
    }

    /**
     * Reports an asynchronous failure: to the exception listener when one is set,
     * otherwise to standard error while the channel is started.
     */
    private void handleException(Throwable e) {
        ExceptionListener l = this.exceptionListener;
        if (l != null) {
            l.onException(StompJmsExceptionSupport.create(e));
        } else if (this.started.get()) {
            e.printStackTrace();
        }
    }

    /** Re-asserts the calling thread's interrupt status when {@code e} is an {@link InterruptedException}. */
    private static void restoreInterrupt(Throwable e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /** @return the next id generated by the underlying connection */
    public AsciiBuffer nextId() {
        return this.connection.nextId();
    }

    /** @return the {@code host-id} header of the CONNECTED frame, or {@code null} if absent */
    public String getConnectedHostId() {
        return this.getConnectedHeader(Constants.HOST_ID);
    }

    /** @return the {@code server} header of the CONNECTED frame, or {@code null} if absent */
    public String getServerAndVersion() {
        return this.getConnectedHeader(Constants.SERVER);
    }

    /** @return the {@code session} header of the CONNECTED frame, or {@code null} if absent */
    public String getConnectedSessionId() {
        return this.getConnectedHeader(Constants.SESSION);
    }

    /**
     * Looks up a header of the CONNECTED frame.
     *
     * @return the header value as a string, or {@code null} when there is no
     *         CONNECTED frame or it lacks the header
     */
    private String getConnectedHeader(AsciiBuffer header) {
        StompFrame frame = this.connection.connectedFrame();
        if (frame == null) {
            return null;
        }
        AsciiBuffer value = frame.getHeader(header);
        return value != null ? value.toString() : null;
    }

    /** @return the adaptor chosen during {@link #connect()}, or {@code null} before that */
    public StompServerAdaptor getServerAdaptor() {
        return this.serverAdaptor;
    }

    /** @return the client id passed to the STOMP client when connecting */
    public String getClientId() {
        return this.clientId;
    }

    /** @param clientId the client id passed to the STOMP client when connecting */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /** @return the maximum time in milliseconds {@link #close()} waits for the DISCONNECT reply */
    public long getDisconnectTimeout() {
        return this.disconnectTimeout;
    }

    /** @param disconnectTimeout maximum time in milliseconds {@link #close()} waits for the DISCONNECT reply */
    public void setDisconnectTimeout(long disconnectTimeout) {
        this.disconnectTimeout = disconnectTimeout;
    }

    /** @return the SSL context passed to the STOMP client when connecting */
    public SSLContext getSslContext() {
        return this.sslContext;
    }

    /** @param sslContext the SSL context passed to the STOMP client when connecting */
    public void setSslContext(SSLContext sslContext) {
        this.sslContext = sslContext;
    }
}

