/*
 * Decompiled with CFR 0.152.
 */
package rx.quasar;

import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.AbstractFuture;
import co.paralleluniverse.strands.ConditionSynchronizer;
import co.paralleluniverse.strands.SimpleConditionSynchronizer;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.DelegatingReceivePort;
import co.paralleluniverse.strands.channels.ProducerException;
import co.paralleluniverse.strands.channels.ReceivePort;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;
import rx.quasar.ChannelObservable;

/**
 * Strand-blocking accessors over an {@link Observable}.
 *
 * <p>Each operation subscribes to the wrapped observable and either waits for the
 * result on the calling strand (the {@code @Suspendable} methods) or exposes the
 * emitted items through a Quasar {@link ReceivePort} or a {@link Future}.
 *
 * @param <T> the type of item emitted by the wrapped observable
 */
public class BlockingObservable<T> {
    /** Buffer size handed to {@link ChannelObservable} by {@link #toChannel()}. */
    private static final int BUFFER_SIZE = 10;

    /** The wrapped source observable. */
    private final Observable<T> o;

    private BlockingObservable(Observable<T> o) {
        this.o = o;
    }

    /**
     * Wraps an observable in a {@code BlockingObservable}.
     *
     * @param o the observable to wrap
     * @param <T> the item type
     * @return a new wrapper around {@code o}
     */
    public static <T> BlockingObservable<T> from(Observable<T> o) {
        return new BlockingObservable<T>(o);
    }

    /**
     * Subscribes {@code observer} to the source through a {@link SafeSubscriber},
     * because the observer ends up invoking user-supplied callbacks.
     */
    private Subscription protectivelyWrapAndSubscribe(Subscriber<T> observer) {
        return o.subscribe(new SafeSubscriber<T>(observer));
    }

    /**
     * Invokes {@code onNext} for every emitted item and waits until the source
     * signals completion or an error.
     *
     * @param onNext the action to run for each item
     * @throws RuntimeException if the source signals an error (a
     *         {@code RuntimeException} is rethrown as is, anything else is wrapped)
     *         or if the wait is interrupted
     */
    @Suspendable
    public void forEach(final Action1<? super T> onNext) {
        try {
            final AtomicBoolean done = new AtomicBoolean(false);
            final SimpleConditionSynchronizer sync = new SimpleConditionSynchronizer(this);
            final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>();

            protectivelyWrapAndSubscribe(new Subscriber<T>() {
                @Override
                public void onCompleted() {
                    done.set(true);
                    sync.signalAll();
                }

                @Override
                public void onError(Throwable e) {
                    // Record the error before flipping the flag so the waiter sees it.
                    exceptionFromOnError.set(e);
                    done.set(true);
                    sync.signalAll();
                }

                @Override
                public void onNext(T args) {
                    onNext.call(args);
                }
            });

            // Wait until the subscription terminates. The flag is re-checked after
            // registering, so a terminal signal delivered during subscribe is not lost.
            try {
                final Object token = sync.register();
                try {
                    for (int i = 0; !done.get(); i++) {
                        sync.await(i);
                    }
                } finally {
                    sync.unregister(token);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can still observe it.
                Strand.currentStrand().interrupt();
                throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
            }

            final Throwable error = exceptionFromOnError.get();
            if (error != null) {
                if (error instanceof RuntimeException) {
                    throw (RuntimeException) error;
                }
                throw new RuntimeException(error);
            }
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }

    /**
     * Subscribes to the source and exposes its items through a channel created
     * with capacity {@link #BUFFER_SIZE} and the {@code BLOCK} overflow policy.
     *
     * @return a receive port fed by the source observable
     */
    public ReceivePort<T> toChannel() {
        return ChannelObservable.subscribe(BUFFER_SIZE, Channels.OverflowPolicy.BLOCK, o);
    }

    /**
     * Returns the result of {@link #single()} applied to {@code o.first()}.
     */
    @Suspendable
    public T first() {
        return from(o.first()).single();
    }

    /**
     * Returns the result of {@link #single()} applied to {@code o.first(predicate)}.
     *
     * @param predicate the condition an item must satisfy
     */
    @Suspendable
    public T first(Func1<? super T, Boolean> predicate) {
        return from(o.first(predicate)).single();
    }

    /**
     * Returns the first item, or {@code defaultValue} if the source yields none.
     *
     * @param defaultValue the value returned when there is no item
     */
    @Suspendable
    public T firstOrDefault(T defaultValue) {
        return from(o.take(1)).singleOrDefault(defaultValue);
    }

    /**
     * Returns the first item accepted by {@code predicate}, or {@code defaultValue}
     * if there is none.
     *
     * @param defaultValue the value returned when no item matches
     * @param predicate the condition an item must satisfy
     */
    @Suspendable
    public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
        return from(o.filter(predicate)).firstOrDefault(defaultValue);
    }

    /**
     * Returns the result of {@link #single()} applied to {@code o.last()}.
     */
    @Suspendable
    public T last() {
        return from(o.last()).single();
    }

    /**
     * Returns the result of {@link #single()} applied to {@code o.last(predicate)}.
     *
     * @param predicate the condition an item must satisfy
     */
    @Suspendable
    public T last(Func1<? super T, Boolean> predicate) {
        return from(o.last(predicate)).single();
    }

    /**
     * Returns the last item, or {@code defaultValue} if the source yields none.
     *
     * @param defaultValue the value returned when there is no item
     */
    @Suspendable
    public T lastOrDefault(T defaultValue) {
        return from(o.takeLast(1)).singleOrDefault(defaultValue);
    }

    /**
     * Returns the last item accepted by {@code predicate}, or {@code defaultValue}
     * if there is none.
     *
     * @param defaultValue the value returned when no item matches
     * @param predicate the condition an item must satisfy
     */
    @Suspendable
    public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
        return from(o.filter(predicate)).lastOrDefault(defaultValue);
    }

    /**
     * Returns a port that never waits: every receive yields the newest item
     * obtained from the source so far, or {@code initialValue} before the first one.
     *
     * @param initialValue the value reported until the source emits
     */
    public ReceivePort<T> mostRecent(T initialValue) {
        return new RecentReceivePort<T>(
                ChannelObservable.subscribe(1, Channels.OverflowPolicy.DISPLACE, o), initialValue);
    }

    /**
     * Returns a single-slot channel over the source created with the
     * {@code DISPLACE} overflow policy.
     */
    public ReceivePort<T> next() {
        return ChannelObservable.subscribe(1, Channels.OverflowPolicy.DISPLACE, o);
    }

    /**
     * Returns a port that yields the newest item from the source and waits for
     * one only when none has been seen yet.
     */
    public ReceivePort<T> latest() {
        return new LatestReceivePort<T>(
                ChannelObservable.subscribe(1, Channels.OverflowPolicy.DISPLACE, o));
    }

    /**
     * Receives one item from a channel over {@code o.single()}.
     *
     * @return the received item
     * @throws RuntimeException the producer's failure cause, or the interruption,
     *         passed through {@link Exceptions#propagate}
     */
    @Suspendable
    public T single() {
        try {
            return from(o.single()).toChannel().receive();
        } catch (ProducerException e) {
            throw Exceptions.propagate(e.getCause());
        } catch (InterruptedException e) {
            Strand.currentStrand().interrupt();
            throw Exceptions.propagate(e);
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }

    /**
     * Receives one item from a channel over {@code o.single(predicate)}.
     *
     * @param predicate the condition the item must satisfy
     * @return the received item
     * @throws RuntimeException the producer's failure cause, or the interruption,
     *         passed through {@link Exceptions#propagate}
     */
    @Suspendable
    public T single(Func1<? super T, Boolean> predicate) {
        try {
            return from(o.single(predicate)).toChannel().receive();
        } catch (ProducerException e) {
            throw Exceptions.propagate(e.getCause());
        } catch (InterruptedException e) {
            Strand.currentStrand().interrupt();
            throw Exceptions.propagate(e);
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }

    /**
     * Returns the only item of the source, or {@code defaultValue} if the first
     * receive yields {@code null}.
     *
     * @param defaultValue the value returned when the first receive yields {@code null}
     * @return the single item or {@code defaultValue}
     * @throws IllegalArgumentException if a second receive yields a non-null item
     * @throws RuntimeException the producer's failure cause, or the interruption,
     *         passed through {@link Exceptions#propagate}
     */
    @Suspendable
    public T singleOrDefault(T defaultValue) {
        try {
            final ReceivePort<T> channel = toChannel();
            // NOTE(review): a null receive is taken to mean the channel closed
            // without delivering an item - confirm against the Quasar ReceivePort contract.
            final T result = channel.receive();
            if (result == null) {
                return defaultValue;
            }
            if (channel.receive() != null) {
                throw new IllegalArgumentException("Sequence contains too many elements");
            }
            return result;
        } catch (ProducerException e) {
            throw Exceptions.propagate(e.getCause());
        } catch (InterruptedException e) {
            Strand.currentStrand().interrupt();
            throw Exceptions.propagate(e);
        } catch (SuspendExecution e) {
            throw new AssertionError(e);
        }
    }

    /**
     * Applies {@link #singleOrDefault(Object)} to the items accepted by {@code predicate}.
     *
     * @param defaultValue the value returned when no item matches
     * @param predicate the condition an item must satisfy
     */
    @Suspendable
    public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
        return from(o.filter(predicate)).singleOrDefault(defaultValue);
    }

    /**
     * Subscribes to the source and returns a future completed with its first item
     * when the source completes, or with {@code null} if it emitted nothing.
     *
     * <p>The future fails if the source signals an error, and is failed with an
     * {@link IllegalStateException} when a further item arrives after the first.
     *
     * @return a future for the single item of the source
     */
    public Future<T> toFuture() {
        return new AbstractFuture<T>() {
            final AtomicReference<T> val = new AtomicReference<T>();

            {
                o.subscribe(new Observer<T>() {
                    // set/setException are called unqualified on purpose: they belong
                    // to the enclosing future, not to this observer.
                    @Override
                    public void onCompleted() {
                        set(val.get());
                    }

                    @Override
                    public void onError(Throwable e) {
                        setException(e);
                    }

                    @Override
                    public void onNext(T t) {
                        if (!val.compareAndSet(null, t)) {
                            setException(new IllegalStateException("Observable.toFuture() only supports sequences with a single value."));
                        }
                    }
                });
            }
        };
    }

    /**
     * Port that caches the newest item taken from its delegate and waits on the
     * delegate only while nothing has been cached.
     */
    private static class LatestReceivePort<V>
    extends DelegatingReceivePort<V> {
        private V value = null;

        public LatestReceivePort(ReceivePort<V> target) {
            super(target);
        }

        @Override
        public V receive() throws SuspendExecution, InterruptedException {
            final V v = getValue();
            if (v == null && !isClosed()) {
                value = super.receive();
                return value;
            }
            return v;
        }

        @Override
        public V receive(long timeout, TimeUnit unit) throws SuspendExecution, InterruptedException {
            final V v = getValue();
            if (v == null && !isClosed()) {
                value = super.receive(timeout, unit);
                return value;
            }
            return v;
        }

        @Override
        public V tryReceive() {
            return getValue();
        }

        /** Reports closed only once the delegate is closed and no value is cached. */
        @Override
        public boolean isClosed() {
            return super.isClosed() && value == null;
        }

        /**
         * Polls the delegate without waiting, refreshing the cache when a new item
         * is available, and otherwise returns the cached item.
         */
        private V getValue() {
            // Must go to the delegate: this class's tryReceive() calls getValue(),
            // so a virtual call here would recurse forever.
            V v = super.tryReceive();
            if (v == null) {
                v = value;
                if (isClosed()) {
                    value = null;
                    return null;
                }
                return v;
            }
            value = v;
            return v;
        }
    }

    /**
     * Port whose receive methods never wait: they return the newest item taken
     * from the delegate, or the initial value before any item arrived.
     */
    private static class RecentReceivePort<V>
    extends DelegatingReceivePort<V> {
        private V value;

        public RecentReceivePort(ReceivePort<V> target, V initialValue) {
            super(target);
            this.value = initialValue;
        }

        @Override
        public V receive(long timeout, TimeUnit unit) {
            return getValue();
        }

        @Override
        public V receive() {
            return getValue();
        }

        @Override
        public V tryReceive() {
            return getValue();
        }

        /**
         * Polls the delegate without waiting. Returns {@code null} once nothing is
         * available and the port is closed, otherwise the newest known item.
         */
        private V getValue() {
            final V v = super.tryReceive();
            if (v == null) {
                if (isClosed()) {
                    return null;
                }
                return value;
            }
            value = v;
            return v;
        }
    }
}

