/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.PrefetchSubscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey;

public class DurableTopicSubscription
extends PrefetchSubscription {
    private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
    private final ConcurrentHashMap destinations = new ConcurrentHashMap();
    private final SubscriptionKey subscriptionKey;
    private final boolean keepDurableSubsActive;
    private boolean active = false;

    public DurableTopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive, PendingMessageCursor cursor) throws InvalidSelectorException {
        super(broker, context, info, cursor);
        this.keepDurableSubsActive = keepDurableSubsActive;
        this.subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
    }

    public synchronized boolean isActive() {
        return this.active;
    }

    protected boolean isFull() {
        return !this.active || super.isFull();
    }

    public synchronized void gc() {
    }

    public synchronized void add(ConnectionContext context, Destination destination) throws Exception {
        super.add(context, destination);
        this.destinations.put(destination.getActiveMQDestination(), destination);
        if (this.active || this.keepDurableSubsActive) {
            Topic topic = (Topic)destination;
            topic.activate(context, this);
        }
        this.dispatchMatched();
    }

    public synchronized void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
        if (!this.active) {
            this.active = true;
            this.context = context;
            this.info = info;
            if (!this.keepDurableSubsActive) {
                Iterator iter = this.destinations.values().iterator();
                while (iter.hasNext()) {
                    Topic topic = (Topic)iter.next();
                    topic.activate(context, this);
                }
            }
            this.pending.start();
            this.dispatchMatched();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
        MessageReference node;
        Iterator iter;
        this.active = false;
        this.pending.stop();
        if (!keepDurableSubsActive) {
            iter = this.destinations.values().iterator();
            while (iter.hasNext()) {
                Topic topic = (Topic)iter.next();
                topic.deactivate(this.context, this);
            }
        }
        iter = this.dispatched.iterator();
        while (iter.hasNext()) {
            node = (MessageReference)iter.next();
            Integer count = (Integer)this.redeliveredMessages.get(node.getMessageId());
            if (count != null) {
                this.redeliveredMessages.put(node.getMessageId(), new Integer(count + 1));
            } else {
                this.redeliveredMessages.put(node.getMessageId(), new Integer(1));
            }
            if (keepDurableSubsActive) {
                PendingMessageCursor pendingMessageCursor = this.pending;
                synchronized (pendingMessageCursor) {
                    this.pending.addMessageFirst(node);
                }
            } else {
                node.decrementReferenceCount();
            }
            iter.remove();
        }
        if (!keepDurableSubsActive) {
            PendingMessageCursor pendingMessageCursor = this.pending;
            synchronized (pendingMessageCursor) {
                this.pending.reset();
                while (this.pending.hasNext()) {
                    node = this.pending.next();
                    node.decrementReferenceCount();
                    this.pending.remove();
                }
            }
        }
        this.prefetchExtension = 0;
    }

    protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
        MessageDispatch md = super.createMessageDispatch(node, message);
        Integer count = (Integer)this.redeliveredMessages.get(node.getMessageId());
        if (count != null) {
            md.setRedeliveryCounter(count);
        }
        return md;
    }

    public synchronized void add(MessageReference node) throws Exception {
        if (!this.active && !this.keepDurableSubsActive) {
            return;
        }
        node.incrementReferenceCount();
        super.add(node);
    }

    public int getPendingQueueSize() {
        if (this.active || this.keepDurableSubsActive) {
            return super.getPendingQueueSize();
        }
        return 0;
    }

    public void setSelector(String selector) throws InvalidSelectorException {
        throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
    }

    protected boolean canDispatch(MessageReference node) {
        return this.active;
    }

    protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
        node.getRegionDestination().acknowledge(context, this, ack, node);
        this.redeliveredMessages.remove(node.getMessageId());
        node.decrementReferenceCount();
    }

    public String getSubscriptionName() {
        return this.subscriptionKey.getSubscriptionName();
    }

    public String toString() {
        return "DurableTopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + this.dispatched.size() + ", delivered=" + this.prefetchExtension + ", pending=" + this.getPendingQueueSize();
    }

    public String getClientId() {
        return this.subscriptionKey.getClientId();
    }

    public SubscriptionKey getSubscriptionKey() {
        return this.subscriptionKey;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void destroy() {
        MessageReference node;
        PendingMessageCursor pendingMessageCursor = this.pending;
        synchronized (pendingMessageCursor) {
            this.pending.reset();
            while (this.pending.hasNext()) {
                node = this.pending.next();
                node.decrementReferenceCount();
            }
            this.pending.clear();
        }
        Iterator iter = this.dispatched.iterator();
        while (iter.hasNext()) {
            node = (MessageReference)iter.next();
            node.decrementReferenceCount();
        }
        this.dispatched.clear();
    }
}

