/* * Quasseldroid - Quassel client for Android * * Copyright (c) 2019 Janne Koschinski * Copyright (c) 2019 The Quassel Project * * This program is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License version 3 as published * by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program. If not, see <http://www.gnu.org/licenses/>. */ package de.kuschku.libquassel.util.rxjava; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import io.reactivex.Observer; import io.reactivex.annotations.CheckReturnValue; import io.reactivex.annotations.Experimental; import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.SimpleQueue; import io.reactivex.internal.observers.BasicIntQueueDisposable; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.Subject; /** * A Subject that queues up events until a single {@link Observer} subscribes to it, replays * those events to it until the {@code Observer} catches up and then switches to relaying events live to * this single {@code Observer} until this {@code UnicastSubject} terminates or the {@code Observer} unsubscribes. * <p> * <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/UnicastSubject.png" alt=""> * <p> * Note that {@code UnicastSubject} holds an unbounded internal buffer. * <p> * This subject does not have a public constructor by design; a new empty instance of this * {@code UnicastSubject} can be created via the following {@code create} methods that * allow specifying the retention policy for items: * <ul> * <li>{@link #create()} - creates an empty, unbounded {@code UnicastSubject} that * caches all items and the terminal event it receives.</li> * <li>{@link #create(int)} - creates an empty, unbounded {@code UnicastSubject} * with a hint about how many <b>total</b> items one expects to retain.</li> * <li>{@link #create(boolean)} - creates an empty, unbounded {@code UnicastSubject} that * optionally delays an error it receives and replays it after the regular items have been emitted.</li> * <li>{@link #create(int, Runnable)} - creates an empty, unbounded {@code UnicastSubject} * with a hint about how many <b>total</b> items one expects to retain and a callback that will be * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes.</li> * <li>{@link #create(int, Runnable, boolean)} - creates an empty, unbounded {@code UnicastSubject} * with a hint about how many <b>total</b> items one expects to retain and a callback that will be * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes * and optionally delays an error it receives and replays it after the regular items have been emitted.</li> * </ul> * <p> * If more than one {@code Observer} attempts to subscribe to this {@code UnicastSubject}, they * will receive an {@code IllegalStateException} indicating the single-use-only nature of this {@code UnicastSubject}, * even if the {@code UnicastSubject} already terminated with an error. * <p> * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, * {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a * {@link NullPointerException} being thrown and the subject's state is not changed. * <p> * Since a {@code UnicastSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. * <p> * When this {@code UnicastSubject} is terminated via {@link #onError(Throwable)} the current or late single {@code Observer} * may receive the {@code Throwable} before any available items could be emitted. To make sure an onError event is delivered * to the {@code Observer} after the normal items, create a {@code UnicastSubject} with the {@link #create(boolean)} or * {@link #create(int, Runnable, boolean)} factory methods. * <p> * Even though {@code UnicastSubject} implements the {@code Observer} interface, calling * {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>) * if the subject is used as a standalone source. However, calling {@code onSubscribe} * after the {@code UnicastSubject} reached its terminal state will result in the * given {@code Disposable} being disposed immediately. * <p> * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). * <p> * This {@code UnicastSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, * {@link #getThrowable()} and {@link #hasObservers()}. * <dl> * <dt><b>Scheduler:</b></dt> * <dd>{@code UnicastSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and * the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd> * <dt><b>Error handling:</b></dt> * <dd>When the {@link #onError(Throwable)} is called, the {@code UnicastSubject} enters into a terminal state * and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission, * if one or more {@code Observer}s dispose their respective {@code Disposable}s, the * {@code Throwable} is delivered to the global error handler via * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s * cancel at once). * If there were no {@code Observer}s subscribed to this {@code UnicastSubject} when the {@code onError()} * was called, the global error handler is not invoked. * </dd> * </dl> * <p> * Example usage: * <pre><code> * UnicastSubject<Integer> subject = UnicastSubject.create(); * * TestObserver<Integer> to1 = subject.test(); * * // fresh UnicastSubjects are empty * to1.assertEmpty(); * * TestObserver<Integer> to2 = subject.test(); * * // A UnicastSubject only allows one Observer during its lifetime * to2.assertFailure(IllegalStateException.class); * * subject.onNext(1); * to1.assertValue(1); * * subject.onNext(2); * to1.assertValues(1, 2); * * subject.onComplete(); * to1.assertResult(1, 2); * * // ---------------------------------------------------- * * UnicastSubject<Integer> subject2 = UnicastSubject.create(); * * // a UnicastSubject caches events util its single Observer subscribes * subject.onNext(1); * subject.onNext(2); * subject.onComplete(); * * TestObserver<Integer> to3 = subject2.test(); * * // the cached events are emitted in order * to3.assertResult(1, 2); * </code></pre> * * @param <T> the value type received and emitted by this Subject subclass * @since 2.0 */ public final class ReusableUnicastSubject<T> extends Subject<T> { /** * The queue that buffers the source events. */ private final SpscLinkedArrayQueue<T> queue; /** * The single Observer. */ private final AtomicReference<Observer<? super T>> actual; /** * The optional callback when the Subject gets cancelled or terminates. */ private final AtomicReference<Runnable> onTerminate; /** * deliver onNext events before error event. */ private final boolean delayError; /** * Set to 1 atomically for the first and only Subscriber. */ private final AtomicBoolean once; /** * The wip counter and QueueDisposable surface. */ private final BasicIntQueueDisposable<T> wip; /** * Indicates the single observer has cancelled. */ private volatile boolean disposed; /** * Indicates the source has terminated. */ private volatile boolean done; /** * The terminal error if not null. * Must be set before writing to done and read after done == true. */ private Throwable error; private boolean enableOperatorFusion; /** * Creates an UnicastSubject with the given capacity hint and delay error flag. * * @param capacityHint the capacity hint for the internal, unbounded queue * @param delayError deliver pending onNext events before onError * @since 2.0.8 - experimental */ ReusableUnicastSubject(int capacityHint, boolean delayError) { this.queue = new SpscLinkedArrayQueue<>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference<>(); this.delayError = delayError; this.actual = new AtomicReference<>(); this.once = new AtomicBoolean(); this.wip = new ReusableUnicastSubject.UnicastQueueDisposable(); } /** * Creates an UnicastSubject with the given capacity hint and callback * for when the Subject is terminated normally or its single Subscriber cancels. * * @param capacityHint the capacity hint for the internal, unbounded queue * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @since 2.0 */ ReusableUnicastSubject(int capacityHint, Runnable onTerminate) { this(capacityHint, onTerminate, true); } /** * Creates an UnicastSubject with the given capacity hint, delay error flag and callback * for when the Subject is terminated normally or its single Subscriber cancels. * * @param capacityHint the capacity hint for the internal, unbounded queue * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @param delayError deliver pending onNext events before onError * @since 2.0.8 - experimental */ ReusableUnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) { this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); this.delayError = delayError; this.actual = new AtomicReference<Observer<? super T>>(); this.once = new AtomicBoolean(); this.wip = new ReusableUnicastSubject.UnicastQueueDisposable(); } /** * Creates an UnicastSubject with an internal buffer capacity hint 16. * * @param <T> the value type * @return an UnicastSubject instance */ @CheckReturnValue public static <T> ReusableUnicastSubject<T> create() { return new ReusableUnicastSubject<T>(bufferSize(), true); } /** * Creates an UnicastSubject with the given internal buffer capacity hint. * * @param <T> the value type * @param capacityHint the hint to size the internal unbounded buffer * @return an UnicastSubject instance */ @CheckReturnValue public static <T> ReusableUnicastSubject<T> create(int capacityHint) { return new ReusableUnicastSubject<T>(capacityHint, true); } /** * Creates an UnicastSubject with the given internal buffer capacity hint and a callback for * the case when the single Subscriber cancels its subscription. * * <p>The callback, if not null, is called exactly once and * non-overlapped with any active replay. * * @param <T> the value type * @param capacityHint the hint to size the internal unbounded buffer * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @return an UnicastSubject instance */ @CheckReturnValue public static <T> ReusableUnicastSubject<T> create(int capacityHint, Runnable onTerminate) { return new ReusableUnicastSubject<T>(capacityHint, onTerminate, true); } /** * Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and * a callback for the case when the single Subscriber cancels its subscription. * * <p>The callback, if not null, is called exactly once and * non-overlapped with any active replay. * * @param <T> the value type * @param capacityHint the hint to size the internal unbounded buffer * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance * @since 2.0.8 - experimental */ @CheckReturnValue @Experimental public static <T> ReusableUnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) { return new ReusableUnicastSubject<T>(capacityHint, onTerminate, delayError); } /** * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag. * * <p>The callback, if not null, is called exactly once and * non-overlapped with any active replay. * * @param <T> the value type * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance * @since 2.0.8 - experimental */ @CheckReturnValue @Experimental public static <T> ReusableUnicastSubject<T> create(boolean delayError) { return new ReusableUnicastSubject<T>(bufferSize(), delayError); } @Override protected void subscribeActual(Observer<? super T> observer) { if (!once.get() && once.compareAndSet(false, true)) { observer.onSubscribe(wip); actual.lazySet(observer); // full barrier in drain if (disposed) { actual.lazySet(null); return; } drain(); } else { EmptyDisposable.error(new IllegalStateException("Only a single observer allowed."), observer); } } void doTerminate() { Runnable r = onTerminate.get(); if (r != null && onTerminate.compareAndSet(r, null)) { r.run(); } } @Override public void onSubscribe(Disposable s) { if (done || disposed) { s.dispose(); } } @Override public void onNext(T t) { ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); if (done || disposed) { return; } queue.offer(t); drain(); } @Override public void onError(Throwable t) { ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources."); if (done || disposed) { RxJavaPlugins.onError(t); return; } error = t; done = true; doTerminate(); drain(); } @Override public void onComplete() { if (done || disposed) { return; } done = true; doTerminate(); drain(); } void drainNormal(Observer<? super T> a) { int missed = 1; SimpleQueue<T> q = queue; boolean failFast = !this.delayError; boolean canBeError = true; for (; ; ) { for (; ; ) { if (disposed) { actual.lazySet(null); q.clear(); return; } boolean d = this.done; T v = queue.poll(); boolean empty = v == null; if (d) { if (failFast && canBeError) { if (failedFast(q, a)) { return; } else { canBeError = false; } } if (empty) { errorOrComplete(a); return; } } if (empty) { break; } a.onNext(v); } missed = wip.addAndGet(-missed); if (missed == 0) { break; } } } void drainFused(Observer<? super T> a) { int missed = 1; final SpscLinkedArrayQueue<T> q = queue; final boolean failFast = !delayError; for (; ; ) { if (disposed) { actual.lazySet(null); q.clear(); return; } boolean d = done; if (failFast && d) { if (failedFast(q, a)) { return; } } a.onNext(null); if (d) { errorOrComplete(a); return; } missed = wip.addAndGet(-missed); if (missed == 0) { break; } } } void errorOrComplete(Observer<? super T> a) { actual.lazySet(null); Throwable ex = error; if (ex != null) { a.onError(ex); } else { a.onComplete(); } } boolean failedFast(final SimpleQueue<T> q, Observer<? super T> a) { Throwable ex = error; if (ex != null) { actual.lazySet(null); q.clear(); a.onError(ex); return true; } else { return false; } } void drain() { if (wip.getAndIncrement() != 0) { return; } Observer<? super T> a = actual.get(); int missed = 1; for (; ; ) { if (a != null) { if (enableOperatorFusion) { drainFused(a); } else { drainNormal(a); } return; } missed = wip.addAndGet(-missed); if (missed == 0) { break; } a = actual.get(); } } @Override public boolean hasObservers() { return actual.get() != null; } @Override public Throwable getThrowable() { if (done) { return error; } return null; } @Override public boolean hasThrowable() { return done && error != null; } @Override public boolean hasComplete() { return done && error == null; } final class UnicastQueueDisposable extends BasicIntQueueDisposable<T> { private static final long serialVersionUID = 7926949470189395511L; @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { enableOperatorFusion = true; return ASYNC; } return NONE; } @Nullable @Override public T poll() { return queue.poll(); } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public void clear() { queue.clear(); } @Override public void dispose() { if (!disposed) { once.set(false); actual.set(null); wip.set(0); queue.clear(); } } @Override public boolean isDisposed() { return disposed; } } }