diff --git a/app/src/main/java/de/kuschku/quasseldroid_ng/service/QuasselService.kt b/app/src/main/java/de/kuschku/quasseldroid_ng/service/QuasselService.kt index cac39a1da01154a0f5955f07d05dac5ea2cdddbd..093e6fa0013879f5db891146d8fc1fc6425d5ee2 100644 --- a/app/src/main/java/de/kuschku/quasseldroid_ng/service/QuasselService.kt +++ b/app/src/main/java/de/kuschku/quasseldroid_ng/service/QuasselService.kt @@ -5,7 +5,6 @@ import android.content.Intent import android.os.Binder import de.kuschku.libquassel.protocol.* import de.kuschku.libquassel.session.Backend -import de.kuschku.libquassel.session.CoreConnection import de.kuschku.libquassel.session.Session import de.kuschku.libquassel.session.SocketAddress import de.kuschku.quasseldroid_ng.BuildConfig @@ -25,8 +24,7 @@ class QuasselService : LifecycleService() { override fun connect(address: SocketAddress, user: String, pass: String) { disconnect() val handlerService = AndroidHandlerService() - session.connection.onNext(CoreConnection(session, address, handlerService)) - session.connection.value.start() + session.connect(address, handlerService) session.userData = user to pass } diff --git a/app/src/main/java/de/kuschku/quasseldroid_ng/ui/MainActivity.kt b/app/src/main/java/de/kuschku/quasseldroid_ng/ui/MainActivity.kt index dbaa19a173509158968f3fd22fbbebe94b2cea84..4595cd7664b756e6bb08af27cea70eb10c0ec1fd 100644 --- a/app/src/main/java/de/kuschku/quasseldroid_ng/ui/MainActivity.kt +++ b/app/src/main/java/de/kuschku/quasseldroid_ng/ui/MainActivity.kt @@ -1,7 +1,7 @@ package de.kuschku.quasseldroid_ng.ui -import android.arch.lifecycle.LiveData -import android.arch.lifecycle.MutableLiveData +import android.arch.lifecycle.LiveDataReactiveStreams +import android.arch.lifecycle.Observer import android.os.Bundle import android.support.design.widget.Snackbar import android.util.Log @@ -14,12 +14,12 @@ import butterknife.BindView import butterknife.ButterKnife import de.kuschku.libquassel.session.Backend import de.kuschku.libquassel.session.ConnectionState -import de.kuschku.libquassel.session.Session import de.kuschku.libquassel.session.SocketAddress import de.kuschku.libquassel.util.LoggingHandler import de.kuschku.quasseldroid_ng.R import de.kuschku.quasseldroid_ng.util.helper.stickyMapNotNull -import io.reactivex.disposables.Disposable +import de.kuschku.quasseldroid_ng.util.helper.stickySwitchMapNotNull +import de.kuschku.quasseldroid_ng.util.helper.switchMapNullable import org.threeten.bp.ZoneOffset import org.threeten.bp.ZonedDateTime import org.threeten.bp.format.DateTimeFormatter @@ -49,10 +49,13 @@ class MainActivity : ServiceBoundActivity() { @BindView(R.id.errorList) lateinit var errorList: TextView - private val session: LiveData<Session?> - = stickyMapNotNull(backend, Backend::session, null) - private var subscription: Disposable? = null - private val state = MutableLiveData<ConnectionState>() + private val state = backend.stickyMapNotNull(null, Backend::session) + .switchMapNullable(null) { session -> + LiveDataReactiveStreams.fromPublisher(session.connectionPublisher) + } + .stickySwitchMapNotNull(ConnectionState.DISCONNECTED) { connection -> + LiveDataReactiveStreams.fromPublisher(connection.state) + } private var snackbar: Snackbar? = null @@ -100,8 +103,7 @@ class MainActivity : ServiceBoundActivity() { errorList.text = "" } - /* - status.observe(this, Observer { + state.observe(this, Observer { val disconnected = it == ConnectionState.DISCONNECTED disconnect.isEnabled = !disconnected connect.isEnabled = disconnected @@ -110,7 +112,6 @@ class MainActivity : ServiceBoundActivity() { snackbar = Snackbar.make(errorList, it!!.name, Snackbar.LENGTH_SHORT) snackbar?.show() }) - */ } override fun onCreateOptionsMenu(menu: Menu?): Boolean { diff --git a/app/src/main/java/de/kuschku/quasseldroid_ng/util/helper/TransformationsHelper.kt b/app/src/main/java/de/kuschku/quasseldroid_ng/util/helper/TransformationsHelper.kt index c9b9500952590322365ae45e45b05c188fed8766..ad934b62b48aba3ff1f55b02317fa5b1ade9161f 100644 --- a/app/src/main/java/de/kuschku/quasseldroid_ng/util/helper/TransformationsHelper.kt +++ b/app/src/main/java/de/kuschku/quasseldroid_ng/util/helper/TransformationsHelper.kt @@ -5,16 +5,11 @@ import android.arch.lifecycle.LiveData import android.arch.lifecycle.MediatorLiveData import android.arch.lifecycle.Observer import android.support.annotation.MainThread -import io.reactivex.Observable -import io.reactivex.disposables.Disposable -import io.reactivex.schedulers.Schedulers -import io.reactivex.subjects.BehaviorSubject @MainThread -fun <X, Y> stickySwitchMapNotNull( - trigger: LiveData<X?>, - func: (X) -> LiveData<Y>?, - defaultValue: Y +fun <X, Y> LiveData<X?>.stickySwitchMapNotNull( + defaultValue: Y, + func: (X) -> LiveData<Y>? ): LiveData<Y> { val result = object : MediatorLiveData<Y>() { override fun observe(owner: LifecycleOwner?, observer: Observer<Y>?) { @@ -27,7 +22,7 @@ fun <X, Y> stickySwitchMapNotNull( observer?.onChanged(value ?: defaultValue) } } - result.addSource(trigger, object : Observer<X?> { + result.addSource(this, object : Observer<X?> { internal var mSource: LiveData<Y>? = null override fun onChanged(x: X?) { @@ -50,97 +45,37 @@ fun <X, Y> stickySwitchMapNotNull( } @MainThread -inline fun <X, Y> rxStickySwitchMapNotNull( - trigger: LiveData<X?>, - crossinline func: (X) -> BehaviorSubject<Y>?, - defaultValue: Y +fun <X, Y> LiveData<X?>.switchMapNullable( + defaultValue: Y, + func: (X) -> LiveData<Y>? ): LiveData<Y> { - return stickySwitchMapNotNull(trigger, { - val data = func(it) - if (data != null) - BehaviorSubjectLiveData(data) - else - null - }, defaultValue) -} - -class BehaviorSubjectLiveData<T>(val observable: BehaviorSubject<T>) : LiveData<T>() { - var subscription: Disposable? = null - - override fun getValue(): T? { - return observable.value - } - - override fun setValue(value: T) { - observable.onNext(value) - } - - override fun postValue(value: T) { - observable.onNext(value) - } - - override fun observe(owner: LifecycleOwner?, observer: Observer<T>?) { - super.observe(owner, observer) - if (subscription == null && hasActiveObservers()) { - subscription = observable.subscribeOn(Schedulers.io()).subscribe(this::postValue) - } - } - - override fun observeForever(observer: Observer<T>?) { - super.observeForever(observer) - if (subscription == null && hasActiveObservers()) { - subscription = observable.subscribeOn(Schedulers.io()).subscribe(this::postValue) - } - } - - override fun removeObserver(observer: Observer<T>?) { - super.removeObserver(observer) - if (subscription != null && !hasActiveObservers()) { - subscription?.dispose() - } - } - - override fun removeObservers(owner: LifecycleOwner?) { - super.removeObservers(owner) - if (subscription != null && !hasActiveObservers()) { - subscription?.dispose() - } - } -} - -@MainThread -fun <X, Y> rxSwitchMap( - trigger: LiveData<X?>, - func: (X) -> Observable<Y>?, - defaultValue: Y -): LiveData<Y> { - val result = object : MediatorLiveData<Y>() { - override fun observe(owner: LifecycleOwner?, observer: Observer<Y>?) { - super.observe(owner, observer) - observer?.onChanged(value ?: defaultValue) - } - - override fun observeForever(observer: Observer<Y>?) { - super.observeForever(observer) - observer?.onChanged(value ?: defaultValue) - } - } - result.addSource(trigger, object : Observer<X?> { - internal var mSource: Observable<Y>? = null + val result = MediatorLiveData<Y>() + result.addSource(this, object : Observer<X?> { + internal var mSource: LiveData<Y>? = null override fun onChanged(x: X?) { val newLiveData = if (x != null) func(x) else null - + if (mSource === newLiveData) { + return + } + if (mSource != null) { + result.removeSource(mSource) + } + mSource = newLiveData + if (mSource != null) { + result.addSource(mSource) { y -> result.value = y ?: defaultValue } + } else { + result.value = defaultValue + } } }) return result } @MainThread -fun <X, Y> stickyMapNotNull( - trigger: LiveData<X?>, - func: (X) -> Y?, - defaultValue: Y +fun <X, Y> LiveData<X?>.stickyMapNotNull( + defaultValue: Y, + func: (X) -> Y? ): LiveData<Y> { val result = object : MediatorLiveData<Y>() { override fun observe(owner: LifecycleOwner?, observer: Observer<Y>?) { @@ -153,7 +88,7 @@ fun <X, Y> stickyMapNotNull( observer?.onChanged(value ?: defaultValue) } } - result.addSource(trigger) { x -> + result.addSource(this) { x -> result.setValue(if (x == null) defaultValue else func(x) ?: defaultValue) } return result diff --git a/lib/src/main/java/de/kuschku/libquassel/session/CoreConnection.kt b/lib/src/main/java/de/kuschku/libquassel/session/CoreConnection.kt index 48777e1a00fbb55c17822c380f9509e2eb01aa90..81f4b8e69c79dd7c23fd9b4d3493a33bdf81621f 100644 --- a/lib/src/main/java/de/kuschku/libquassel/session/CoreConnection.kt +++ b/lib/src/main/java/de/kuschku/libquassel/session/CoreConnection.kt @@ -17,6 +17,7 @@ import de.kuschku.libquassel.util.helpers.write import de.kuschku.libquassel.util.log import de.kuschku.libquassel.util.nio.ChainedByteBuffer import de.kuschku.libquassel.util.nio.WrappedChannel +import io.reactivex.BackpressureStrategy import io.reactivex.subjects.BehaviorSubject import org.threeten.bp.ZoneOffset import org.threeten.bp.format.DateTimeFormatter @@ -41,7 +42,8 @@ class CoreConnection( private val sizeBuffer = ByteBuffer.allocateDirect(4) private val chainedBuffer = ChainedByteBuffer(direct = true) - override val state = BehaviorSubject.createDefault(ConnectionState.DISCONNECTED) + private val internalState = BehaviorSubject.createDefault(ConnectionState.DISCONNECTED) + override val state = internalState.toFlowable(BackpressureStrategy.LATEST) private var channel: WrappedChannel? = null @@ -57,7 +59,7 @@ class CoreConnection( override fun setState(value: ConnectionState) { log(INFO, "CoreConnection", value.name) - state.onNext(value) + internalState.onNext(value) } private fun sendHandshake() { @@ -168,7 +170,7 @@ class CoreConnection( } dataBuffer.flip() handlerService.parse { - when (state.value) { + when (internalState.value) { ConnectionState.HANDSHAKE -> processHandshake(dataBuffer) else -> processSigProxy(dataBuffer) } diff --git a/lib/src/main/java/de/kuschku/libquassel/session/ICoreConnection.kt b/lib/src/main/java/de/kuschku/libquassel/session/ICoreConnection.kt index 11d9ddc140219f5c7616d02efef457f7ba38f38d..e23d42b4693b4306b43610312490e34150e3d6a3 100644 --- a/lib/src/main/java/de/kuschku/libquassel/session/ICoreConnection.kt +++ b/lib/src/main/java/de/kuschku/libquassel/session/ICoreConnection.kt @@ -2,10 +2,12 @@ package de.kuschku.libquassel.session import de.kuschku.libquassel.protocol.message.HandshakeMessage import de.kuschku.libquassel.protocol.message.SignalProxyMessage +import io.reactivex.BackpressureStrategy import io.reactivex.subjects.BehaviorSubject +import org.reactivestreams.Publisher interface ICoreConnection { - val state: BehaviorSubject<ConnectionState> + val state: Publisher<ConnectionState> fun close() fun dispatch(message: HandshakeMessage) fun dispatch(message: SignalProxyMessage) @@ -21,8 +23,9 @@ interface ICoreConnection { override fun close() = Unit override fun dispatch(message: HandshakeMessage) = Unit override fun dispatch(message: SignalProxyMessage) = Unit - override val state: BehaviorSubject<ConnectionState> + override val state: Publisher<ConnectionState> get() = BehaviorSubject.createDefault(ConnectionState.DISCONNECTED) + .toFlowable(BackpressureStrategy.LATEST) } } } diff --git a/lib/src/main/java/de/kuschku/libquassel/session/Session.kt b/lib/src/main/java/de/kuschku/libquassel/session/Session.kt index 60543049848edbfb4179e9f06454741d9abe2ef4..389e90769a41a056f96f513c896873d775079366 100644 --- a/lib/src/main/java/de/kuschku/libquassel/session/Session.kt +++ b/lib/src/main/java/de/kuschku/libquassel/session/Session.kt @@ -6,10 +6,13 @@ import de.kuschku.libquassel.protocol.message.SignalProxyMessage import de.kuschku.libquassel.quassel.QuasselFeature import de.kuschku.libquassel.quassel.syncables.* import de.kuschku.libquassel.quassel.syncables.interfaces.invokers.Invokers +import de.kuschku.libquassel.util.HandlerService import de.kuschku.libquassel.util.LoggingHandler.LogLevel.DEBUG import de.kuschku.libquassel.util.LoggingHandler.LogLevel.INFO import de.kuschku.libquassel.util.hasFlag import de.kuschku.libquassel.util.log +import io.reactivex.BackpressureStrategy +import io.reactivex.Flowable import io.reactivex.subjects.BehaviorSubject import org.threeten.bp.Instant import javax.net.ssl.X509TrustManager @@ -35,7 +38,9 @@ class Session( private var networks = mutableMapOf<NetworkId, Network>() private var networkConfig: NetworkConfig? = null - val connection = BehaviorSubject.createDefault(ICoreConnection.NULL) + private val connection = BehaviorSubject.createDefault(ICoreConnection.NULL) + val connectionPublisher: Flowable<ICoreConnection> = connection.toFlowable( + BackpressureStrategy.LATEST) init { log(INFO, "Session", "Session created") @@ -44,6 +49,12 @@ class Session( Invokers } + fun connect(address: SocketAddress, handlerService: HandlerService) { + val coreConnection = CoreConnection(this, address, handlerService) + connection.onNext(coreConnection) + coreConnection.start() + } + override fun handle(f: HandshakeMessage.ClientInitAck): Boolean { coreFeatures = f.coreFeatures ?: Quassel_Feature.NONE dispatch(HandshakeMessage.ClientLogin(