From b6c874c192f56b5bdc839ac67cde1d82a7a16304 Mon Sep 17 00:00:00 2001 From: Janne Mareike Koschinski <janne@kuschku.de> Date: Sun, 6 Jun 2021 16:18:08 +0200 Subject: [PATCH] Fixes in several places --- .../client/session/ClientConnectionHandler.kt | 4 +- .../client/session/ClientHandshakeHandler.kt | 4 +- .../session/ClientProxyMessageHandler.kt | 47 ++++--- .../client/session/ClientSession.kt | 6 +- .../client/session/ClientSessionState.kt | 4 +- .../client/syncables/ClientBacklogManager.kt | 121 +++++++++--------- .../libquassel/client/util/CoroutineQueue.kt | 10 +- .../test/resources/simplelogger.properties | 10 ++ gradle.properties | 2 +- .../protocol/syncables/ObjectRepository.kt | 2 +- 10 files changed, 114 insertions(+), 96 deletions(-) create mode 100644 client/src/test/resources/simplelogger.properties diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientConnectionHandler.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientConnectionHandler.kt index a8fc8c5..63c798b 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientConnectionHandler.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientConnectionHandler.kt @@ -17,10 +17,10 @@ import de.justjanne.libquassel.protocol.session.MessageChannel abstract class ClientConnectionHandler : ConnectionHandler { protected var channel: MessageChannel? = null - private val readyQueue = CoroutineQueue() + private val readyQueue = CoroutineQueue<Unit>() override suspend fun init(channel: MessageChannel): Boolean { this.channel = channel - readyQueue.resume() + readyQueue.resume(Unit) return false } diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientHandshakeHandler.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientHandshakeHandler.kt index 511a4f7..b0077fe 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientHandshakeHandler.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientHandshakeHandler.kt @@ -63,7 +63,8 @@ class ClientHandshakeHandler( ) { is HandshakeMessage.ClientInitReject -> throw HandshakeException.InitException(response.errorString ?: "Unknown Error") - is HandshakeMessage.ClientInitAck -> + is HandshakeMessage.ClientInitAck -> { + channel!!.negotiatedFeatures = response.featureSet if (response.coreConfigured == null) { throw HandshakeException.InitException("Unknown Error") } else if (response.coreConfigured == true) { @@ -74,6 +75,7 @@ class ClientHandshakeHandler( response.authenticatorInfo ) } + } else -> throw HandshakeException.InitException("Unknown Error") } } diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientProxyMessageHandler.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientProxyMessageHandler.kt index c444b84..424f823 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientProxyMessageHandler.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientProxyMessageHandler.kt @@ -20,6 +20,7 @@ import de.justjanne.libquassel.protocol.syncables.common.RpcHandler import de.justjanne.libquassel.protocol.syncables.invoker.Invokers import de.justjanne.libquassel.protocol.util.log.trace import org.slf4j.LoggerFactory +import java.lang.Exception import java.nio.ByteBuffer class ClientProxyMessageHandler( @@ -35,28 +36,32 @@ class ClientProxyMessageHandler( override suspend fun dispatch(message: SignalProxyMessage) { logger.trace { "Read signal proxy message $message" } - when (message) { - is SignalProxyMessage.HeartBeat -> emit(SignalProxyMessage.HeartBeatReply(message.timestamp)) - is SignalProxyMessage.HeartBeatReply -> heartBeatHandler.recomputeLatency(message.timestamp, force = true) - is SignalProxyMessage.InitData -> objectRepository.init( - objectRepository.find(message.className, message.objectName) ?: return, - message.initData - ) - is SignalProxyMessage.InitRequest -> { - // Ignore incoming requests, we’re a client, we shouldn’t ever receive these - } - is SignalProxyMessage.Rpc -> { - val invoker = Invokers.get(ProtocolSide.CLIENT, "RpcHandler") - ?: throw RpcInvocationFailedException.InvokerNotFoundException("RpcHandler") - invoker.invoke(rpcHandler, message.slotName, message.params) - } - is SignalProxyMessage.Sync -> { - val invoker = Invokers.get(ProtocolSide.CLIENT, message.className) - ?: throw RpcInvocationFailedException.InvokerNotFoundException(message.className) - val syncable = objectRepository.find(message.className, message.objectName) - ?: throw RpcInvocationFailedException.SyncableNotFoundException(message.className, message.objectName) - invoker.invoke(syncable, message.slotName, message.params) + try { + when (message) { + is SignalProxyMessage.HeartBeat -> emit(SignalProxyMessage.HeartBeatReply(message.timestamp)) + is SignalProxyMessage.HeartBeatReply -> heartBeatHandler.recomputeLatency(message.timestamp, force = true) + is SignalProxyMessage.InitData -> objectRepository.init( + objectRepository.find(message.className, message.objectName) ?: return, + message.initData + ) + is SignalProxyMessage.InitRequest -> { + // Ignore incoming requests, we’re a client, we shouldn’t ever receive these + } + is SignalProxyMessage.Rpc -> { + val invoker = Invokers.get(ProtocolSide.CLIENT, "RpcHandler") + ?: throw RpcInvocationFailedException.InvokerNotFoundException("RpcHandler") + invoker.invoke(rpcHandler, message.slotName, message.params) + } + is SignalProxyMessage.Sync -> { + val invoker = Invokers.get(ProtocolSide.CLIENT, message.className) + ?: throw RpcInvocationFailedException.InvokerNotFoundException(message.className) + val syncable = objectRepository.find(message.className, message.objectName) + ?: throw RpcInvocationFailedException.SyncableNotFoundException(message.className, message.objectName) + invoker.invoke(syncable, message.slotName, message.params) + } } + } catch (e: Exception) { + println(e) } } diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSession.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSession.kt index 0d6d847..03e6584 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSession.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSession.kt @@ -9,6 +9,7 @@ package de.justjanne.libquassel.client.session import de.justjanne.libquassel.annotations.ProtocolSide +import de.justjanne.libquassel.client.syncables.ClientBacklogManager import de.justjanne.libquassel.protocol.connection.ProtocolFeatures import de.justjanne.libquassel.protocol.connection.ProtocolMeta import de.justjanne.libquassel.protocol.io.CoroutineChannel @@ -23,7 +24,6 @@ import de.justjanne.libquassel.protocol.session.Session import de.justjanne.libquassel.protocol.syncables.HeartBeatHandler import de.justjanne.libquassel.protocol.syncables.ObjectRepository import de.justjanne.libquassel.protocol.syncables.common.AliasManager -import de.justjanne.libquassel.protocol.syncables.common.BacklogManager import de.justjanne.libquassel.protocol.syncables.common.BufferSyncer import de.justjanne.libquassel.protocol.syncables.common.BufferViewManager import de.justjanne.libquassel.protocol.syncables.common.CoreInfo @@ -83,6 +83,8 @@ class ClientSession( logger.info { "Client session initialized: networks = $networkIds, buffers = $bufferInfos, identities = $identities" } + objectRepository.add(state().coreInfo) + objectRepository.add(state().backlogManager) } override fun network(id: NetworkId) = state().networks[id] @@ -161,7 +163,7 @@ class ClientSession( networks = mapOf(), identities = mapOf(), aliasManager = AliasManager(this), - backlogManager = BacklogManager(this), + backlogManager = ClientBacklogManager(this), bufferSyncer = BufferSyncer(this), bufferViewManager = BufferViewManager(this), highlightRuleManager = HighlightRuleManager(this), diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSessionState.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSessionState.kt index 9d29f8f..d03123d 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSessionState.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/session/ClientSessionState.kt @@ -9,10 +9,10 @@ package de.justjanne.libquassel.client.session +import de.justjanne.libquassel.client.syncables.ClientBacklogManager import de.justjanne.libquassel.protocol.models.ids.IdentityId import de.justjanne.libquassel.protocol.models.ids.NetworkId import de.justjanne.libquassel.protocol.syncables.common.AliasManager -import de.justjanne.libquassel.protocol.syncables.common.BacklogManager import de.justjanne.libquassel.protocol.syncables.common.BufferSyncer import de.justjanne.libquassel.protocol.syncables.common.BufferViewManager import de.justjanne.libquassel.protocol.syncables.common.CoreInfo @@ -28,7 +28,7 @@ data class ClientSessionState( val networks: Map<NetworkId, Network>, val identities: Map<IdentityId, Identity>, val aliasManager: AliasManager, - val backlogManager: BacklogManager, + val backlogManager: ClientBacklogManager, val bufferSyncer: BufferSyncer, val bufferViewManager: BufferViewManager, val ignoreListManager: IgnoreListManager, diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/syncables/ClientBacklogManager.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/syncables/ClientBacklogManager.kt index b989199..633c45c 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/syncables/ClientBacklogManager.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/syncables/ClientBacklogManager.kt @@ -10,6 +10,8 @@ package de.justjanne.libquassel.client.syncables import de.justjanne.bitflags.of +import de.justjanne.bitflags.toBits +import de.justjanne.libquassel.client.util.CoroutineKeyedQueue import de.justjanne.libquassel.protocol.models.flags.MessageFlag import de.justjanne.libquassel.protocol.models.flags.MessageFlags import de.justjanne.libquassel.protocol.models.flags.MessageType @@ -19,21 +21,14 @@ import de.justjanne.libquassel.protocol.models.ids.MsgId import de.justjanne.libquassel.protocol.session.Session import de.justjanne.libquassel.protocol.syncables.common.BacklogManager import de.justjanne.libquassel.protocol.variant.QVariantList -import kotlin.coroutines.Continuation -import kotlin.coroutines.resume -import kotlin.coroutines.suspendCoroutine class ClientBacklogManager( session: Session ) : BacklogManager(session) { - private val bufferListeners = - mutableMapOf<BacklogData.Buffer, Continuation<BacklogData.Buffer>>() - private val bufferFilteredListeners = - mutableMapOf<BacklogData.BufferFiltered, Continuation<BacklogData.BufferFiltered>>() - private val allListeners = - mutableMapOf<BacklogData.All, Continuation<BacklogData.All>>() - private val allFilteredListeners = - mutableMapOf<BacklogData.AllFiltered, Continuation<BacklogData.AllFiltered>>() + private val bufferQueue = CoroutineKeyedQueue<BacklogData.Buffer, QVariantList>() + private val bufferFilteredQueue = CoroutineKeyedQueue<BacklogData.BufferFiltered, QVariantList>() + private val allQueue = CoroutineKeyedQueue<BacklogData.All, QVariantList>() + private val allFilteredQueue = CoroutineKeyedQueue<BacklogData.AllFiltered, QVariantList>() suspend fun backlog( bufferId: BufferId, @@ -41,9 +36,9 @@ class ClientBacklogManager( last: MsgId = MsgId(-1), limit: Int = -1, additional: Int = 0 - ) = suspendCoroutine<BacklogData.Buffer> { - val data = BacklogData.Buffer(bufferId, first, last, limit, additional) - bufferListeners[data] = it + ): QVariantList { + requestBacklog(bufferId, first, last, limit, additional) + return bufferQueue.wait(BacklogData.Buffer(bufferId, first, last, limit, additional)) } suspend fun backlogFiltered( @@ -54,9 +49,9 @@ class ClientBacklogManager( additional: Int = 0, type: MessageTypes = MessageType.all, flags: MessageFlags = MessageFlag.all - ) = suspendCoroutine<BacklogData.BufferFiltered> { - val data = BacklogData.BufferFiltered(bufferId, first, last, limit, additional, type, flags) - bufferFilteredListeners[data] = it + ): QVariantList { + requestBacklogFiltered(bufferId, first, last, limit, additional, type.toBits().toInt(), flags.toBits().toInt()) + return bufferFilteredQueue.wait(BacklogData.BufferFiltered(bufferId, first, last, limit, additional, type, flags)) } suspend fun backlogAll( @@ -64,9 +59,9 @@ class ClientBacklogManager( last: MsgId = MsgId(-1), limit: Int = -1, additional: Int = 0 - ) = suspendCoroutine<BacklogData.All> { - val data = BacklogData.All(first, last, limit, additional) - allListeners[data] = it + ): QVariantList { + requestBacklogAll(first, last, limit, additional) + return allQueue.wait(BacklogData.All(first, last, limit, additional)) } suspend fun backlogAllFiltered( @@ -76,9 +71,9 @@ class ClientBacklogManager( additional: Int = 0, type: MessageTypes = MessageType.all, flags: MessageFlags = MessageFlag.all - ) = suspendCoroutine<BacklogData.AllFiltered> { - val data = BacklogData.AllFiltered(first, last, limit, additional, type, flags) - allFilteredListeners[data] = it + ): QVariantList { + requestBacklogAllFiltered(first, last, limit, additional, type.toBits().toInt(), flags.toBits().toInt()) + return allFilteredQueue.wait(BacklogData.AllFiltered(first, last, limit, additional, type, flags)) } override fun receiveBacklog( @@ -89,14 +84,16 @@ class ClientBacklogManager( additional: Int, messages: QVariantList ) { - val data = BacklogData.Buffer( - bufferId, - first, - last, - limit, - additional + bufferQueue.resume( + BacklogData.Buffer( + bufferId, + first, + last, + limit, + additional + ), + messages ) - bufferListeners[data]?.resume(data.copy(messages = messages)) super.receiveBacklog(bufferId, first, last, limit, additional, messages) } @@ -110,27 +107,31 @@ class ClientBacklogManager( flags: Int, messages: QVariantList ) { - val data = BacklogData.BufferFiltered( - bufferId, - first, - last, - limit, - additional, - MessageType.of(type.toUInt()), - MessageFlag.of(flags.toUInt()) + bufferFilteredQueue.resume( + BacklogData.BufferFiltered( + bufferId, + first, + last, + limit, + additional, + MessageType.of(type.toUInt()), + MessageFlag.of(flags.toUInt()) + ), + messages ) - bufferFilteredListeners[data]?.resume(data.copy(messages = messages)) super.receiveBacklogFiltered(bufferId, first, last, limit, additional, type, flags, messages) } override fun receiveBacklogAll(first: MsgId, last: MsgId, limit: Int, additional: Int, messages: QVariantList) { - val data = BacklogData.All( - first, - last, - limit, - additional + allQueue.resume( + BacklogData.All( + first, + last, + limit, + additional + ), + messages ) - allListeners[data]?.resume(data.copy(messages = messages)) super.receiveBacklogAll(first, last, limit, additional, messages) } @@ -143,15 +144,17 @@ class ClientBacklogManager( flags: Int, messages: QVariantList ) { - val data = BacklogData.AllFiltered( - first, - last, - limit, - additional, - MessageType.of(type.toUInt()), - MessageFlag.of(flags.toUInt()), + allFilteredQueue.resume( + BacklogData.AllFiltered( + first, + last, + limit, + additional, + MessageType.of(type.toUInt()), + MessageFlag.of(flags.toUInt()), + ), + messages ) - allFilteredListeners[data]?.resume(data.copy(messages = messages)) super.receiveBacklogAllFiltered(first, last, limit, additional, type, flags, messages) } @@ -161,8 +164,7 @@ class ClientBacklogManager( val first: MsgId = MsgId(-1), val last: MsgId = MsgId(-1), val limit: Int = -1, - val additional: Int = 0, - val messages: QVariantList = emptyList() + val additional: Int = 0 ) : BacklogData() data class BufferFiltered( @@ -172,16 +174,14 @@ class ClientBacklogManager( val limit: Int = -1, val additional: Int = 0, val type: MessageTypes = MessageType.all, - val flags: MessageFlags = MessageFlag.all, - val messages: QVariantList = emptyList() + val flags: MessageFlags = MessageFlag.all ) : BacklogData() data class All( val first: MsgId = MsgId(-1), val last: MsgId = MsgId(-1), val limit: Int = -1, - val additional: Int = 0, - val messages: QVariantList = emptyList() + val additional: Int = 0 ) : BacklogData() data class AllFiltered( @@ -190,8 +190,7 @@ class ClientBacklogManager( val limit: Int = -1, val additional: Int = 0, val type: MessageTypes = MessageType.all, - val flags: MessageFlags = MessageFlag.all, - val messages: QVariantList = emptyList() + val flags: MessageFlags = MessageFlag.all ) : BacklogData() } } diff --git a/client/src/main/kotlin/de/justjanne/libquassel/client/util/CoroutineQueue.kt b/client/src/main/kotlin/de/justjanne/libquassel/client/util/CoroutineQueue.kt index c97b76d..335805d 100644 --- a/client/src/main/kotlin/de/justjanne/libquassel/client/util/CoroutineQueue.kt +++ b/client/src/main/kotlin/de/justjanne/libquassel/client/util/CoroutineQueue.kt @@ -13,15 +13,15 @@ import kotlin.coroutines.Continuation import kotlin.coroutines.resume import kotlin.coroutines.suspendCoroutine -class CoroutineQueue { - private val waiting = mutableListOf<Continuation<Unit>>() - suspend fun wait(): Unit = suspendCoroutine { +class CoroutineQueue<T> { + private val waiting = mutableListOf<Continuation<T>>() + suspend fun wait(): T = suspendCoroutine { waiting.add(it) } - suspend fun resume() { + suspend fun resume(value: T) { for (continuation in waiting) { - continuation.resume(Unit) + continuation.resume(value) } waiting.clear() } diff --git a/client/src/test/resources/simplelogger.properties b/client/src/test/resources/simplelogger.properties new file mode 100644 index 0000000..594f59e --- /dev/null +++ b/client/src/test/resources/simplelogger.properties @@ -0,0 +1,10 @@ +# +# libquassel +# Copyright (c) 2021 Janne Mareike Koschinski +# +# This Source Code Form is subject to the terms of the Mozilla Public License, +# v. 2.0. If a copy of the MPL was not distributed with this file, You can +# obtain one at https://mozilla.org/MPL/2.0/. +# + +org.slf4j.simpleLogger.defaultLogLevel=debug diff --git a/gradle.properties b/gradle.properties index b96a1c0..c905938 100644 --- a/gradle.properties +++ b/gradle.properties @@ -14,7 +14,7 @@ kotlinPoetVersion=1.8.0 kspVersion=1.5.10-1.0.0-beta01 GROUP=de.justjanne.libquassel -VERSION_NAME=0.5.0 +VERSION_NAME=0.5.1 POM_URL=https://git.kuschku.de/justJanne/libquassel POM_SCM_URL=https://git.kuschku.de/justJanne/libquassel diff --git a/protocol/src/main/kotlin/de/justjanne/libquassel/protocol/syncables/ObjectRepository.kt b/protocol/src/main/kotlin/de/justjanne/libquassel/protocol/syncables/ObjectRepository.kt index 38bc680..eeab61c 100644 --- a/protocol/src/main/kotlin/de/justjanne/libquassel/protocol/syncables/ObjectRepository.kt +++ b/protocol/src/main/kotlin/de/justjanne/libquassel/protocol/syncables/ObjectRepository.kt @@ -71,7 +71,7 @@ class ObjectRepository { } fun find(className: String, objectName: String): SyncableStub? { - return state().syncables[ObjectIdentifier(objectName, className)] + return state().syncables[ObjectIdentifier(className, objectName)] } inline fun <reified T : SyncableStub> find(objectName: String): T? { -- GitLab