From a6bec9fd844dd7a5d56670ecbbeaaca026dfd254 Mon Sep 17 00:00:00 2001 From: Janne Koschinski <janne@kuschku.de> Date: Thu, 10 Jan 2019 12:29:06 +0100 Subject: [PATCH] Fixed issues with errors not being properly shown - Introduced ReusableUnicastSubject to properly replay errors, but only once - Introduced a test for ReusableUnicastSubject to verify that it works --- .../quasseldroid/ui/chat/ChatActivity.kt | 3 +- .../libquassel/connection/CoreConnection.kt | 4 + .../de/kuschku/libquassel/session/ISession.kt | 11 +- .../de/kuschku/libquassel/session/Session.kt | 35 +- .../libquassel/session/SessionManager.kt | 27 +- .../util/rxjava/ReusableUnicastSubject.java | 584 ++++++++++++++++++ .../util/rxjava/ReusableUnicastSubjectTest.kt | 104 ++++ .../viewmodel/QuasselViewModel.kt | 10 +- 8 files changed, 731 insertions(+), 47 deletions(-) create mode 100644 lib/src/main/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubject.java create mode 100644 lib/src/test/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubjectTest.kt diff --git a/app/src/main/java/de/kuschku/quasseldroid/ui/chat/ChatActivity.kt b/app/src/main/java/de/kuschku/quasseldroid/ui/chat/ChatActivity.kt index 4f4de0fc9..4c9bf568d 100644 --- a/app/src/main/java/de/kuschku/quasseldroid/ui/chat/ChatActivity.kt +++ b/app/src/main/java/de/kuschku/quasseldroid/ui/chat/ChatActivity.kt @@ -97,6 +97,7 @@ import de.kuschku.quasseldroid.util.ui.NickCountDrawable import de.kuschku.quasseldroid.util.ui.WarningBarView import de.kuschku.quasseldroid.viewmodel.EditorViewModel import de.kuschku.quasseldroid.viewmodel.data.BufferData +import io.reactivex.BackpressureStrategy import org.threeten.bp.Instant import org.threeten.bp.ZoneId import org.threeten.bp.format.DateTimeFormatter @@ -304,7 +305,7 @@ class ChatActivity : ServiceBoundActivity(), SharedPreferences.OnSharedPreferenc }) // User-actionable errors that require immediate action, and should show up as dialog - viewModel.errors.toLiveData().observe(this, Observer { error -> + viewModel.errors.toLiveData(BackpressureStrategy.BUFFER).observe(this, Observer { error -> error?.let { when (it) { is Error.HandshakeError -> it.message.let { diff --git a/lib/src/main/java/de/kuschku/libquassel/connection/CoreConnection.kt b/lib/src/main/java/de/kuschku/libquassel/connection/CoreConnection.kt index 21baefd98..6b2f08735 100644 --- a/lib/src/main/java/de/kuschku/libquassel/connection/CoreConnection.kt +++ b/lib/src/main/java/de/kuschku/libquassel/connection/CoreConnection.kt @@ -248,7 +248,11 @@ class CoreConnection( cause = cause?.cause } while (cause != null && exception == null) if (exception != null) { + val securityExceptionCallback = this.securityExceptionCallback close() + log(WARN, + TAG, + "Security error encountered in connection: ${exception::class.java.canonicalName}") securityExceptionCallback?.invoke(exception) } else { if (!closed) { diff --git a/lib/src/main/java/de/kuschku/libquassel/session/ISession.kt b/lib/src/main/java/de/kuschku/libquassel/session/ISession.kt index debab392d..f122fe96f 100644 --- a/lib/src/main/java/de/kuschku/libquassel/session/ISession.kt +++ b/lib/src/main/java/de/kuschku/libquassel/session/ISession.kt @@ -26,8 +26,6 @@ import de.kuschku.libquassel.protocol.NetworkId import de.kuschku.libquassel.quassel.QuasselFeatures import de.kuschku.libquassel.quassel.syncables.* import de.kuschku.libquassel.util.Optional -import io.reactivex.BackpressureStrategy -import io.reactivex.Flowable import io.reactivex.Observable import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject @@ -62,8 +60,8 @@ interface ISession : Closeable { fun identity(id: IdentityId): Identity? val proxy: SignalProxy - val error: Flowable<Error> - val connectionError: Flowable<Throwable> + val error: Observable<Error> + val connectionError: Observable<Throwable> val lag: Observable<Long> fun login(user: String, pass: String) @@ -71,9 +69,8 @@ interface ISession : Closeable { companion object { val NULL = object : ISession { override val proxy: SignalProxy = SignalProxy.NULL - override val error = BehaviorSubject.create<Error>().toFlowable(BackpressureStrategy.BUFFER) - override val connectionError = BehaviorSubject.create<Throwable>().toFlowable( - BackpressureStrategy.BUFFER) + override val error = Observable.empty<Error>() + override val connectionError = Observable.empty<Throwable>() override val state = BehaviorSubject.createDefault(ConnectionState.DISCONNECTED) override val features: Features = Features( QuasselFeatures.empty(), diff --git a/lib/src/main/java/de/kuschku/libquassel/session/Session.kt b/lib/src/main/java/de/kuschku/libquassel/session/Session.kt index 84ab01c34..ea53f85f3 100644 --- a/lib/src/main/java/de/kuschku/libquassel/session/Session.kt +++ b/lib/src/main/java/de/kuschku/libquassel/session/Session.kt @@ -27,9 +27,9 @@ import de.kuschku.libquassel.quassel.ExtendedFeature import de.kuschku.libquassel.quassel.QuasselFeatures import de.kuschku.libquassel.quassel.syncables.* import de.kuschku.libquassel.util.compatibility.HandlerService -import de.kuschku.libquassel.util.compatibility.LoggingHandler import de.kuschku.libquassel.util.compatibility.LoggingHandler.Companion.log -import io.reactivex.BackpressureStrategy +import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.INFO +import de.kuschku.libquassel.util.rxjava.ReusableUnicastSubject import io.reactivex.Observable import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject @@ -48,7 +48,8 @@ class Session( heartBeatFactory: () -> HeartBeatRunner, val disconnectFromCore: (() -> Unit)?, private val initCallback: ((Session) -> Unit)?, - exceptionHandler: (Throwable) -> Unit + exceptionHandler: (Throwable) -> Unit, + private val hasErroredCallback: (Error) -> Unit ) : ProtocolHandler(exceptionHandler), ISession { override val objectStorage: ObjectStorage = ObjectStorage(this) override val proxy: SignalProxy = this @@ -67,11 +68,8 @@ class Session( ) override val state = coreConnection.state - private val _error = PublishSubject.create<Error>() - override val error = _error.toFlowable(BackpressureStrategy.BUFFER) - - private val _connectionError = PublishSubject.create<Throwable>() - override val connectionError = _connectionError.toFlowable(BackpressureStrategy.LATEST) + override val error = ReusableUnicastSubject.create<Error>() + override val connectionError = ReusableUnicastSubject.create<Throwable>() override val aliasManager = AliasManager(this) override val backlogManager = BacklogManager(this, backlogStorage) @@ -113,13 +111,18 @@ class Session( coreConnection.start() } + private fun handleError(error: Error) { + hasErroredCallback.invoke(error) + this.error.onNext(error) + } + override fun handle(f: HandshakeMessage.ClientInitAck): Boolean { features.core = QuasselFeatures(f.coreFeatures, f.featureList) if (f.coreConfigured == true) { login() } else { - _error.onNext(Error.HandshakeError(f)) + handleError(Error.HandshakeError(f)) } return true } @@ -144,26 +147,26 @@ class Session( } override fun handle(f: HandshakeMessage.ClientInitReject): Boolean { - _error.onNext(Error.HandshakeError(f)) + handleError(Error.HandshakeError(f)) return true } override fun handle(f: HandshakeMessage.CoreSetupReject): Boolean { - _error.onNext(Error.HandshakeError(f)) + handleError(Error.HandshakeError(f)) return true } override fun handle(f: HandshakeMessage.ClientLoginReject): Boolean { - _error.onNext(Error.HandshakeError(f)) + handleError(Error.HandshakeError(f)) return true } fun handle(f: QuasselSecurityException) { - _error.onNext(Error.SslError(f)) + handleError(Error.SslError(f)) } - fun handleConnectionError(f: Throwable) { - _connectionError.onNext(f) + fun handleConnectionError(connectionError: Throwable) { + this.connectionError.onNext(connectionError) } fun addNetwork(networkId: NetworkId) { @@ -261,7 +264,7 @@ class Session( val now = Instant.now() heartBeatThread.setLastHeartBeatReply(f.timestamp) val latency = now.toEpochMilli() - f.timestamp.toEpochMilli() - log(LoggingHandler.LogLevel.INFO, "Heartbeat", "Received Heartbeat with ${latency}ms latency") + log(INFO, "Heartbeat", "Received Heartbeat with ${latency}ms latency") lag.onNext(latency) return true } diff --git a/lib/src/main/java/de/kuschku/libquassel/session/SessionManager.kt b/lib/src/main/java/de/kuschku/libquassel/session/SessionManager.kt index 091393d89..5ed8b2298 100644 --- a/lib/src/main/java/de/kuschku/libquassel/session/SessionManager.kt +++ b/lib/src/main/java/de/kuschku/libquassel/session/SessionManager.kt @@ -26,11 +26,8 @@ import de.kuschku.libquassel.protocol.ClientData import de.kuschku.libquassel.quassel.syncables.interfaces.invokers.Invokers import de.kuschku.libquassel.util.compatibility.HandlerService import de.kuschku.libquassel.util.compatibility.LoggingHandler.Companion.log -import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.DEBUG -import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.INFO +import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.* import de.kuschku.libquassel.util.helpers.or -import io.reactivex.BackpressureStrategy -import io.reactivex.Flowable import io.reactivex.Observable import io.reactivex.disposables.Disposable import io.reactivex.functions.BiFunction @@ -79,15 +76,9 @@ class SessionManager( private var hasErrored: Boolean = false - val error: Flowable<Error> - get() = inProgressSession - .toFlowable(BackpressureStrategy.LATEST) - .switchMap(ISession::error) + val error = inProgressSession.switchMap(ISession::error) - val connectionError: Flowable<Throwable> - get() = inProgressSession - .toFlowable(BackpressureStrategy.LATEST) - .switchMap(ISession::connectionError) + val connectionError = inProgressSession.switchMap(ISession::connectionError) val connectionProgress: Observable<Triple<ConnectionState, Int, Int>> = Observable.combineLatest( state, initStatus, @@ -106,10 +97,6 @@ class SessionManager( } }) - disposables.add(error.subscribe { - hasErrored = true - }) - // This should preload them Invokers } @@ -148,11 +135,17 @@ class SessionManager( heartBeatFactory, disconnectFromCore, initCallback, - exceptionHandler + exceptionHandler, + ::hasErroredCallback ) ) } + fun hasErroredCallback(error: Error) { + log(WARN, "SessionManager", "Callback Error occured: $error") + hasErrored = true + } + /** * @return if an autoreconnect has been necessary */ diff --git a/lib/src/main/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubject.java b/lib/src/main/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubject.java new file mode 100644 index 000000000..69198509e --- /dev/null +++ b/lib/src/main/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubject.java @@ -0,0 +1,584 @@ +/* + * Quasseldroid - Quassel client for Android + * + * Copyright (c) 2019 Janne Koschinski + * Copyright (c) 2019 The Quassel Project + * + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 3 as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package de.kuschku.libquassel.util.rxjava; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.Observer; +import io.reactivex.annotations.CheckReturnValue; +import io.reactivex.annotations.Experimental; +import io.reactivex.annotations.Nullable; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.observers.BasicIntQueueDisposable; +import io.reactivex.internal.queue.SpscLinkedArrayQueue; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.Subject; + +/** + * A Subject that queues up events until a single {@link Observer} subscribes to it, replays + * those events to it until the {@code Observer} catches up and then switches to relaying events live to + * this single {@code Observer} until this {@code UnicastSubject} terminates or the {@code Observer} unsubscribes. + * <p> + * <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/UnicastSubject.png" alt=""> + * <p> + * Note that {@code UnicastSubject} holds an unbounded internal buffer. + * <p> + * This subject does not have a public constructor by design; a new empty instance of this + * {@code UnicastSubject} can be created via the following {@code create} methods that + * allow specifying the retention policy for items: + * <ul> + * <li>{@link #create()} - creates an empty, unbounded {@code UnicastSubject} that + * caches all items and the terminal event it receives.</li> + * <li>{@link #create(int)} - creates an empty, unbounded {@code UnicastSubject} + * with a hint about how many <b>total</b> items one expects to retain.</li> + * <li>{@link #create(boolean)} - creates an empty, unbounded {@code UnicastSubject} that + * optionally delays an error it receives and replays it after the regular items have been emitted.</li> + * <li>{@link #create(int, Runnable)} - creates an empty, unbounded {@code UnicastSubject} + * with a hint about how many <b>total</b> items one expects to retain and a callback that will be + * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes.</li> + * <li>{@link #create(int, Runnable, boolean)} - creates an empty, unbounded {@code UnicastSubject} + * with a hint about how many <b>total</b> items one expects to retain and a callback that will be + * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes + * and optionally delays an error it receives and replays it after the regular items have been emitted.</li> + * </ul> + * <p> + * If more than one {@code Observer} attempts to subscribe to this {@code UnicastSubject}, they + * will receive an {@code IllegalStateException} indicating the single-use-only nature of this {@code UnicastSubject}, + * even if the {@code UnicastSubject} already terminated with an error. + * <p> + * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, + * {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as + * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a + * {@link NullPointerException} being thrown and the subject's state is not changed. + * <p> + * Since a {@code UnicastSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. + * <p> + * When this {@code UnicastSubject} is terminated via {@link #onError(Throwable)} the current or late single {@code Observer} + * may receive the {@code Throwable} before any available items could be emitted. To make sure an onError event is delivered + * to the {@code Observer} after the normal items, create a {@code UnicastSubject} with the {@link #create(boolean)} or + * {@link #create(int, Runnable, boolean)} factory methods. + * <p> + * Even though {@code UnicastSubject} implements the {@code Observer} interface, calling + * {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>) + * if the subject is used as a standalone source. However, calling {@code onSubscribe} + * after the {@code UnicastSubject} reached its terminal state will result in the + * given {@code Disposable} being disposed immediately. + * <p> + * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} + * is required to be serialized (called from the same thread or called non-overlappingly from different threads + * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s + * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} + * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). + * <p> + * This {@code UnicastSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, + * {@link #getThrowable()} and {@link #hasObservers()}. + * <dl> + * <dt><b>Scheduler:</b></dt> + * <dd>{@code UnicastSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and + * the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd> + * <dt><b>Error handling:</b></dt> + * <dd>When the {@link #onError(Throwable)} is called, the {@code UnicastSubject} enters into a terminal state + * and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission, + * if one or more {@code Observer}s dispose their respective {@code Disposable}s, the + * {@code Throwable} is delivered to the global error handler via + * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s + * cancel at once). + * If there were no {@code Observer}s subscribed to this {@code UnicastSubject} when the {@code onError()} + * was called, the global error handler is not invoked. + * </dd> + * </dl> + * <p> + * Example usage: + * <pre><code> + * UnicastSubject<Integer> subject = UnicastSubject.create(); + * + * TestObserver<Integer> to1 = subject.test(); + * + * // fresh UnicastSubjects are empty + * to1.assertEmpty(); + * + * TestObserver<Integer> to2 = subject.test(); + * + * // A UnicastSubject only allows one Observer during its lifetime + * to2.assertFailure(IllegalStateException.class); + * + * subject.onNext(1); + * to1.assertValue(1); + * + * subject.onNext(2); + * to1.assertValues(1, 2); + * + * subject.onComplete(); + * to1.assertResult(1, 2); + * + * // ---------------------------------------------------- + * + * UnicastSubject<Integer> subject2 = UnicastSubject.create(); + * + * // a UnicastSubject caches events util its single Observer subscribes + * subject.onNext(1); + * subject.onNext(2); + * subject.onComplete(); + * + * TestObserver<Integer> to3 = subject2.test(); + * + * // the cached events are emitted in order + * to3.assertResult(1, 2); + * </code></pre> + * + * @param <T> the value type received and emitted by this Subject subclass + * @since 2.0 + */ +public final class ReusableUnicastSubject<T> extends Subject<T> { + /** + * The queue that buffers the source events. + */ + private final SpscLinkedArrayQueue<T> queue; + + /** + * The single Observer. + */ + private final AtomicReference<Observer<? super T>> actual; + + /** + * The optional callback when the Subject gets cancelled or terminates. + */ + private final AtomicReference<Runnable> onTerminate; + + /** + * deliver onNext events before error event. + */ + private final boolean delayError; + /** + * Set to 1 atomically for the first and only Subscriber. + */ + private final AtomicBoolean once; + /** + * The wip counter and QueueDisposable surface. + */ + private final BasicIntQueueDisposable<T> wip; + /** + * Indicates the single observer has cancelled. + */ + private volatile boolean disposed; + /** + * Indicates the source has terminated. + */ + private volatile boolean done; + /** + * The terminal error if not null. + * Must be set before writing to done and read after done == true. + */ + private Throwable error; + private boolean enableOperatorFusion; + + /** + * Creates an UnicastSubject with the given capacity hint and delay error flag. + * + * @param capacityHint the capacity hint for the internal, unbounded queue + * @param delayError deliver pending onNext events before onError + * @since 2.0.8 - experimental + */ + ReusableUnicastSubject(int capacityHint, boolean delayError) { + this.queue = new SpscLinkedArrayQueue<>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); + this.onTerminate = new AtomicReference<>(); + this.delayError = delayError; + this.actual = new AtomicReference<>(); + this.once = new AtomicBoolean(); + this.wip = new ReusableUnicastSubject.UnicastQueueDisposable(); + } + + /** + * Creates an UnicastSubject with the given capacity hint and callback + * for when the Subject is terminated normally or its single Subscriber cancels. + * + * @param capacityHint the capacity hint for the internal, unbounded queue + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @since 2.0 + */ + ReusableUnicastSubject(int capacityHint, Runnable onTerminate) { + this(capacityHint, onTerminate, true); + } + + /** + * Creates an UnicastSubject with the given capacity hint, delay error flag and callback + * for when the Subject is terminated normally or its single Subscriber cancels. + * + * @param capacityHint the capacity hint for the internal, unbounded queue + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError + * @since 2.0.8 - experimental + */ + ReusableUnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) { + this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); + this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); + this.delayError = delayError; + this.actual = new AtomicReference<Observer<? super T>>(); + this.once = new AtomicBoolean(); + this.wip = new ReusableUnicastSubject.UnicastQueueDisposable(); + } + + /** + * Creates an UnicastSubject with an internal buffer capacity hint 16. + * + * @param <T> the value type + * @return an UnicastSubject instance + */ + @CheckReturnValue + public static <T> ReusableUnicastSubject<T> create() { + return new ReusableUnicastSubject<T>(bufferSize(), true); + } + + /** + * Creates an UnicastSubject with the given internal buffer capacity hint. + * + * @param <T> the value type + * @param capacityHint the hint to size the internal unbounded buffer + * @return an UnicastSubject instance + */ + @CheckReturnValue + public static <T> ReusableUnicastSubject<T> create(int capacityHint) { + return new ReusableUnicastSubject<T>(capacityHint, true); + } + + /** + * Creates an UnicastSubject with the given internal buffer capacity hint and a callback for + * the case when the single Subscriber cancels its subscription. + * + * <p>The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param <T> the value type + * @param capacityHint the hint to size the internal unbounded buffer + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @return an UnicastSubject instance + */ + @CheckReturnValue + public static <T> ReusableUnicastSubject<T> create(int capacityHint, Runnable onTerminate) { + return new ReusableUnicastSubject<T>(capacityHint, onTerminate, true); + } + + /** + * Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and + * a callback for the case when the single Subscriber cancels its subscription. + * + * <p>The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param <T> the value type + * @param capacityHint the hint to size the internal unbounded buffer + * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed + * @param delayError deliver pending onNext events before onError + * @return an UnicastSubject instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public static <T> ReusableUnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) { + return new ReusableUnicastSubject<T>(capacityHint, onTerminate, delayError); + } + + /** + * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag. + * + * <p>The callback, if not null, is called exactly once and + * non-overlapped with any active replay. + * + * @param <T> the value type + * @param delayError deliver pending onNext events before onError + * @return an UnicastSubject instance + * @since 2.0.8 - experimental + */ + @CheckReturnValue + @Experimental + public static <T> ReusableUnicastSubject<T> create(boolean delayError) { + return new ReusableUnicastSubject<T>(bufferSize(), delayError); + } + + @Override + protected void subscribeActual(Observer<? super T> observer) { + if (!once.get() && once.compareAndSet(false, true)) { + observer.onSubscribe(wip); + actual.lazySet(observer); // full barrier in drain + if (disposed) { + actual.lazySet(null); + return; + } + drain(); + } else { + EmptyDisposable.error(new IllegalStateException("Only a single observer allowed."), observer); + } + } + + void doTerminate() { + Runnable r = onTerminate.get(); + if (r != null && onTerminate.compareAndSet(r, null)) { + r.run(); + } + } + + @Override + public void onSubscribe(Disposable s) { + if (done || disposed) { + s.dispose(); + } + } + + @Override + public void onNext(T t) { + ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); + if (done || disposed) { + return; + } + queue.offer(t); + drain(); + } + + @Override + public void onError(Throwable t) { + ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources."); + if (done || disposed) { + RxJavaPlugins.onError(t); + return; + } + error = t; + done = true; + + doTerminate(); + + drain(); + } + + @Override + public void onComplete() { + if (done || disposed) { + return; + } + done = true; + + doTerminate(); + + drain(); + } + + void drainNormal(Observer<? super T> a) { + int missed = 1; + SimpleQueue<T> q = queue; + boolean failFast = !this.delayError; + boolean canBeError = true; + for (; ; ) { + for (; ; ) { + + if (disposed) { + actual.lazySet(null); + q.clear(); + return; + } + + boolean d = this.done; + T v = queue.poll(); + boolean empty = v == null; + + if (d) { + if (failFast && canBeError) { + if (failedFast(q, a)) { + return; + } else { + canBeError = false; + } + } + + if (empty) { + errorOrComplete(a); + return; + } + } + + if (empty) { + break; + } + + a.onNext(v); + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + void drainFused(Observer<? super T> a) { + int missed = 1; + + final SpscLinkedArrayQueue<T> q = queue; + final boolean failFast = !delayError; + + for (; ; ) { + + if (disposed) { + actual.lazySet(null); + q.clear(); + return; + } + boolean d = done; + + if (failFast && d) { + if (failedFast(q, a)) { + return; + } + } + + a.onNext(null); + + if (d) { + errorOrComplete(a); + return; + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + void errorOrComplete(Observer<? super T> a) { + actual.lazySet(null); + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + } + + boolean failedFast(final SimpleQueue<T> q, Observer<? super T> a) { + Throwable ex = error; + if (ex != null) { + actual.lazySet(null); + q.clear(); + a.onError(ex); + return true; + } else { + return false; + } + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + Observer<? super T> a = actual.get(); + int missed = 1; + + for (; ; ) { + + if (a != null) { + if (enableOperatorFusion) { + drainFused(a); + } else { + drainNormal(a); + } + return; + } + + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + + a = actual.get(); + } + } + + @Override + public boolean hasObservers() { + return actual.get() != null; + } + + @Override + public Throwable getThrowable() { + if (done) { + return error; + } + return null; + } + + @Override + public boolean hasThrowable() { + return done && error != null; + } + + @Override + public boolean hasComplete() { + return done && error == null; + } + + final class UnicastQueueDisposable extends BasicIntQueueDisposable<T> { + private static final long serialVersionUID = 7926949470189395511L; + + @Override + public int requestFusion(int mode) { + if ((mode & ASYNC) != 0) { + enableOperatorFusion = true; + return ASYNC; + } + return NONE; + } + + @Nullable + @Override + public T poll() { + return queue.poll(); + } + + @Override + public boolean isEmpty() { + return queue.isEmpty(); + } + + @Override + public void clear() { + queue.clear(); + } + + @Override + public void dispose() { + if (!disposed) { + once.set(false); + actual.set(null); + wip.set(0); + queue.clear(); + } + } + + @Override + public boolean isDisposed() { + return disposed; + } + + } +} diff --git a/lib/src/test/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubjectTest.kt b/lib/src/test/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubjectTest.kt new file mode 100644 index 000000000..a4773c350 --- /dev/null +++ b/lib/src/test/java/de/kuschku/libquassel/util/rxjava/ReusableUnicastSubjectTest.kt @@ -0,0 +1,104 @@ +/* + * Quasseldroid - Quassel client for Android + * + * Copyright (c) 2019 Janne Koschinski + * Copyright (c) 2019 The Quassel Project + * + * This program is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 3 as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +package de.kuschku.libquassel.util.rxjava + +import org.junit.Assert.assertEquals +import org.junit.Test + +class ReusableUnicastSubjectTest { + @Test + fun test() { + // We have Object A and B1, B2, etc. + // + // Object A should provide an observable to Objects B1, B2, etc. + // Object A will at some points publish items to this Observable. + // As long as no subscriber is subscribed, these items should be buffered. + // + // As soon as a subscriber subscribers, it should get all buffered items, as well as all that + // come after until it unsubscribes. + // + // If the subscriber unsubscribes again, the observable should buffer incoming items again, + // until another subscriber subscribes again + + val expected1 = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + val received1 = mutableListOf<Int>() + + val expected2 = listOf(11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25) + val received2 = mutableListOf<Int>() + + val expected3 = listOf(21, 22, 23, 24, 25, 26, 27, 28, 29, 30) + val received3 = mutableListOf<Int>() + + // We create our observable, this is supposed to be in Object A + val subject = ReusableUnicastSubject.create<Int>() + val observable = subject.publish().refCount() + + // And emit items while no subscriber is subscribed. + // These should get buffered + for (i in 1..5) { + subject.onNext(i) + } + + // B1 subscribes, the subscriber should now receive all buffered items + val subscription1 = observable.subscribe { + received1.add(it) + } + + // We emit items while a subscriber is subscribed, + // these shouldn’t get buffered but instead directly consumed by the subscriber + for (i in 6..10) { + subject.onNext(i) + } + // B1 unsubscribes again, from now on items should get buffered again + subscription1.dispose() + + // These items should get buffered again + for (i in 11..15) { + subject.onNext(i) + } + // As soon as B2 subscribes, it should receive the buffered items 11..15 + val subscription2 = observable.subscribe { + received2.add(it) + } + + // These items should get directly consumed by the subscriber again + for (i in 16..20) { + subject.onNext(i) + } + // B3 should receive no buffered items + val subscription3 = observable.subscribe { + received3.add(it) + } + for (i in 21..25) { + subject.onNext(i) + } + // And B2 unsubscribes again + subscription2.dispose() + // These items should get directly consumed by the B3 + for (i in 26..30) { + subject.onNext(i) + } + subscription3.dispose() + + assertEquals(expected1, received1) + assertEquals(expected2, received2) + assertEquals(expected3, received3) + } +} diff --git a/viewmodel/src/main/java/de/kuschku/quasseldroid/viewmodel/QuasselViewModel.kt b/viewmodel/src/main/java/de/kuschku/quasseldroid/viewmodel/QuasselViewModel.kt index 192a8da91..bfa9a5171 100644 --- a/viewmodel/src/main/java/de/kuschku/quasseldroid/viewmodel/QuasselViewModel.kt +++ b/viewmodel/src/main/java/de/kuschku/quasseldroid/viewmodel/QuasselViewModel.kt @@ -36,8 +36,6 @@ import de.kuschku.libquassel.util.helpers.* import de.kuschku.libquassel.util.irc.IrcCaseMappers import de.kuschku.quasseldroid.util.helper.combineLatest import de.kuschku.quasseldroid.viewmodel.data.* -import io.reactivex.BackpressureStrategy -import io.reactivex.Flowable import io.reactivex.Observable import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.PublishSubject @@ -116,12 +114,12 @@ class QuasselViewModel : ViewModel() { } } - val errors = sessionManager.toFlowable(BackpressureStrategy.LATEST).switchMap { - it.orNull()?.error ?: Flowable.empty() + val errors = sessionManager.switchMap { + it.orNull()?.error ?: Observable.empty() } - val connectionErrors = sessionManager.toFlowable(BackpressureStrategy.LATEST).switchMap { - it.orNull()?.connectionError ?: Flowable.empty() + val connectionErrors = sessionManager.switchMap { + it.orNull()?.connectionError ?: Observable.empty() } val sslSession = session.flatMapSwitchMap(ISession::sslSession) -- GitLab