Skip to content
Snippets Groups Projects
Unverified Commit 77e8b4bf authored by Janne Mareike Koschinski's avatar Janne Mareike Koschinski
Browse files

wip

parent 2a94ce7f
Branches
No related tags found
No related merge requests found
Pipeline #3044 canceled
Showing
with 331 additions and 279 deletions
/*
* libquassel
* Copyright (c) 2025 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.io.ChainedByteBuffer
import de.justjanne.libquassel.protocol.io.CoroutineChannel
import java.nio.ByteBuffer
import javax.net.ssl.SSLContext
class ChannelConnection(
private val channel: CoroutineChannel,
): Connection {
private val sizeBuffer = ByteBuffer.allocateDirect(4)
private val buffer = ChainedByteBuffer()
override suspend fun enableTLS(sslContext: SSLContext) = channel.enableTLS(sslContext)
override suspend fun enableCompression() = channel.enableCompression()
override suspend fun send(sizePrefix: Boolean, handler: (buffer: ChainedByteBuffer) -> Unit) {
handler(buffer)
if (sizePrefix) {
sizeBuffer.clear()
sizeBuffer.putInt(buffer.size)
sizeBuffer.flip()
channel.write(sizeBuffer)
sizeBuffer.clear()
}
channel.write(buffer)
channel.flush()
buffer.clear()
}
override suspend fun <T> read(buffer: ByteBuffer, handler: (buffer: ByteBuffer) -> T): T {
channel.read(buffer)
buffer.flip()
return handler(buffer)
}
override suspend fun <T> read(handler: (buffer: ByteBuffer) -> T): T {
sizeBuffer.clear()
channel.read(sizeBuffer)
sizeBuffer.flip()
val content = ByteBuffer.allocateDirect(sizeBuffer.getInt())
channel.read(content)
content.flip()
return handler(content)
}
}
/*
* libquassel
* Copyright (c) 2025 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.exceptions.HandshakeException
import de.justjanne.libquassel.protocol.features.FeatureSet
import de.justjanne.libquassel.protocol.models.HandshakeMessage
import de.justjanne.libquassel.protocol.serializers.HandshakeMessageSerializer
class ClientHandshakeHandler(
private val clientInit: HandshakeMessage.ClientInit,
private val clientLogin: HandshakeMessage.ClientLogin,
) : ConnectionHandler<HandshakeState> {
private suspend fun Connection.request(message: HandshakeMessage): HandshakeMessage {
send(sizePrefix = true) {
HandshakeMessageSerializer.serialize(it, message, FeatureSet.none())
}
return read { HandshakeMessageSerializer.deserialize(it, FeatureSet.none()) }
}
private suspend fun Connection.clientInit(message: HandshakeMessage.ClientInit): Result<HandshakeMessage.ClientInitAck> =
when (val result = request(message)) {
is HandshakeMessage.ClientInitAck -> Result.success(result)
is HandshakeMessage.ClientInitReject -> Result.failure(HandshakeException.InitException(result.errorString ?: "Unknown error"))
else -> Result.failure(HandshakeException.InitException("Unknown error"))
}
private suspend fun Connection.clientLogin(message: HandshakeMessage.ClientLogin): Result<HandshakeMessage.ClientLoginAck> =
when (val result = request(message)) {
is HandshakeMessage.ClientLoginAck -> Result.success(result)
is HandshakeMessage.ClientLoginReject -> Result.failure(HandshakeException.InitException(result.errorString ?: "Unknown error"))
else -> Result.failure(HandshakeException.InitException("Unknown error"))
}
private suspend fun Connection.coreSetup(message: HandshakeMessage.CoreSetupData): Result<HandshakeMessage.CoreSetupAck> =
when (val result = request(message)) {
is HandshakeMessage.CoreSetupAck -> Result.success(result)
is HandshakeMessage.CoreSetupReject -> Result.failure(HandshakeException.InitException(result.errorString ?: "Unknown error"))
else -> Result.failure(HandshakeException.InitException("Unknown error"))
}
override suspend fun handle(connection: Connection) = runCatching {
val init = connection.clientInit(clientInit).getOrThrow()
connection.clientLogin(clientLogin).getOrThrow()
val sessionInit = when (val result = connection.read { HandshakeMessageSerializer.deserialize(it, FeatureSet.none()) }) {
is HandshakeMessage.SessionInit -> Result.success(result)
else -> Result.failure(HandshakeException.InitException("Unknown error"))
}.getOrThrow()
HandshakeState(init, sessionInit)
}
}
/*
* libquassel
* Copyright (c) 2025 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.api.ObjectName
import de.justjanne.libquassel.protocol.api.dispatcher.RpcDispatcher
import de.justjanne.libquassel.protocol.api.dispatcher.SyncHandler
import de.justjanne.libquassel.protocol.models.SignalProxyMessage
import de.justjanne.libquassel.protocol.models.types.QtType
import de.justjanne.libquassel.protocol.serializers.SignalProxyMessageSerializer
import de.justjanne.libquassel.protocol.variant.qVariant
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
class ClientSessionHandler(
private val handshake: HandshakeState,
private val sync: SyncHandler,
private val rpc: RpcDispatcher,
) : ConnectionHandler<Unit> {
val toInit = MutableStateFlow<List<Pair<String, String>>?>(null)
override suspend fun handle(connection: Connection) = runCatching {
suspend fun Connection.send(message: SignalProxyMessage) = send(true) {
SignalProxyMessageSerializer.serialize(it, message, handshake.clientInitAck.featureSet)
}
toInit.value = listOf(
Pair("AliasManager", "")
)
connection.send(SignalProxyMessage.InitRequest("AliasManager", ""))
while (true) {
when (val message = connection.read { SignalProxyMessageSerializer.deserialize(it, handshake.clientInitAck.featureSet) }) {
is SignalProxyMessage.HeartBeat -> connection.send(SignalProxyMessage.HeartBeatReply(message.timestamp))
is SignalProxyMessage.HeartBeatReply -> Unit
is SignalProxyMessage.InitData -> {
sync.invoke(message.className, ObjectName(message.objectName), "update", listOf(qVariant(message.initData, QtType.QVariantMap)))
toInit.update { it?.minus(Pair(message.className, message.objectName)) }
}
is SignalProxyMessage.InitRequest -> Unit
is SignalProxyMessage.Rpc -> rpc.invoke(message.slotName, message.params)
is SignalProxyMessage.Sync -> sync.invoke(message.className, ObjectName(message.objectName), message.slotName, message.params)
}
}
}
}
/*
* libquassel
* Copyright (c) 2025 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.io.ChainedByteBuffer
import java.nio.ByteBuffer
import javax.net.ssl.SSLContext
interface Connection {
suspend fun enableTLS(sslContext: SSLContext)
suspend fun enableCompression()
suspend fun send(sizePrefix: Boolean, handler: (buffer: ChainedByteBuffer) -> Unit)
suspend fun <T> read(buffer: ByteBuffer, handler: (buffer: ByteBuffer) -> T): T
suspend fun <T> read(handler: (buffer: ByteBuffer) -> T): T
}
/* /*
* libquassel * libquassel
* Copyright (c) 2021 Janne Mareike Koschinski * Copyright (c) 2025 Janne Mareike Koschinski
* *
* This Source Code Form is subject to the terms of the Mozilla Public License, * This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can * v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/. * obtain one at https://mozilla.org/MPL/2.0/.
*/ */
package de.justjanne.libquassel.protocol.session package de.justjanne.libquassel.connection
import java.nio.ByteBuffer interface ConnectionHandler<T> {
suspend fun handle(connection: Connection): Result<T>
interface ConnectionHandler {
suspend fun init(channel: MessageChannel): Boolean
suspend fun done()
suspend fun read(buffer: ByteBuffer): Boolean
} }
/* /*
* libquassel * libquassel
* Copyright (c) 2021 Janne Mareike Koschinski * Copyright (c) 2025 Janne Mareike Koschinski
* *
* This Source Code Form is subject to the terms of the Mozilla Public License, * This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can * v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/. * obtain one at https://mozilla.org/MPL/2.0/.
*/ */
package de.justjanne.libquassel.protocol.session package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.models.SignalProxyMessage import de.justjanne.libquassel.protocol.connection.CoreHeader
interface ProxyMessageHandler : ConnectionHandler { data class ConnectionSetupState(
suspend fun emit(message: SignalProxyMessage) val coreHeader: CoreHeader,
)
suspend fun dispatch(message: SignalProxyMessage)
}
/* /*
* libquassel * libquassel
* Copyright (c) 2021 Janne Mareike Koschinski * Copyright (c) 2025 Janne Mareike Koschinski
* *
* This Source Code Form is subject to the terms of the Mozilla Public License, * This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can * v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/. * obtain one at https://mozilla.org/MPL/2.0/.
*/ */
package de.justjanne.libquassel.protocol.session package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.models.setup.BackendInfo import de.justjanne.libquassel.protocol.models.HandshakeMessage
sealed class CoreState { data class HandshakeState(
object Configured : CoreState() val clientInitAck: HandshakeMessage.ClientInitAck,
val sessionInit: HandshakeMessage.SessionInit,
data class Unconfigured( )
val databases: List<BackendInfo>,
val authenticators: List<BackendInfo>,
) : CoreState()
}
/*
* libquassel
* Copyright (c) 2025 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.connection
import de.justjanne.libquassel.protocol.connection.ClientHeader
import de.justjanne.libquassel.protocol.connection.ClientHeaderSerializer
import de.justjanne.libquassel.protocol.connection.CoreHeader
import de.justjanne.libquassel.protocol.connection.CoreHeaderSerializer
import de.justjanne.libquassel.protocol.connection.ProtocolFeature
import de.justjanne.libquassel.protocol.features.FeatureSet
import java.nio.ByteBuffer
import javax.net.ssl.SSLContext
class MagicHandler(
private val clientHeader: ClientHeader,
) : ConnectionHandler<ConnectionSetupState> {
override suspend fun handle(connection: Connection) = runCatching {
connection.send(sizePrefix = false) {
ClientHeaderSerializer.serialize(it, clientHeader, FeatureSet.none())
}
val protocol: CoreHeader = connection.read(ByteBuffer.allocateDirect(4)) {
CoreHeaderSerializer.deserialize(it, FeatureSet.none())
}
if (protocol.features.contains(ProtocolFeature.TLS)) {
connection.enableTLS(SSLContext.getDefault())
}
if (protocol.features.contains(ProtocolFeature.Compression)) {
connection.enableCompression()
}
ConnectionSetupState(protocol)
}
}
...@@ -15,6 +15,7 @@ import dagger.Binds ...@@ -15,6 +15,7 @@ import dagger.Binds
import dagger.Component import dagger.Component
import dagger.Module import dagger.Module
import dagger.Provides import dagger.Provides
import de.justjanne.bitflags.of
import de.justjanne.libquassel.backend.AliasManagerPersister import de.justjanne.libquassel.backend.AliasManagerPersister
import de.justjanne.libquassel.backend.BacklogManagerPersister import de.justjanne.libquassel.backend.BacklogManagerPersister
import de.justjanne.libquassel.backend.BufferSyncerPersister import de.justjanne.libquassel.backend.BufferSyncerPersister
...@@ -31,8 +32,11 @@ import de.justjanne.libquassel.backend.IrcUserPersister ...@@ -31,8 +32,11 @@ import de.justjanne.libquassel.backend.IrcUserPersister
import de.justjanne.libquassel.backend.NetworkConfigPersister import de.justjanne.libquassel.backend.NetworkConfigPersister
import de.justjanne.libquassel.backend.NetworkPersister import de.justjanne.libquassel.backend.NetworkPersister
import de.justjanne.libquassel.backend.RpcPersister import de.justjanne.libquassel.backend.RpcPersister
import de.justjanne.libquassel.connection.ChannelConnection
import de.justjanne.libquassel.connection.ClientHandshakeHandler
import de.justjanne.libquassel.connection.ClientSessionHandler
import de.justjanne.libquassel.connection.MagicHandler
import de.justjanne.libquassel.persistence.AliasEntity import de.justjanne.libquassel.persistence.AliasEntity
import de.justjanne.libquassel.persistence.AliasRepository
import de.justjanne.libquassel.persistence.AppDatabase import de.justjanne.libquassel.persistence.AppDatabase
import de.justjanne.libquassel.protocol.api.ObjectName import de.justjanne.libquassel.protocol.api.ObjectName
import de.justjanne.libquassel.protocol.api.client.AliasManagerClientApi import de.justjanne.libquassel.protocol.api.client.AliasManagerClientApi
...@@ -68,7 +72,14 @@ import de.justjanne.libquassel.protocol.api.server.IgnoreListManagerServerApi ...@@ -68,7 +72,14 @@ import de.justjanne.libquassel.protocol.api.server.IgnoreListManagerServerApi
import de.justjanne.libquassel.protocol.api.server.IrcListHelperServerApi import de.justjanne.libquassel.protocol.api.server.IrcListHelperServerApi
import de.justjanne.libquassel.protocol.api.server.NetworkConfigServerApi import de.justjanne.libquassel.protocol.api.server.NetworkConfigServerApi
import de.justjanne.libquassel.protocol.api.server.NetworkServerApi import de.justjanne.libquassel.protocol.api.server.NetworkServerApi
import de.justjanne.libquassel.protocol.connection.ClientHeader
import de.justjanne.libquassel.protocol.connection.ProtocolFeature
import de.justjanne.libquassel.protocol.connection.ProtocolMeta
import de.justjanne.libquassel.protocol.connection.ProtocolVersion
import de.justjanne.libquassel.protocol.exceptions.RpcInvocationFailedException import de.justjanne.libquassel.protocol.exceptions.RpcInvocationFailedException
import de.justjanne.libquassel.protocol.features.FeatureSet
import de.justjanne.libquassel.protocol.io.CoroutineChannel
import de.justjanne.libquassel.protocol.models.HandshakeMessage
import de.justjanne.libquassel.protocol.models.ids.BufferId import de.justjanne.libquassel.protocol.models.ids.BufferId
import de.justjanne.libquassel.protocol.models.ids.MsgId import de.justjanne.libquassel.protocol.models.ids.MsgId
import de.justjanne.libquassel.protocol.models.types.QtType import de.justjanne.libquassel.protocol.models.types.QtType
...@@ -77,17 +88,24 @@ import de.justjanne.libquassel.protocol.variant.QVariant_ ...@@ -77,17 +88,24 @@ import de.justjanne.libquassel.protocol.variant.QVariant_
import de.justjanne.libquassel.protocol.variant.qVariant import de.justjanne.libquassel.protocol.variant.qVariant
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.net.InetSocketAddress
import java.nio.channels.ClosedByInterruptException
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
import kotlin.test.assertFails import kotlin.test.assertFails
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
@Singleton @Singleton
class ProxyImpl @Inject constructor() : Proxy { class ProxyImpl @Inject constructor() : Proxy {
...@@ -123,6 +141,7 @@ interface ClientModule { ...@@ -123,6 +141,7 @@ interface ClientModule {
@Module @Module
class DatabaseModule { class DatabaseModule {
@Singleton
@Provides @Provides
fun database(): AppDatabase = fun database(): AppDatabase =
Room.inMemoryDatabaseBuilder<AppDatabase>() Room.inMemoryDatabaseBuilder<AppDatabase>()
...@@ -191,7 +210,6 @@ class QuasselApiTest { ...@@ -191,7 +210,6 @@ class QuasselApiTest {
client.api().certManager.requestUpdate(ObjectName(""), emptyMap()) client.api().certManager.requestUpdate(ObjectName(""), emptyMap())
client.api().highlightRuleManager.requestRemoveHighlightRule(5) client.api().highlightRuleManager.requestRemoveHighlightRule(5)
client.api().identity.requestUpdate(ObjectName(""), emptyMap()) client.api().identity.requestUpdate(ObjectName(""), emptyMap())
println("hi!")
} }
@Test @Test
...@@ -223,4 +241,38 @@ class QuasselApiTest { ...@@ -223,4 +241,38 @@ class QuasselApiTest {
delay(10.milliseconds) delay(10.milliseconds)
job.cancelAndJoin() job.cancelAndJoin()
} }
@Test
fun testConnection() = runBlocking {
val di = DaggerClientComponent.builder().build()
val channel = CoroutineChannel()
withTimeout(500.milliseconds) {
channel.connect(InetSocketAddress("decentralised.chat", 4242), keepAlive = true)
}
val connection = ChannelConnection(channel)
val connectionSetup = withTimeout(500.milliseconds) {
MagicHandler(
ClientHeader(
features = ProtocolFeature.of(ProtocolFeature.Compression, ProtocolFeature.TLS),
versions = listOf(ProtocolMeta(ProtocolVersion.Datastream, 0u))
),
).handle(connection)
}.getOrThrow()
val handshake = withTimeout(500.milliseconds) {
ClientHandshakeHandler(
HandshakeMessage.ClientInit(clientVersion = "", buildDate = "", featureSet = FeatureSet.none()),
HandshakeMessage.ClientLogin(Const.user, Const.pass),
).handle(connection)
}.getOrThrow()
val sessionHandler = ClientSessionHandler(handshake, di.sync(), di.rpc())
withTimeout(2.seconds) {
launch {
sessionHandler.handle(connection)
}
sessionHandler.toInit.first { it != null && it.isEmpty() }
channel.close()
}
println(di.db().alias().getAll())
Unit
}
} }
/*
* libquassel
* Copyright (c) 2021 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.protocol.session
import de.justjanne.libquassel.protocol.exceptions.HandshakeException
import de.justjanne.libquassel.protocol.features.FeatureSet
import de.justjanne.libquassel.protocol.variant.QVariantMap
interface HandshakeHandler : ConnectionHandler {
/**
* Register client and start connection
*/
@Throws(HandshakeException.InitException::class)
suspend fun init(
/**
* Human readable (HTML formatted) version of the client
*/
clientVersion: String,
/**
* Build timestamp of the client
*/
buildDate: String,
/**
* Enabled client features for this connection
*/
featureSet: FeatureSet,
): CoreState
/**
* Login to core with authentication data
*/
@Throws(HandshakeException.LoginException::class)
suspend fun login(
/**
* Username of the core account
*/
username: String,
/**
* Password of the core account
*/
password: String,
)
/**
* Configure core for the first time
*/
@Throws(HandshakeException.SetupException::class)
suspend fun configureCore(
/**
* Username of a new core account to be created
*/
adminUsername: String,
/**
* Password of a new core account to be created
*/
adminPassword: String,
/**
* Chosen storage backend id
*/
backend: String,
/**
* Storage backend configuration data
*/
backendConfiguration: QVariantMap,
/**
* Chosen authenticator backend id
*/
authenticator: String,
/**
* Authenticator backend configuration data
*/
authenticatorConfiguration: QVariantMap,
)
}
/*
* libquassel
* Copyright (c) 2021 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.protocol.session
import de.justjanne.libquassel.protocol.features.FeatureSet
import de.justjanne.libquassel.protocol.io.ChainedByteBuffer
import de.justjanne.libquassel.protocol.io.CoroutineChannel
import de.justjanne.libquassel.protocol.models.HandshakeMessage
import de.justjanne.libquassel.protocol.models.SignalProxyMessage
import de.justjanne.libquassel.protocol.serializers.HandshakeMessageSerializer
import de.justjanne.libquassel.protocol.serializers.SignalProxyMessageSerializer
import de.justjanne.libquassel.protocol.util.log.trace
import kotlinx.coroutines.coroutineScope
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.nio.ByteBuffer
class MessageChannel(
val channel: CoroutineChannel,
) : Closeable {
var negotiatedFeatures = FeatureSet.none()
private var handlers = mutableListOf<ConnectionHandler>()
fun register(handler: ConnectionHandler) {
handlers.add(handler)
}
private val sendBuffer = ThreadLocal.withInitial(::ChainedByteBuffer)
private val sizeBuffer = ThreadLocal.withInitial { ByteBuffer.allocateDirect(4) }
suspend fun init() {
setupHandlers()
}
private suspend fun readAmount(): Int {
val sizeBuffer = sizeBuffer.get()
sizeBuffer.clear()
channel.read(sizeBuffer)
sizeBuffer.flip()
val size = sizeBuffer.int
sizeBuffer.clear()
return size
}
suspend fun read() {
val amount = readAmount()
val messageBuffer = ByteBuffer.allocateDirect(minOf(amount, 65 * 1024 * 1024))
channel.read(messageBuffer)
messageBuffer.flip()
dispatch(messageBuffer)
}
private suspend fun setupHandlers(): List<ConnectionHandler> {
val removed = mutableListOf<ConnectionHandler>()
while (true) {
val handler = handlers.firstOrNull()
logger.trace { "Setting up handler $handler" }
if (handler?.init(this) != true) {
break
}
logger.trace { "Handler $handler is done" }
removed.add(handlers.removeFirst())
}
if (handlers.isEmpty()) {
logger.trace { "All handlers done" }
channel.close()
}
return removed
}
private suspend fun dispatch(message: ByteBuffer) {
val handlerDone =
try {
handlers.first().read(message)
} catch (e: Exception) {
logger.warn("Error while handling message: ", e)
false
}
if (handlerDone) {
val removed = listOf(handlers.removeFirst()) + setupHandlers()
for (handler in removed) {
handler.done()
}
}
}
suspend fun emit(message: HandshakeMessage) =
emit {
logger.trace { "Writing handshake message $message" }
HandshakeMessageSerializer.serialize(it, message, negotiatedFeatures)
}
suspend fun emit(message: SignalProxyMessage) =
emit {
logger.trace { "Writing signal proxy message $message" }
SignalProxyMessageSerializer.serialize(it, message, negotiatedFeatures)
}
suspend fun emit(
sizePrefix: Boolean = true,
f: (ChainedByteBuffer) -> Unit,
) = coroutineScope {
val sendBuffer = sendBuffer.get()
val sizeBuffer = sizeBuffer.get()
f(sendBuffer)
if (sizePrefix) {
sizeBuffer.clear()
sizeBuffer.putInt(sendBuffer.size)
sizeBuffer.flip()
channel.write(sizeBuffer)
sizeBuffer.clear()
}
channel.write(sendBuffer)
channel.flush()
sendBuffer.clear()
}
override fun close() {
channel.close()
}
companion object {
private val logger = LoggerFactory.getLogger(MessageChannel::class.java)
}
}
/*
* libquassel
* Copyright (c) 2021 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.protocol.session
import de.justjanne.libquassel.protocol.util.log.info
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.nio.channels.ClosedChannelException
import java.util.concurrent.Executors
class MessageChannelReader(
private val channel: MessageChannel,
) : Closeable {
private val executor = Executors.newSingleThreadExecutor()
private val dispatcher = executor.asCoroutineDispatcher()
private val scope = CoroutineScope(dispatcher)
private var job: Job? = null
fun start() {
job =
scope.launch {
try {
channel.init()
while (isActive && channel.channel.state().connected) {
channel.read()
}
} catch (e: ClosedChannelException) {
logger.info { "Channel closed" }
close()
}
}
}
override fun close() {
channel.close()
runBlocking { job?.cancelAndJoin() }
scope.cancel()
dispatcher.cancel()
executor.shutdown()
}
companion object {
private val logger = LoggerFactory.getLogger(MessageChannelReader::class.java)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment