BufferingObservableTest.kt
The snippet can be accessed without any authentication.
Authored by
Janne Mareike Koschinski
Edited
/*
* Quasseldroid - Quassel client for Android
*
* Copyright (c) 2019 Janne Koschinski
* Copyright (c) 2019 The Quassel Project
*
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 3 as published
* by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package de.kuschku.libquassel.util.rxjava
import org.junit.Assert.assertEquals
import org.junit.Test
class ReusableUnicastSubjectTest {
@Test
fun test() {
// We have Object A and B1, B2, etc.
//
// Object A should provide an observable to Objects B1, B2, etc.
// Object A will at some points publish items to this Observable.
// As long as no subscriber is subscribed, these items should be buffered.
//
// As soon as a subscriber subscribers, it should get all buffered items, as well as all that
// come after until it unsubscribes.
//
// If the subscriber unsubscribes again, the observable should buffer incoming items again,
// until another subscriber subscribes again
val expected1 = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val received1 = mutableListOf<Int>()
val expected2 = listOf(11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
val received2 = mutableListOf<Int>()
val expected3 = listOf(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
val received3 = mutableListOf<Int>()
// We create our observable, this is supposed to be in Object A
val subject = ReusableUnicastSubject.create<Int>()
val observable = subject.publish().refCount()
// And emit items while no subscriber is subscribed.
// These should get buffered
for (i in 1..5) {
subject.onNext(i)
}
// B1 subscribes, the subscriber should now receive all buffered items
val subscription1 = observable.subscribe {
received1.add(it)
}
// We emit items while a subscriber is subscribed,
// these shouldn’t get buffered but instead directly consumed by the subscriber
for (i in 6..10) {
subject.onNext(i)
}
// B1 unsubscribes again, from now on items should get buffered again
subscription1.dispose()
// These items should get buffered again
for (i in 11..15) {
subject.onNext(i)
}
// As soon as B2 subscribes, it should receive the buffered items 11..15
val subscription2 = observable.subscribe {
received2.add(it)
}
// These items should get directly consumed by the subscriber again
for (i in 16..20) {
subject.onNext(i)
}
// B3 should receive no buffered items
val subscription3 = observable.subscribe {
received3.add(it)
}
for (i in 21..25) {
subject.onNext(i)
}
// And B2 unsubscribes again
subscription2.dispose()
// These items should get directly consumed by the B3
for (i in 26..30) {
subject.onNext(i)
}
subscription3.dispose()
assertEquals(expected1, received1)
assertEquals(expected2, received2)
assertEquals(expected3, received3)
}
}
Please register or sign in to comment