Skip to content
Snippets Groups Projects
Verified Commit b6c874c1 authored by Janne Mareike Koschinski's avatar Janne Mareike Koschinski
Browse files

Fixes in several places

parent 6d259106
Branches
Tags
No related merge requests found
Showing
with 114 additions and 96 deletions
......@@ -17,10 +17,10 @@ import de.justjanne.libquassel.protocol.session.MessageChannel
abstract class ClientConnectionHandler : ConnectionHandler {
protected var channel: MessageChannel? = null
private val readyQueue = CoroutineQueue()
private val readyQueue = CoroutineQueue<Unit>()
override suspend fun init(channel: MessageChannel): Boolean {
this.channel = channel
readyQueue.resume()
readyQueue.resume(Unit)
return false
}
......
......@@ -63,7 +63,8 @@ class ClientHandshakeHandler(
) {
is HandshakeMessage.ClientInitReject ->
throw HandshakeException.InitException(response.errorString ?: "Unknown Error")
is HandshakeMessage.ClientInitAck ->
is HandshakeMessage.ClientInitAck -> {
channel!!.negotiatedFeatures = response.featureSet
if (response.coreConfigured == null) {
throw HandshakeException.InitException("Unknown Error")
} else if (response.coreConfigured == true) {
......@@ -74,6 +75,7 @@ class ClientHandshakeHandler(
response.authenticatorInfo
)
}
}
else -> throw HandshakeException.InitException("Unknown Error")
}
}
......
......@@ -20,6 +20,7 @@ import de.justjanne.libquassel.protocol.syncables.common.RpcHandler
import de.justjanne.libquassel.protocol.syncables.invoker.Invokers
import de.justjanne.libquassel.protocol.util.log.trace
import org.slf4j.LoggerFactory
import java.lang.Exception
import java.nio.ByteBuffer
class ClientProxyMessageHandler(
......@@ -35,6 +36,7 @@ class ClientProxyMessageHandler(
override suspend fun dispatch(message: SignalProxyMessage) {
logger.trace { "Read signal proxy message $message" }
try {
when (message) {
is SignalProxyMessage.HeartBeat -> emit(SignalProxyMessage.HeartBeatReply(message.timestamp))
is SignalProxyMessage.HeartBeatReply -> heartBeatHandler.recomputeLatency(message.timestamp, force = true)
......@@ -58,6 +60,9 @@ class ClientProxyMessageHandler(
invoker.invoke(syncable, message.slotName, message.params)
}
}
} catch (e: Exception) {
println(e)
}
}
companion object {
......
......@@ -9,6 +9,7 @@
package de.justjanne.libquassel.client.session
import de.justjanne.libquassel.annotations.ProtocolSide
import de.justjanne.libquassel.client.syncables.ClientBacklogManager
import de.justjanne.libquassel.protocol.connection.ProtocolFeatures
import de.justjanne.libquassel.protocol.connection.ProtocolMeta
import de.justjanne.libquassel.protocol.io.CoroutineChannel
......@@ -23,7 +24,6 @@ import de.justjanne.libquassel.protocol.session.Session
import de.justjanne.libquassel.protocol.syncables.HeartBeatHandler
import de.justjanne.libquassel.protocol.syncables.ObjectRepository
import de.justjanne.libquassel.protocol.syncables.common.AliasManager
import de.justjanne.libquassel.protocol.syncables.common.BacklogManager
import de.justjanne.libquassel.protocol.syncables.common.BufferSyncer
import de.justjanne.libquassel.protocol.syncables.common.BufferViewManager
import de.justjanne.libquassel.protocol.syncables.common.CoreInfo
......@@ -83,6 +83,8 @@ class ClientSession(
logger.info {
"Client session initialized: networks = $networkIds, buffers = $bufferInfos, identities = $identities"
}
objectRepository.add(state().coreInfo)
objectRepository.add(state().backlogManager)
}
override fun network(id: NetworkId) = state().networks[id]
......@@ -161,7 +163,7 @@ class ClientSession(
networks = mapOf(),
identities = mapOf(),
aliasManager = AliasManager(this),
backlogManager = BacklogManager(this),
backlogManager = ClientBacklogManager(this),
bufferSyncer = BufferSyncer(this),
bufferViewManager = BufferViewManager(this),
highlightRuleManager = HighlightRuleManager(this),
......
......@@ -9,10 +9,10 @@
package de.justjanne.libquassel.client.session
import de.justjanne.libquassel.client.syncables.ClientBacklogManager
import de.justjanne.libquassel.protocol.models.ids.IdentityId
import de.justjanne.libquassel.protocol.models.ids.NetworkId
import de.justjanne.libquassel.protocol.syncables.common.AliasManager
import de.justjanne.libquassel.protocol.syncables.common.BacklogManager
import de.justjanne.libquassel.protocol.syncables.common.BufferSyncer
import de.justjanne.libquassel.protocol.syncables.common.BufferViewManager
import de.justjanne.libquassel.protocol.syncables.common.CoreInfo
......@@ -28,7 +28,7 @@ data class ClientSessionState(
val networks: Map<NetworkId, Network>,
val identities: Map<IdentityId, Identity>,
val aliasManager: AliasManager,
val backlogManager: BacklogManager,
val backlogManager: ClientBacklogManager,
val bufferSyncer: BufferSyncer,
val bufferViewManager: BufferViewManager,
val ignoreListManager: IgnoreListManager,
......
......@@ -10,6 +10,8 @@
package de.justjanne.libquassel.client.syncables
import de.justjanne.bitflags.of
import de.justjanne.bitflags.toBits
import de.justjanne.libquassel.client.util.CoroutineKeyedQueue
import de.justjanne.libquassel.protocol.models.flags.MessageFlag
import de.justjanne.libquassel.protocol.models.flags.MessageFlags
import de.justjanne.libquassel.protocol.models.flags.MessageType
......@@ -19,21 +21,14 @@ import de.justjanne.libquassel.protocol.models.ids.MsgId
import de.justjanne.libquassel.protocol.session.Session
import de.justjanne.libquassel.protocol.syncables.common.BacklogManager
import de.justjanne.libquassel.protocol.variant.QVariantList
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
class ClientBacklogManager(
session: Session
) : BacklogManager(session) {
private val bufferListeners =
mutableMapOf<BacklogData.Buffer, Continuation<BacklogData.Buffer>>()
private val bufferFilteredListeners =
mutableMapOf<BacklogData.BufferFiltered, Continuation<BacklogData.BufferFiltered>>()
private val allListeners =
mutableMapOf<BacklogData.All, Continuation<BacklogData.All>>()
private val allFilteredListeners =
mutableMapOf<BacklogData.AllFiltered, Continuation<BacklogData.AllFiltered>>()
private val bufferQueue = CoroutineKeyedQueue<BacklogData.Buffer, QVariantList>()
private val bufferFilteredQueue = CoroutineKeyedQueue<BacklogData.BufferFiltered, QVariantList>()
private val allQueue = CoroutineKeyedQueue<BacklogData.All, QVariantList>()
private val allFilteredQueue = CoroutineKeyedQueue<BacklogData.AllFiltered, QVariantList>()
suspend fun backlog(
bufferId: BufferId,
......@@ -41,9 +36,9 @@ class ClientBacklogManager(
last: MsgId = MsgId(-1),
limit: Int = -1,
additional: Int = 0
) = suspendCoroutine<BacklogData.Buffer> {
val data = BacklogData.Buffer(bufferId, first, last, limit, additional)
bufferListeners[data] = it
): QVariantList {
requestBacklog(bufferId, first, last, limit, additional)
return bufferQueue.wait(BacklogData.Buffer(bufferId, first, last, limit, additional))
}
suspend fun backlogFiltered(
......@@ -54,9 +49,9 @@ class ClientBacklogManager(
additional: Int = 0,
type: MessageTypes = MessageType.all,
flags: MessageFlags = MessageFlag.all
) = suspendCoroutine<BacklogData.BufferFiltered> {
val data = BacklogData.BufferFiltered(bufferId, first, last, limit, additional, type, flags)
bufferFilteredListeners[data] = it
): QVariantList {
requestBacklogFiltered(bufferId, first, last, limit, additional, type.toBits().toInt(), flags.toBits().toInt())
return bufferFilteredQueue.wait(BacklogData.BufferFiltered(bufferId, first, last, limit, additional, type, flags))
}
suspend fun backlogAll(
......@@ -64,9 +59,9 @@ class ClientBacklogManager(
last: MsgId = MsgId(-1),
limit: Int = -1,
additional: Int = 0
) = suspendCoroutine<BacklogData.All> {
val data = BacklogData.All(first, last, limit, additional)
allListeners[data] = it
): QVariantList {
requestBacklogAll(first, last, limit, additional)
return allQueue.wait(BacklogData.All(first, last, limit, additional))
}
suspend fun backlogAllFiltered(
......@@ -76,9 +71,9 @@ class ClientBacklogManager(
additional: Int = 0,
type: MessageTypes = MessageType.all,
flags: MessageFlags = MessageFlag.all
) = suspendCoroutine<BacklogData.AllFiltered> {
val data = BacklogData.AllFiltered(first, last, limit, additional, type, flags)
allFilteredListeners[data] = it
): QVariantList {
requestBacklogAllFiltered(first, last, limit, additional, type.toBits().toInt(), flags.toBits().toInt())
return allFilteredQueue.wait(BacklogData.AllFiltered(first, last, limit, additional, type, flags))
}
override fun receiveBacklog(
......@@ -89,14 +84,16 @@ class ClientBacklogManager(
additional: Int,
messages: QVariantList
) {
val data = BacklogData.Buffer(
bufferQueue.resume(
BacklogData.Buffer(
bufferId,
first,
last,
limit,
additional
),
messages
)
bufferListeners[data]?.resume(data.copy(messages = messages))
super.receiveBacklog(bufferId, first, last, limit, additional, messages)
}
......@@ -110,7 +107,8 @@ class ClientBacklogManager(
flags: Int,
messages: QVariantList
) {
val data = BacklogData.BufferFiltered(
bufferFilteredQueue.resume(
BacklogData.BufferFiltered(
bufferId,
first,
last,
......@@ -118,19 +116,22 @@ class ClientBacklogManager(
additional,
MessageType.of(type.toUInt()),
MessageFlag.of(flags.toUInt())
),
messages
)
bufferFilteredListeners[data]?.resume(data.copy(messages = messages))
super.receiveBacklogFiltered(bufferId, first, last, limit, additional, type, flags, messages)
}
override fun receiveBacklogAll(first: MsgId, last: MsgId, limit: Int, additional: Int, messages: QVariantList) {
val data = BacklogData.All(
allQueue.resume(
BacklogData.All(
first,
last,
limit,
additional
),
messages
)
allListeners[data]?.resume(data.copy(messages = messages))
super.receiveBacklogAll(first, last, limit, additional, messages)
}
......@@ -143,15 +144,17 @@ class ClientBacklogManager(
flags: Int,
messages: QVariantList
) {
val data = BacklogData.AllFiltered(
allFilteredQueue.resume(
BacklogData.AllFiltered(
first,
last,
limit,
additional,
MessageType.of(type.toUInt()),
MessageFlag.of(flags.toUInt()),
),
messages
)
allFilteredListeners[data]?.resume(data.copy(messages = messages))
super.receiveBacklogAllFiltered(first, last, limit, additional, type, flags, messages)
}
......@@ -161,8 +164,7 @@ class ClientBacklogManager(
val first: MsgId = MsgId(-1),
val last: MsgId = MsgId(-1),
val limit: Int = -1,
val additional: Int = 0,
val messages: QVariantList = emptyList()
val additional: Int = 0
) : BacklogData()
data class BufferFiltered(
......@@ -172,16 +174,14 @@ class ClientBacklogManager(
val limit: Int = -1,
val additional: Int = 0,
val type: MessageTypes = MessageType.all,
val flags: MessageFlags = MessageFlag.all,
val messages: QVariantList = emptyList()
val flags: MessageFlags = MessageFlag.all
) : BacklogData()
data class All(
val first: MsgId = MsgId(-1),
val last: MsgId = MsgId(-1),
val limit: Int = -1,
val additional: Int = 0,
val messages: QVariantList = emptyList()
val additional: Int = 0
) : BacklogData()
data class AllFiltered(
......@@ -190,8 +190,7 @@ class ClientBacklogManager(
val limit: Int = -1,
val additional: Int = 0,
val type: MessageTypes = MessageType.all,
val flags: MessageFlags = MessageFlag.all,
val messages: QVariantList = emptyList()
val flags: MessageFlags = MessageFlag.all
) : BacklogData()
}
}
......@@ -13,15 +13,15 @@ import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
class CoroutineQueue {
private val waiting = mutableListOf<Continuation<Unit>>()
suspend fun wait(): Unit = suspendCoroutine {
class CoroutineQueue<T> {
private val waiting = mutableListOf<Continuation<T>>()
suspend fun wait(): T = suspendCoroutine {
waiting.add(it)
}
suspend fun resume() {
suspend fun resume(value: T) {
for (continuation in waiting) {
continuation.resume(Unit)
continuation.resume(value)
}
waiting.clear()
}
......
#
# libquassel
# Copyright (c) 2021 Janne Mareike Koschinski
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.
#
org.slf4j.simpleLogger.defaultLogLevel=debug
......@@ -14,7 +14,7 @@ kotlinPoetVersion=1.8.0
kspVersion=1.5.10-1.0.0-beta01
GROUP=de.justjanne.libquassel
VERSION_NAME=0.5.0
VERSION_NAME=0.5.1
POM_URL=https://git.kuschku.de/justJanne/libquassel
POM_SCM_URL=https://git.kuschku.de/justJanne/libquassel
......
......@@ -71,7 +71,7 @@ class ObjectRepository {
}
fun find(className: String, objectName: String): SyncableStub? {
return state().syncables[ObjectIdentifier(objectName, className)]
return state().syncables[ObjectIdentifier(className, objectName)]
}
inline fun <reified T : SyncableStub> find(objectName: String): T? {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment