/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.milo.opcua.sdk.client.subscriptions;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.sdk.client.api.UaSession;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaMonitoredItem;
import org.eclipse.milo.opcua.sdk.client.subscriptions.OpcUaSubscription;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.serialization.UaRequestMessage;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UByte;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
import org.eclipse.milo.opcua.stack.core.types.structured.CreateSubscriptionResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.DataChangeNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.EventFieldList;
import org.eclipse.milo.opcua.stack.core.types.structured.EventNotificationList;
import org.eclipse.milo.opcua.stack.core.types.structured.ModifySubscriptionResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.NotificationMessage;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.PublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RepublishResponse;
import org.eclipse.milo.opcua.stack.core.types.structured.RequestHeader;
import org.eclipse.milo.opcua.stack.core.types.structured.StatusChangeNotification;
import org.eclipse.milo.opcua.stack.core.types.structured.SubscriptionAcknowledgement;
import org.eclipse.milo.opcua.stack.core.util.ConversionUtil;
import org.eclipse.milo.opcua.stack.core.util.ExecutionQueue;
import org.eclipse.milo.opcua.stack.core.util.FutureUtils;
import org.eclipse.milo.opcua.stack.core.util.Unit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpcUaSubscriptionManager
implements UaSubscriptionManager {
    /** Value passed as {@code maxNotificationsPerPublish} by {@link #createSubscription(double)}. */
    public static final UInteger DEFAULT_MAX_NOTIFICATIONS_PER_PUBLISH = Unsigned.uint((int)65535);
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /** Subscriptions currently tracked by this manager, keyed by subscription id. */
    private final Map<UInteger, OpcUaSubscription> subscriptions = Maps.newConcurrentMap();
    /** One watchdog timer per tracked subscription, keyed by subscription id. */
    private final Map<UInteger, WatchdogTimer> watchdogTimers = Maps.newConcurrentMap();
    /** Manager-wide listeners; notified in addition to each subscription's own notification listeners. */
    private final List<UaSubscriptionManager.SubscriptionListener> subscriptionListeners = Lists.newCopyOnWriteArrayList();
    /** Number of in-flight PublishRequests, keyed by session id; entries for other sessions are pruned in maybeSendPublishRequests(). */
    private final ConcurrentMap<NodeId, AtomicLong> pendingCountMap = Maps.newConcurrentMap();
    /** Queue on which notification messages are handed to listeners and monitored items; can be paused/resumed via pauseDelivery()/resumeDelivery(). */
    private final ExecutionQueue deliveryQueue;
    /** Queue on which PublishResponses are processed; paused while a Republish is in progress. */
    private final ExecutionQueue processingQueue;
    private final OpcUaClient client;

    /**
     * Creates a subscription manager bound to {@code client}.
     *
     * <p>Both execution queues are built on the executor from the client configuration. A session
     * activity listener is registered so that publishing is (re)started when a session becomes
     * active and the pending-publish counter of a session is reset when it becomes inactive.
     *
     * @param client the client used to issue subscription and publish service calls.
     */
    public OpcUaSubscriptionManager(OpcUaClient client) {
        this.client = client;
        this.deliveryQueue = new ExecutionQueue((Executor) client.getConfig().getExecutor());
        this.processingQueue = new ExecutionQueue((Executor) client.getConfig().getExecutor());

        SessionActivityListener sessionActivityListener = new SessionActivityListener() {

            @Override
            public void onSessionActive(UaSession session) {
                // A (new) session is usable: top up the outstanding PublishRequests.
                OpcUaSubscriptionManager.this.maybeSendPublishRequests();
            }

            @Override
            public void onSessionInactive(UaSession session) {
                // replace() only has an effect when a counter for this session already exists.
                OpcUaSubscriptionManager.this.pendingCountMap.replace(session.getSessionId(), new AtomicLong(0L));
            }
        };
        client.addSessionActivityListener(sessionActivityListener);
    }

    /**
     * Creates a subscription using only a requested publishing interval.
     *
     * <p>Lifetime and keep-alive counts are derived by this manager's own
     * {@code getLifetimeCount}/{@code getMaxKeepAliveCount}; the subscription is created with
     * {@link #DEFAULT_MAX_NOTIFICATIONS_PER_PUBLISH}, publishing enabled and {@code UByte.MIN} priority.
     *
     * @param requestedPublishingInterval the publishing interval to request from the server.
     * @return a future completed with the created subscription.
     */
    @Override
    public CompletableFuture<UaSubscription> createSubscription(double requestedPublishingInterval) {
        return createSubscription(
            requestedPublishingInterval,
            this::getLifetimeCount,
            this::getMaxKeepAliveCount,
            DEFAULT_MAX_NOTIFICATIONS_PER_PUBLISH,
            true,
            UByte.MIN
        );
    }

    /**
     * Creates a subscription with explicitly requested lifetime and keep-alive counts.
     *
     * <p>The fixed counts are wrapped in constant functions and handed to the function-based
     * overload, so the same counts are used regardless of the interval the server revises to.
     *
     * @return a future completed with the created subscription.
     */
    @Override
    public CompletableFuture<UaSubscription> createSubscription(double requestedPublishingInterval, UInteger requestedLifetimeCount, UInteger requestedMaxKeepAliveCount, UInteger maxNotificationsPerPublish, boolean publishingEnabled, UByte priority) {
        BiFunction<Double, UInteger, UInteger> constantLifetimeCount =
            (interval, keepAliveCount) -> requestedLifetimeCount;
        Function<Double, UInteger> constantMaxKeepAliveCount =
            interval -> requestedMaxKeepAliveCount;

        return createSubscription(
            requestedPublishingInterval,
            constantLifetimeCount,
            constantMaxKeepAliveCount,
            maxNotificationsPerPublish,
            publishingEnabled,
            priority
        );
    }

    /**
     * Creates a subscription whose keep-alive and lifetime counts are computed from the publishing interval.
     *
     * <p>The counts are first computed for the requested interval and sent with the
     * CreateSubscription call. If the server revises the interval and the keep-alive count computed
     * for the revised interval differs from the one requested, a follow-up ModifySubscription is
     * issued with counts recomputed for the revised interval. Only once the final parameters are
     * known is the subscription registered, its watchdog started and publishing triggered.
     *
     * @param requestedPublishingInterval the publishing interval to request.
     * @param getLifetimeCount computes the lifetime count from (publishing interval, max keep-alive count).
     * @param getMaxKeepAliveCount computes the max keep-alive count from the publishing interval.
     * @param maxNotificationsPerPublish forwarded to the server and stored on the subscription.
     * @param publishingEnabled forwarded to the server and stored on the subscription.
     * @param priority forwarded to the server and stored on the subscription.
     * @return a future completed with the registered subscription.
     */
    @Override
    public CompletableFuture<UaSubscription> createSubscription(double requestedPublishingInterval, BiFunction<Double, UInteger, UInteger> getLifetimeCount, Function<Double, UInteger> getMaxKeepAliveCount, UInteger maxNotificationsPerPublish, boolean publishingEnabled, UByte priority) {
        UInteger requestedMaxKeepAliveCount = getMaxKeepAliveCount.apply(requestedPublishingInterval);
        UInteger requestedLifetimeCount = getLifetimeCount.apply(requestedPublishingInterval, requestedMaxKeepAliveCount);

        CompletableFuture<CreateSubscriptionResponse> created = client.createSubscription(
            requestedPublishingInterval,
            requestedLifetimeCount,
            requestedMaxKeepAliveCount,
            maxNotificationsPerPublish,
            publishingEnabled,
            priority
        );

        return created.thenCompose(response -> {
            OpcUaSubscription subscription = new OpcUaSubscription(
                client,
                response.getSubscriptionId(),
                response.getRevisedPublishingInterval(),
                response.getRevisedLifetimeCount(),
                response.getRevisedMaxKeepAliveCount(),
                maxNotificationsPerPublish,
                publishingEnabled,
                priority
            );
            subscription.setRequestedPublishingInterval(requestedPublishingInterval);
            subscription.setRequestedLifetimeCount(requestedLifetimeCount);
            subscription.setRequestedMaxKeepAliveCount(requestedMaxKeepAliveCount);

            double revisedPublishingInterval = response.getRevisedPublishingInterval();
            UInteger newMaxKeepAliveCount = getMaxKeepAliveCount.apply(revisedPublishingInterval);

            // No correction needed when the interval was accepted as requested or the
            // recomputed keep-alive count is the one already requested.
            if (requestedPublishingInterval == revisedPublishingInterval
                || requestedMaxKeepAliveCount.equals(newMaxKeepAliveCount)) {

                registerCreatedSubscription(subscription);
                return CompletableFuture.completedFuture(subscription);
            }

            UInteger newLifetimeCount = getLifetimeCount.apply(revisedPublishingInterval, newMaxKeepAliveCount);

            return client.modifySubscription(
                response.getSubscriptionId(),
                revisedPublishingInterval,
                newLifetimeCount,
                newMaxKeepAliveCount,
                maxNotificationsPerPublish,
                priority
            ).thenApply(modifyResponse -> {
                subscription.setRequestedLifetimeCount(newLifetimeCount);
                subscription.setRequestedMaxKeepAliveCount(newMaxKeepAliveCount);
                subscription.setRevisedPublishingInterval(modifyResponse.getRevisedPublishingInterval());
                subscription.setRevisedLifetimeCount(modifyResponse.getRevisedLifetimeCount());
                subscription.setRevisedMaxKeepAliveCount(modifyResponse.getRevisedMaxKeepAliveCount());

                registerCreatedSubscription(subscription);
                return subscription;
            });
        });
    }

    /**
     * Starts tracking a freshly created subscription: stores it, creates and arms its watchdog
     * timer, and tops up the outstanding PublishRequests.
     */
    private void registerCreatedSubscription(OpcUaSubscription subscription) {
        subscriptions.put(subscription.getSubscriptionId(), subscription);

        WatchdogTimer watchdogTimer = new WatchdogTimer(
            subscription,
            client.getConfig().getSubscriptionWatchdogMultiplier()
        );
        watchdogTimers.put(subscription.getSubscriptionId(), watchdogTimer);
        watchdogTimer.kick();

        maybeSendPublishRequests();
    }

    /**
     * Computes the default max keep-alive count for a publishing interval: the number of
     * intervals (rounded up) that fit in 10000, treating intervals below 1.0 as 1.0, and
     * never less than 1.
     *
     * @param publishingInterval the publishing interval (presumably milliseconds, which would make
     *     the target a 10 second keep-alive — confirm against the caller).
     * @return the keep-alive count, at least 1.
     */
    private UInteger getMaxKeepAliveCount(double publishingInterval) {
        double effectiveInterval = Math.max(1.0, publishingInterval);
        int intervalsPerWindow = (int) Math.ceil(10000.0 / effectiveInterval);
        int keepAliveCount = Math.max(1, intervalsPerWindow);
        return Unsigned.uint(keepAliveCount);
    }

    /**
     * Computes the default lifetime count: six times the max keep-alive count, capped at
     * 4294967295 (0xFFFFFFFF).
     *
     * @param publishingInterval not used by this implementation; present so the method matches the
     *     {@code BiFunction<Double, UInteger, UInteger>} shape expected by the callers.
     * @param maxKeepAliveCount the keep-alive count the lifetime is derived from.
     * @return the lifetime count.
     */
    private UInteger getLifetimeCount(double publishingInterval, UInteger maxKeepAliveCount) {
        BigInteger keepAlive = maxKeepAliveCount.toBigInteger();
        BigInteger sixfold = keepAlive.multiply(BigInteger.valueOf(6L));
        BigInteger ceiling = BigInteger.valueOf(0xFFFFFFFFL);

        BigInteger capped = sixfold.compareTo(ceiling) > 0 ? ceiling : sixfold;
        return Unsigned.uint(capped.longValue());
    }

    /**
     * Modifies only the publishing interval of a tracked subscription.
     *
     * <p>Counts are recomputed with this manager's default functions; max notifications per
     * publish and priority are carried over from the existing subscription.
     *
     * @return a future completed with the modified subscription, or failed with status
     *     0x80280000 when {@code subscriptionId} is not tracked by this manager.
     */
    @Override
    public CompletableFuture<UaSubscription> modifySubscription(UInteger subscriptionId, double requestedPublishingInterval) {
        OpcUaSubscription existing = subscriptions.get(subscriptionId);

        if (existing != null) {
            return modifySubscription(
                subscriptionId,
                requestedPublishingInterval,
                this::getLifetimeCount,
                this::getMaxKeepAliveCount,
                existing.getMaxNotificationsPerPublish(),
                existing.getPriority()
            );
        }

        return FutureUtils.failedUaFuture(0x80280000L);
    }

    /**
     * Modifies a subscription with explicitly requested lifetime and keep-alive counts.
     *
     * <p>The fixed counts are wrapped in constant functions and handed to the function-based overload.
     *
     * @return a future completed with the modified subscription.
     */
    @Override
    public CompletableFuture<UaSubscription> modifySubscription(UInteger subscriptionId, double requestedPublishingInterval, UInteger requestedLifetimeCount, UInteger requestedMaxKeepAliveCount, UInteger maxNotificationsPerPublish, UByte priority) {
        BiFunction<Double, UInteger, UInteger> constantLifetimeCount =
            (interval, keepAliveCount) -> requestedLifetimeCount;
        Function<Double, UInteger> constantMaxKeepAliveCount =
            interval -> requestedMaxKeepAliveCount;

        return modifySubscription(
            subscriptionId,
            requestedPublishingInterval,
            constantLifetimeCount,
            constantMaxKeepAliveCount,
            maxNotificationsPerPublish,
            priority
        );
    }

    /**
     * Modifies a tracked subscription, computing keep-alive and lifetime counts from the publishing interval.
     *
     * <p>The counts are first computed for the requested interval and sent with a
     * ModifySubscription call. If the server revises the interval and the keep-alive count computed
     * for the revised interval differs from the one requested, a second ModifySubscription is
     * issued with counts recomputed for the revised interval. After the final parameters have been
     * applied, the subscription's watchdog timer is re-armed and publishing is triggered.
     *
     * @param subscriptionId id of the subscription to modify.
     * @param requestedPublishingInterval the publishing interval to request.
     * @param getLifetimeCount computes the lifetime count from (publishing interval, max keep-alive count).
     * @param getMaxKeepAliveCount computes the max keep-alive count from the publishing interval.
     * @param maxNotificationsPerPublish forwarded to the server and stored on the subscription.
     * @param priority forwarded to the server and stored on the subscription.
     * @return a future completed with the modified subscription, or failed with status
     *     0x80280000 when {@code subscriptionId} is not tracked by this manager.
     */
    @Override
    public CompletableFuture<UaSubscription> modifySubscription(UInteger subscriptionId, double requestedPublishingInterval, BiFunction<Double, UInteger, UInteger> getLifetimeCount, Function<Double, UInteger> getMaxKeepAliveCount, UInteger maxNotificationsPerPublish, UByte priority) {
        OpcUaSubscription subscription = this.subscriptions.get(subscriptionId);
        if (subscription == null) {
            return FutureUtils.failedUaFuture(0x80280000L);
        }

        UInteger requestedMaxKeepAliveCount = getMaxKeepAliveCount.apply(requestedPublishingInterval);
        UInteger requestedLifetimeCount = getLifetimeCount.apply(requestedPublishingInterval, requestedMaxKeepAliveCount);

        CompletableFuture<ModifySubscriptionResponse> future = this.client.modifySubscription(subscriptionId, requestedPublishingInterval, requestedLifetimeCount, requestedMaxKeepAliveCount, maxNotificationsPerPublish, priority);

        return future.thenCompose(response -> {
            subscription.setRequestedPublishingInterval(requestedPublishingInterval);
            subscription.setRequestedLifetimeCount(requestedLifetimeCount);
            subscription.setRequestedMaxKeepAliveCount(requestedMaxKeepAliveCount);
            subscription.setRevisedPublishingInterval(response.getRevisedPublishingInterval());
            subscription.setRevisedLifetimeCount(response.getRevisedLifetimeCount());
            subscription.setRevisedMaxKeepAliveCount(response.getRevisedMaxKeepAliveCount());
            subscription.setMaxNotificationsPerPublish(maxNotificationsPerPublish);
            subscription.setPriority(priority);

            double revisedPublishingInterval = response.getRevisedPublishingInterval();
            UInteger newMaxKeepAliveCount = getMaxKeepAliveCount.apply(revisedPublishingInterval);

            if (requestedPublishingInterval != revisedPublishingInterval && !requestedMaxKeepAliveCount.equals(newMaxKeepAliveCount)) {
                // The server changed the interval in a way that changes the derived keep-alive
                // count: send corrected counts for the interval that is actually in effect.
                UInteger newLifetimeCount = getLifetimeCount.apply(revisedPublishingInterval, newMaxKeepAliveCount);

                CompletableFuture<ModifySubscriptionResponse> modifyFuture = this.client.modifySubscription(subscriptionId, revisedPublishingInterval, newLifetimeCount, newMaxKeepAliveCount, maxNotificationsPerPublish, priority);

                return modifyFuture.thenApply(modifyResponse -> {
                    subscription.setRequestedLifetimeCount(newLifetimeCount);
                    subscription.setRequestedMaxKeepAliveCount(newMaxKeepAliveCount);
                    subscription.setRevisedPublishingInterval(modifyResponse.getRevisedPublishingInterval());
                    subscription.setRevisedLifetimeCount(modifyResponse.getRevisedLifetimeCount());
                    subscription.setRevisedMaxKeepAliveCount(modifyResponse.getRevisedMaxKeepAliveCount());

                    // Look the timer up with get(), not remove(): the subscription is still
                    // alive, so the timer must stay registered for later publish responses to
                    // kick it and for delete/transfer-failure/cancel to be able to cancel it.
                    WatchdogTimer watchdogTimer = this.watchdogTimers.get(subscriptionId);
                    if (watchdogTimer != null) {
                        watchdogTimer.kick();
                    }

                    this.maybeSendPublishRequests();
                    return subscription;
                });
            }

            // Same as above: re-arm the timer with the new parameters but keep it registered.
            WatchdogTimer watchdogTimer = this.watchdogTimers.get(subscriptionId);
            if (watchdogTimer != null) {
                watchdogTimer.kick();
            }

            this.maybeSendPublishRequests();
            return CompletableFuture.completedFuture(subscription);
        });
    }

    /**
     * Deletes a subscription on the server and, once that call completes successfully, stops
     * tracking it locally (subscription entry and watchdog timer) and re-evaluates how many
     * PublishRequests should be outstanding.
     *
     * @param subscriptionId id of the subscription to delete.
     * @return a future completed with the subscription that was tracked under
     *     {@code subscriptionId}, or {@code null} if none was.
     */
    @Override
    public CompletableFuture<UaSubscription> deleteSubscription(UInteger subscriptionId) {
        List<UInteger> idsToDelete = Lists.newArrayList(subscriptionId);

        return client.deleteSubscriptions(idsToDelete).thenApply(response -> {
            OpcUaSubscription removed = subscriptions.remove(subscriptionId);

            WatchdogTimer timer = watchdogTimers.remove(subscriptionId);
            if (timer != null) {
                timer.cancel();
            }

            maybeSendPublishRequests();
            return removed;
        });
    }

    /**
     * Handles a failed subscription transfer: stops tracking the subscription, cancels its
     * watchdog timer and, if the subscription was tracked, notifies the manager-level listeners
     * followed by the subscription's own notification listeners.
     *
     * @param subscriptionId id of the subscription whose transfer failed.
     * @param statusCode the status reported for the failed transfer; passed through to listeners.
     */
    public void transferFailed(UInteger subscriptionId, StatusCode statusCode) {
        OpcUaSubscription removed = subscriptions.remove(subscriptionId);

        WatchdogTimer timer = watchdogTimers.remove(subscriptionId);
        if (timer != null) {
            timer.cancel();
        }

        if (removed == null) {
            // Nothing was tracked under this id, so there is nobody to notify.
            return;
        }

        subscriptionListeners.forEach(
            listener -> listener.onSubscriptionTransferFailed(removed, statusCode));
        removed.getNotificationListeners().forEach(
            listener -> listener.onSubscriptionTransferFailed(removed, statusCode));
    }

    /**
     * @return an immutable snapshot of the subscriptions currently tracked by this manager.
     */
    @Override
    public ImmutableList<UaSubscription> getSubscriptions() {
        return ImmutableList.copyOf(
            subscriptions.values()
        );
    }

    /**
     * Registers a manager-level listener.
     *
     * @param listener the listener to add.
     */
    @Override
    public void addSubscriptionListener(UaSubscriptionManager.SubscriptionListener listener) {
        subscriptionListeners.add(listener);
    }

    /**
     * Unregisters a manager-level listener previously added with
     * {@link #addSubscriptionListener(UaSubscriptionManager.SubscriptionListener)}.
     *
     * @param listener the listener to remove.
     */
    @Override
    public void removeSubscriptionListener(UaSubscriptionManager.SubscriptionListener listener) {
        subscriptionListeners.remove(listener);
    }

    /**
     * @return how many PublishRequests may be outstanding at once: 0 when no subscriptions are
     *     tracked, otherwise (number of subscriptions + 1) limited by the configured maximum.
     */
    private long getMaxPendingPublishes() {
        long configuredMaximum = client.getConfig().getMaxPendingPublishRequests().longValue();

        if (subscriptions.isEmpty()) {
            return 0L;
        }

        long onePerSubscriptionPlusOne = subscriptions.size() + 1;
        return Math.min(onePerSubscriptionPlusOne, configuredMaximum);
    }

    /**
     * Computes the timeout hint placed in the header of a PublishRequest.
     *
     * <p>The hint is {@code maxPendingPublishes * longestKeepAlive * 1.5}, where the longest
     * keep-alive is the largest (revised publishing interval * revised max keep-alive count) over
     * all tracked subscriptions, falling back to the configured request timeout when there are
     * none. A hint that is infinite or exceeds 4294967295 is replaced by 0.
     *
     * @return the timeout hint.
     */
    private UInteger getTimeoutHint() {
        // Evaluated up front, as the fallback is needed whenever no subscription is tracked.
        double requestTimeout = client.getConfig().getRequestTimeout().doubleValue();

        double maxKeepAlive = subscriptions.values().stream()
            .mapToDouble(s -> s.getRevisedPublishingInterval() * s.getRevisedMaxKeepAliveCount().doubleValue())
            .max()
            .orElse(requestTimeout);

        long maxPendingPublishes = getMaxPendingPublishes();

        double timeoutHint = (double) maxPendingPublishes * maxKeepAlive * 1.5;
        boolean unrepresentable = Double.isInfinite(timeoutHint) || timeoutHint > 4.294967295E9;
        if (unrepresentable) {
            timeoutHint = 0.0;
        }

        logger.debug("getTimeoutHint() maxKeepAlive={} maxPendingPublishes={} timeoutHint={}", maxKeepAlive, maxPendingPublishes, timeoutHint);

        return Unsigned.uint((long) timeoutHint);
    }

    /**
     * Tops up the number of outstanding PublishRequests for the current session to the value
     * returned by {@code getMaxPendingPublishes()}; does nothing when that value is 0.
     *
     * <p>Each attempt first increments the session's counter and only sends if the incremented
     * value is still within the limit; otherwise the increment is undone (never going below 0).
     * Counters belonging to any other session id are removed afterwards.
     */
    private void maybeSendPublishRequests() {
        final long limit = getMaxPendingPublishes();
        if (limit == 0L) {
            return;
        }

        client.getSession().thenAccept(session -> {
            AtomicLong outstanding = pendingCountMap.computeIfAbsent(
                session.getSessionId(),
                sessionId -> new AtomicLong(0L)
            );

            long attempt = outstanding.get();
            while (attempt < limit) {
                if (outstanding.incrementAndGet() > limit) {
                    // Someone else filled the slot in the meantime: give the reservation back.
                    outstanding.getAndUpdate(count -> count > 0L ? count - 1L : 0L);
                } else {
                    sendPublishRequest((UaSession) session, outstanding);
                }
                attempt++;
            }

            if (pendingCountMap.size() > 1) {
                // Drop counters left over from sessions other than the current one.
                pendingCountMap.entrySet().removeIf(
                    entry -> !entry.getKey().equals(session.getSessionId()));
            }
        });
    }

    /**
     * Sends one PublishRequest on {@code session}.
     *
     * <p>All sequence numbers currently waiting in each subscription's
     * {@code availableAcknowledgements} are moved into the request as acknowledgements. A
     * successful response is queued on the processing queue for {@code onPublishComplete}. On
     * failure the pending counter is decremented (never below 0), another round of publishing is
     * attempted unless the status is 0x80790000 or 0x80780000 (Bad_NoSubscription /
     * Bad_TooManyPublishRequests in the OPC UA status code table), and the manager-level listeners
     * are told about the failure.
     *
     * @param session the session whose authentication token is used for the request.
     * @param pendingCount the outstanding-request counter this request has already been counted in.
     */
    private void sendPublishRequest(UaSession session, AtomicLong pendingCount) {
        // Typed list: the element type is needed by the debug formatting below.
        List<SubscriptionAcknowledgement> subscriptionAcknowledgements = new ArrayList<>();

        this.subscriptions.values().forEach(subscription -> {
            List<UInteger> available = subscription.availableAcknowledgements;
            synchronized (available) {
                subscription.availableAcknowledgements.forEach(sequenceNumber -> subscriptionAcknowledgements.add(new SubscriptionAcknowledgement(subscription.getSubscriptionId(), sequenceNumber)));
                subscription.availableAcknowledgements.clear();
            }
        });

        RequestHeader requestHeader = this.client.getStackClient().newRequestHeader(session.getAuthenticationToken(), this.getTimeoutHint());
        UInteger requestHandle = requestHeader.getRequestHandle();
        PublishRequest request = new PublishRequest(requestHeader, subscriptionAcknowledgements.toArray(new SubscriptionAcknowledgement[0]));

        if (this.logger.isDebugEnabled()) {
            String[] ackStrings = subscriptionAcknowledgements.stream()
                .map(ack -> String.format("id=%s/seq=%s", ack.getSubscriptionId(), ack.getSequenceNumber()))
                .toArray(String[]::new);
            this.logger.debug("Sending PublishRequest, requestHandle={}, acknowledgements={}", requestHandle, Arrays.toString(ackStrings));
        }

        // Explicit type witness: nothing else tells the compiler the response is a PublishResponse.
        this.client.<PublishResponse>sendRequest(request).whenComplete((response, ex) -> {
            if (response != null) {
                this.logger.debug("Received PublishResponse, sequenceNumber={}", response.getNotificationMessage().getSequenceNumber());
                this.processingQueue.submit(() -> this.onPublishComplete(response, pendingCount));
            } else {
                StatusCode statusCode = UaException.extract(ex).map(UaException::getStatusCode).orElse(StatusCode.BAD);
                this.logger.debug("Publish service failure (requestHandle={}): {}", requestHandle, statusCode, ex);

                pendingCount.getAndUpdate(p -> p > 0L ? p - 1L : 0L);

                if (statusCode.getValue() != 2155413504L && statusCode.getValue() != 0x80780000L) {
                    this.maybeSendPublishRequests();
                }

                UaException uax = UaException.extract(ex).orElse(new UaException(ex));
                this.subscriptionListeners.forEach(l -> l.onPublishFailure(uax));
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes one PublishResponse.
     *
     * <ul>
     *   <li>If the subscription is no longer tracked, its watchdog timer (if any) is removed and
     *       cancelled, the pending counter is decremented and publishing is re-evaluated.</li>
     *   <li>If the sequence number is ahead of the expected one, the processing queue is paused,
     *       this response is put back at the head of the queue, and the missing messages are
     *       requested via Republish. Once that finishes the last sequence number is set to one
     *       below this response's number and the queue is resumed, so this response is handled
     *       again as the expected one.</li>
     *   <li>Otherwise the sequence number is recorded (unless the message carries no notification
     *       data), the available acknowledgements are replaced, and the message is delivered;
     *       after delivery the pending counter is decremented and publishing is re-evaluated.</li>
     * </ul>
     *
     * @param response the response to process.
     * @param pendingCount the outstanding-request counter the originating request was counted in.
     */
    private void onPublishComplete(PublishResponse response, AtomicLong pendingCount) {
        logger.debug("onPublishComplete() response for subscriptionId={}", response.getSubscriptionId());

        final UInteger subscriptionId = response.getSubscriptionId();
        final OpcUaSubscription subscription = subscriptions.get(subscriptionId);

        if (subscription == null) {
            WatchdogTimer orphanedTimer = watchdogTimers.remove(subscriptionId);
            if (orphanedTimer != null) {
                orphanedTimer.cancel();
            }
            pendingCount.getAndUpdate(count -> count > 0L ? count - 1L : 0L);
            maybeSendPublishRequests();
            return;
        }

        WatchdogTimer watchdogTimer = watchdogTimers.get(subscriptionId);
        if (watchdogTimer != null) {
            watchdogTimer.kick();
        }

        final NotificationMessage notificationMessage = response.getNotificationMessage();
        final long sequenceNumber = notificationMessage.getSequenceNumber().longValue();
        final long expectedSequenceNumber = subscription.getLastSequenceNumber() + 1L;

        if (sequenceNumber > expectedSequenceNumber) {
            logger.warn("[id={}] expected sequence={}, received sequence={}. Calling Republish service...", subscriptionId, expectedSequenceNumber, sequenceNumber);

            processingQueue.pause();
            processingQueue.submitToHead(() -> onPublishComplete(response, pendingCount));

            republish(subscriptionId, expectedSequenceNumber, sequenceNumber).whenComplete((dataLost, ex) -> {
                boolean republishFailed = ex != null;
                if (republishFailed) {
                    logger.debug("Republish failed: {}", ex.getMessage(), ex);
                }
                // dataLost is only consulted when the republish itself did not fail.
                if (republishFailed || dataLost) {
                    subscriptionListeners.forEach(l -> l.onNotificationDataLost(subscription));
                    subscription.getNotificationListeners().forEach(l -> l.onNotificationDataLost(subscription));
                }

                subscription.setLastSequenceNumber(sequenceNumber - 1L);
                processingQueue.resume();
            });
            return;
        }

        Object[] notificationData = notificationMessage.getNotificationData();
        if (notificationData != null && notificationData.length > 0) {
            subscription.setLastSequenceNumber(sequenceNumber);
        }

        final UInteger[] availableSequenceNumbers = response.getAvailableSequenceNumbers();
        final boolean hasAvailable = availableSequenceNumbers != null && availableSequenceNumbers.length > 0;

        List<UInteger> acknowledgementsLock = subscription.availableAcknowledgements;
        synchronized (acknowledgementsLock) {
            subscription.availableAcknowledgements.clear();
            if (hasAvailable) {
                Collections.addAll(subscription.availableAcknowledgements, availableSequenceNumbers);
            }
        }

        if (logger.isDebugEnabled() && availableSequenceNumbers != null) {
            String[] seqStrings = Arrays.stream(availableSequenceNumbers)
                .map(sequence -> String.format("id=%s/seq=%s", subscriptionId, sequence))
                .toArray(String[]::new);
            logger.debug("[id={}] PublishResponse sequence={}, available sequences={}", subscriptionId, sequenceNumber, Arrays.toString(seqStrings));
        }

        DateTime publishTime = notificationMessage.getPublishTime();
        logger.debug("onPublishComplete(), subscriptionId={}, sequenceNumber={}, publishTime={}", subscriptionId, notificationMessage.getSequenceNumber(), publishTime);

        Runnable afterDelivery = () -> {
            pendingCount.getAndUpdate(count -> count > 0L ? count - 1L : 0L);
            maybeSendPublishRequests();
        };
        deliverNotificationMessage(subscription, notificationMessage)
            .thenRunAsync(afterDelivery, client.getConfig().getExecutor());
    }

    /**
     * Requests every message with a sequence number in {@code [fromSequence, toSequence)} again.
     *
     * @param subscriptionId the subscription to republish for.
     * @param fromSequence first sequence number to request (inclusive).
     * @param toSequence sequence number to stop at (exclusive).
     * @return a future completed with {@code true} if at least one message could not be
     *     recovered, {@code false} if all were, or completed exceptionally if republishing was aborted.
     */
    private CompletableFuture<Boolean> republish(UInteger subscriptionId, long fromSequence, long toSequence) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        final boolean noDataLostYet = false;
        republish(subscriptionId, fromSequence, toSequence, noDataLostYet, result);
        return result;
    }

    /**
     * One step of the republish sequence: requests message {@code fromSequence} and, from the
     * completion callback, continues with {@code fromSequence + 1} until {@code toSequence} is reached.
     *
     * <p>A message is counted as lost (and the sequence continues) when the republished message
     * is rejected by {@code onRepublishComplete} or the call fails with status 0x807B0000
     * (Bad_MessageNotAvailable in the OPC UA status code table). Any other failure aborts the
     * sequence and fails {@code future}.
     *
     * @param dataLost whether a message has already been lost in an earlier step.
     * @param future completed with the final data-lost flag, or exceptionally on abort.
     */
    private void republish(UInteger subscriptionId, long fromSequence, long toSequence, boolean dataLost, CompletableFuture<Boolean> future) {
        if (fromSequence == toSequence) {
            // Whole range handled.
            future.complete(dataLost);
            return;
        }

        client.republish(subscriptionId, Unsigned.uint(fromSequence)).whenComplete((response, ex) -> {
            final long nextSequence = fromSequence + 1L;

            if (response == null) {
                StatusCode statusCode = UaException.extract(ex)
                    .map(UaException::getStatusCode)
                    .orElse(StatusCode.BAD);

                if (statusCode.getValue() != 2155544576L) {
                    future.completeExceptionally(ex);
                } else {
                    republish(subscriptionId, nextSequence, toSequence, true, future);
                }
                return;
            }

            try {
                onRepublishComplete(subscriptionId, (RepublishResponse) response, Unsigned.uint(fromSequence));
                republish(subscriptionId, nextSequence, toSequence, dataLost, future);
            } catch (UaException e) {
                republish(subscriptionId, nextSequence, toSequence, true, future);
            }
        });
    }

    /**
     * Validates a republished message and delivers it if its subscription is still tracked.
     *
     * @param subscriptionId the subscription the message belongs to.
     * @param response the Republish response.
     * @param expectedSequenceNumber the sequence number that was requested.
     * @throws UaException with status 0x80880000 when the message carries a different sequence number.
     */
    private void onRepublishComplete(UInteger subscriptionId, RepublishResponse response, UInteger expectedSequenceNumber) throws UaException {
        NotificationMessage message = response.getNotificationMessage();
        UInteger receivedSequenceNumber = message.getSequenceNumber();

        boolean matches = receivedSequenceNumber.equals(expectedSequenceNumber);
        if (!matches) {
            throw new UaException(0x80880000L, "expected sequence=" + expectedSequenceNumber + ", received sequence=" + receivedSequenceNumber);
        }

        DateTime publishTime = message.getPublishTime();
        logger.debug("onRepublishComplete(), subscriptionId={}, sequenceNumber={}, publishTime={}", subscriptionId, message.getSequenceNumber(), publishTime);

        OpcUaSubscription target = subscriptions.get(subscriptionId);
        if (target == null) {
            return;
        }
        deliverNotificationMessage(target, message);
    }

    /**
     * Delivers one notification message to the monitored items and listeners of {@code subscription}.
     *
     * <p>Delivery waits for a permit from the subscription's notification semaphore and then runs
     * on the delivery queue. A message without notification data is reported as a keep-alive.
     * Otherwise each decoded notification is dispatched by type:
     * <ul>
     *   <li>{@code DataChangeNotification}: each value goes to the monitored item with the matching
     *       client handle (a warning is logged if there is none); an empty list is reported as a
     *       keep-alive, otherwise the subscription's notification listeners get the batch.</li>
     *   <li>{@code EventNotificationList}: each field list goes to the matching monitored item, then
     *       the subscription's notification listeners get the batch.</li>
     *   <li>{@code StatusChangeNotification}: all listeners are informed; on status 0x800A0000
     *       (Bad_Timeout in the OPC UA status code table) the subscription is removed from this manager.</li>
     * </ul>
     * The permit is released and the returned future completed even if delivery throws.
     *
     * @param subscription the subscription the message belongs to.
     * @param notificationMessage the message to deliver.
     * @return a future completed once delivery has finished.
     */
    private CompletableFuture<Unit> deliverNotificationMessage(OpcUaSubscription subscription, NotificationMessage notificationMessage) {
        CompletableFuture<Unit> delivered = new CompletableFuture<>();

        subscription.getNotificationSemaphore().acquire().thenAccept(permit -> this.deliveryQueue.submit(() -> {
            try {
                Map<UInteger, OpcUaMonitoredItem> items = subscription.getItemsByClientHandle();
                List<ExtensionObject> notificationData = ConversionUtil.l(notificationMessage.getNotificationData());

                if (notificationData.isEmpty()) {
                    this.subscriptionListeners.forEach(listener -> listener.onKeepAlive(subscription, notificationMessage.getPublishTime()));
                    subscription.getNotificationListeners().forEach(listener -> listener.onKeepAliveNotification(subscription, notificationMessage.getPublishTime()));
                }

                for (ExtensionObject xo : notificationData) {
                    Object o = xo.decode(this.client.getStaticSerializationContext());

                    if (o instanceof DataChangeNotification) {
                        DataChangeNotification dcn = (DataChangeNotification) o;
                        List<MonitoredItemNotification> monitoredItemNotifications = ConversionUtil.l(dcn.getMonitoredItems());
                        int notificationCount = monitoredItemNotifications.size();

                        this.logger.debug("Received {} MonitoredItemNotifications", notificationCount);

                        for (MonitoredItemNotification min : monitoredItemNotifications) {
                            this.logger.trace("MonitoredItemNotification: clientHandle={}, value={}", min.getClientHandle(), min.getValue());

                            OpcUaMonitoredItem item = items.get(min.getClientHandle());
                            if (item != null) {
                                item.onValueArrived(min.getValue());
                            } else {
                                this.logger.warn("no item for clientHandle=" + min.getClientHandle());
                            }
                        }

                        if (notificationCount == 0) {
                            this.subscriptionListeners.forEach(listener -> listener.onKeepAlive(subscription, notificationMessage.getPublishTime()));
                            subscription.getNotificationListeners().forEach(listener -> listener.onKeepAliveNotification(subscription, notificationMessage.getPublishTime()));
                        } else if (!subscription.getNotificationListeners().isEmpty()) {
                            // Build the batch only when somebody is listening for it.
                            List<UaMonitoredItem> monitoredItems = new ArrayList<>();
                            List<DataValue> dataValues = new ArrayList<>();

                            for (MonitoredItemNotification n : monitoredItemNotifications) {
                                UaMonitoredItem item = subscription.getItemsByClientHandle().get(n.getClientHandle());
                                if (item != null) {
                                    monitoredItems.add(item);
                                    dataValues.add(n.getValue());
                                }
                            }

                            subscription.getNotificationListeners().forEach(listener -> listener.onDataChangeNotification(subscription, monitoredItems, dataValues, notificationMessage.getPublishTime()));
                        }
                    } else if (o instanceof EventNotificationList) {
                        EventNotificationList enl = (EventNotificationList) o;
                        List<EventFieldList> eventFieldLists = ConversionUtil.l(enl.getEvents());

                        for (EventFieldList efl : eventFieldLists) {
                            this.logger.trace("EventFieldList: clientHandle={}, values={}", efl.getClientHandle(), Arrays.toString(efl.getEventFields()));

                            OpcUaMonitoredItem item = items.get(efl.getClientHandle());
                            if (item != null) {
                                item.onEventArrived(efl.getEventFields());
                            }
                        }

                        if (!subscription.getNotificationListeners().isEmpty()) {
                            List<UaMonitoredItem> monitoredItems = new ArrayList<>();
                            List<Variant[]> eventFields = new ArrayList<>();

                            for (EventFieldList efl : eventFieldLists) {
                                UaMonitoredItem item = subscription.getItemsByClientHandle().get(efl.getClientHandle());
                                if (item != null) {
                                    monitoredItems.add(item);
                                    eventFields.add(efl.getEventFields());
                                }
                            }

                            subscription.getNotificationListeners().forEach(listener -> listener.onEventNotification(subscription, monitoredItems, eventFields, notificationMessage.getPublishTime()));
                        }
                    } else if (o instanceof StatusChangeNotification) {
                        StatusChangeNotification scn = (StatusChangeNotification) o;

                        this.logger.debug("StatusChangeNotification: {}", scn.getStatus());

                        this.subscriptionListeners.forEach(listener -> listener.onStatusChanged(subscription, scn.getStatus()));
                        subscription.getNotificationListeners().forEach(listener -> listener.onStatusChangedNotification(subscription, scn.getStatus()));

                        if (scn.getStatus().getValue() == 0x800A0000L) {
                            this.subscriptions.remove(subscription.getSubscriptionId());
                        }
                    }
                }
            } finally {
                // Always hand the permit back and signal completion, even if a listener threw.
                permit.release();
                delivered.complete(Unit.VALUE);
            }
        }));

        return delivered;
    }

    /**
     * Triggers sending of PublishRequests, up to the currently allowed number of outstanding requests.
     */
    public void startPublishing() {
        maybeSendPublishRequests();
    }

    /**
     * Forgets every tracked subscription. Only the local map is cleared; watchdog timers are
     * not touched by this method.
     */
    public void clearSubscriptions() {
        subscriptions.clear();
    }

    /**
     * Pauses the queue on which notification messages are delivered.
     */
    public void pauseDelivery() {
        deliveryQueue.pause();
    }

    /**
     * Resumes the queue on which notification messages are delivered.
     */
    public void resumeDelivery() {
        deliveryQueue.resume();
    }

    /**
     * Cancels every registered watchdog timer and then empties the timer map.
     */
    public void cancelWatchdogTimers() {
        for (WatchdogTimer timer : watchdogTimers.values()) {
            timer.cancel();
        }
        watchdogTimers.clear();
    }

    /**
     * Per-subscription timer that notifies listeners when it has not been kicked for
     * (revised publishing interval * revised max keep-alive count * multiplier) milliseconds.
     *
     * <p>The delay is recomputed from the subscription's current revised parameters on every kick.
     */
    private class WatchdogTimer {
        /** The currently scheduled expiry task, or {@code null} if none was scheduled or it was cancelled. */
        private final AtomicReference<ScheduledFuture<?>> scheduledFuture = new AtomicReference<>();
        private final OpcUaSubscription subscription;
        /** Factor applied to the keep-alive period; never below 1.0. */
        private final double multiplier;

        WatchdogTimer(OpcUaSubscription subscription, double multiplier) {
            this.subscription = subscription;
            this.multiplier = Math.max(1.0, multiplier);
        }

        /** Cancels the pending expiry task, if any, and schedules a new one. */
        void kick() {
            cancelQuietly(scheduledFuture.get());
            scheduleNext();
        }

        /** Cancels the pending expiry task, if any, and clears the reference to it. */
        void cancel() {
            cancelQuietly(scheduledFuture.getAndSet(null));
        }

        /** Cancels {@code pending} without interrupting it; tolerates {@code null}. */
        private void cancelQuietly(ScheduledFuture<?> pending) {
            if (pending != null) {
                pending.cancel(false);
            }
        }

        private void scheduleNext() {
            double keepAlivePeriod = subscription.getRevisedPublishingInterval()
                * (double) subscription.getRevisedMaxKeepAliveCount().longValue();
            long delay = Math.round(keepAlivePeriod * multiplier);

            // The scheduler thread only hands off; listeners run on the client's executor.
            ScheduledFuture<?> next = OpcUaSubscriptionManager.this.client.getConfig().getScheduledExecutor().schedule(
                () -> OpcUaSubscriptionManager.this.client.getConfig().getExecutor().execute(this::notifyListeners),
                delay,
                TimeUnit.MILLISECONDS
            );

            scheduledFuture.set(next);
        }

        private void notifyListeners() {
            OpcUaSubscriptionManager.this.subscriptionListeners.forEach(
                listener -> listener.onSubscriptionWatchdogTimerElapsed(subscription));
            subscription.getNotificationListeners().forEach(
                listener -> listener.onSubscriptionWatchdogTimerElapsed(subscription));
        }
    }
}

