Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • api-redesign
  • main
  • 0.10.0
  • 0.10.1
  • 0.10.2
  • 0.7.0
  • 0.8.0
  • 0.8.1
  • 0.9.0
  • 0.9.1
  • 0.9.2
11 results

Target

Select target project
  • justJanne/libquassel
1 result
Select Git revision
  • api-redesign
  • main
  • 0.10.0
  • 0.10.1
  • 0.10.2
  • 0.7.0
  • 0.8.0
  • 0.8.1
  • 0.9.0
  • 0.9.1
  • 0.9.2
11 results
Show changes
Showing
with 154 additions and 288 deletions
......@@ -20,11 +20,18 @@ import java.nio.ByteBuffer
object PeerPtrSerializer : PrimitiveSerializer<ULong> {
override val javaType: Class<ULong> = ULong::class.java
override fun serialize(buffer: ChainedByteBuffer, data: ULong, featureSet: FeatureSet) {
override fun serialize(
buffer: ChainedByteBuffer,
data: ULong,
featureSet: FeatureSet,
) {
buffer.putLong(data.toLong())
}
override fun deserialize(buffer: ByteBuffer, featureSet: FeatureSet): ULong {
override fun deserialize(
buffer: ByteBuffer,
featureSet: FeatureSet,
): ULong {
return buffer.getLong().toULong()
}
}
......@@ -28,14 +28,14 @@ object QHostAddressSerializer : PrimitiveSerializer<InetAddress> {
override fun serialize(
buffer: ChainedByteBuffer,
data: InetAddress,
featureSet: FeatureSet
featureSet: FeatureSet,
) {
when (data) {
is Inet4Address -> {
UByteSerializer.serialize(
buffer,
NetworkLayerProtocol.IPv4Protocol.value,
featureSet
featureSet,
)
buffer.put(data.address)
}
......@@ -43,7 +43,7 @@ object QHostAddressSerializer : PrimitiveSerializer<InetAddress> {
UByteSerializer.serialize(
buffer,
NetworkLayerProtocol.IPv6Protocol.value,
featureSet
featureSet,
)
buffer.put(data.address)
}
......@@ -51,14 +51,17 @@ object QHostAddressSerializer : PrimitiveSerializer<InetAddress> {
UByteSerializer.serialize(
buffer,
NetworkLayerProtocol.UnknownNetworkLayerProtocol.value,
featureSet
featureSet,
)
throw IllegalArgumentException("Invalid network protocol ${data.javaClass.canonicalName}")
}
}
}
override fun deserialize(buffer: ByteBuffer, featureSet: FeatureSet): InetAddress {
override fun deserialize(
buffer: ByteBuffer,
featureSet: FeatureSet,
): InetAddress {
val type = UByteSerializer.deserialize(buffer, featureSet)
return when (NetworkLayerProtocol.of(type)) {
NetworkLayerProtocol.IPv4Protocol -> {
......
......@@ -22,11 +22,18 @@ import java.nio.ByteBuffer
object TransferDirectionSerializer : PrimitiveSerializer<TransferDirection?> {
override val javaType: Class<out TransferDirection?> = TransferDirection::class.java
override fun serialize(buffer: ChainedByteBuffer, data: TransferDirection?, featureSet: FeatureSet) {
override fun serialize(
buffer: ChainedByteBuffer,
data: TransferDirection?,
featureSet: FeatureSet,
) {
IntSerializer.serialize(buffer, data?.value ?: 0, featureSet)
}
override fun deserialize(buffer: ByteBuffer, featureSet: FeatureSet) = TransferDirection.of(
IntSerializer.deserialize(buffer, featureSet)
override fun deserialize(
buffer: ByteBuffer,
featureSet: FeatureSet,
) = TransferDirection.of(
IntSerializer.deserialize(buffer, featureSet),
)
}
......@@ -25,14 +25,21 @@ object TransferIdListSerializer : PrimitiveSerializer<TransferIdList> {
@Suppress("UNCHECKED_CAST")
override val javaType: Class<TransferIdList> = List::class.java as Class<TransferIdList>
override fun serialize(buffer: ChainedByteBuffer, data: TransferIdList, featureSet: FeatureSet) {
override fun serialize(
buffer: ChainedByteBuffer,
data: TransferIdList,
featureSet: FeatureSet,
) {
IntSerializer.serialize(buffer, data.size, featureSet)
data.forEach {
UuidSerializer.serialize(buffer, it, featureSet)
}
}
override fun deserialize(buffer: ByteBuffer, featureSet: FeatureSet): TransferIdList {
override fun deserialize(
buffer: ByteBuffer,
featureSet: FeatureSet,
): TransferIdList {
val result = mutableListOf<UUID>()
val length = IntSerializer.deserialize(buffer, featureSet)
for (i in 0 until length) {
......
......@@ -22,11 +22,18 @@ import java.nio.ByteBuffer
object TransferStatusSerializer : PrimitiveSerializer<TransferStatus?> {
override val javaType: Class<out TransferStatus?> = TransferStatus::class.java
override fun serialize(buffer: ChainedByteBuffer, data: TransferStatus?, featureSet: FeatureSet) {
override fun serialize(
buffer: ChainedByteBuffer,
data: TransferStatus?,
featureSet: FeatureSet,
) {
IntSerializer.serialize(buffer, data?.value ?: 0, featureSet)
}
override fun deserialize(buffer: ByteBuffer, featureSet: FeatureSet) = TransferStatus.of(
IntSerializer.deserialize(buffer, featureSet)
override fun deserialize(
buffer: ByteBuffer,
featureSet: FeatureSet,
) = TransferStatus.of(
IntSerializer.deserialize(buffer, featureSet),
)
}
......@@ -24,12 +24,14 @@ import org.threeten.bp.ZoneOffset
object HeartBeatReplySerializer : SignalProxySerializer<SignalProxyMessage.HeartBeatReply> {
override val type: Int = 6
override fun serialize(data: SignalProxyMessage.HeartBeatReply) = listOf(
override fun serialize(data: SignalProxyMessage.HeartBeatReply) =
listOf(
qVariant(type, QtType.Int),
qVariant(data.timestamp, QtType.QDateTime)
qVariant(data.timestamp, QtType.QDateTime),
)
override fun deserialize(data: QVariantList) = SignalProxyMessage.HeartBeatReply(
data.getOrNull(1).into(Instant.EPOCH.atOffset(ZoneOffset.UTC)).toInstant()
override fun deserialize(data: QVariantList) =
SignalProxyMessage.HeartBeatReply(
data.getOrNull(1).into(Instant.EPOCH.atOffset(ZoneOffset.UTC)).toInstant(),
)
}
......@@ -24,12 +24,14 @@ import org.threeten.bp.ZoneOffset
object HeartBeatSerializer : SignalProxySerializer<SignalProxyMessage.HeartBeat> {
override val type: Int = 5
override fun serialize(data: SignalProxyMessage.HeartBeat) = listOf(
override fun serialize(data: SignalProxyMessage.HeartBeat) =
listOf(
qVariant(type, QtType.Int),
qVariant(data.timestamp, QtType.QDateTime)
qVariant(data.timestamp, QtType.QDateTime),
)
override fun deserialize(data: QVariantList) = SignalProxyMessage.HeartBeat(
data.getOrNull(1).into(Instant.EPOCH.atOffset(ZoneOffset.UTC)).toInstant()
override fun deserialize(data: QVariantList) =
SignalProxyMessage.HeartBeat(
data.getOrNull(1).into(Instant.EPOCH.atOffset(ZoneOffset.UTC)).toInstant(),
)
}
......@@ -26,15 +26,17 @@ import java.nio.ByteBuffer
object InitDataSerializer : SignalProxySerializer<SignalProxyMessage.InitData> {
override val type: Int = 4
override fun serialize(data: SignalProxyMessage.InitData) = listOf(
override fun serialize(data: SignalProxyMessage.InitData) =
listOf(
qVariant(type, QtType.Int),
qVariant(StringSerializerUtf8.serializeRaw(data.className), QtType.QByteArray),
qVariant(StringSerializerUtf8.serializeRaw(data.objectName), QtType.QByteArray),
) + data.initData.toVariantList(byteBuffer = true)
override fun deserialize(data: QVariantList) = SignalProxyMessage.InitData(
override fun deserialize(data: QVariantList) =
SignalProxyMessage.InitData(
StringSerializerUtf8.deserializeRaw(data.getOrNull(1).into<ByteBuffer>()),
StringSerializerUtf8.deserializeRaw(data.getOrNull(2).into<ByteBuffer>()),
data.drop(3).toVariantMap(byteBuffer = true)
data.drop(3).toVariantMap(byteBuffer = true),
)
}
......@@ -24,14 +24,16 @@ import java.nio.ByteBuffer
object InitRequestSerializer : SignalProxySerializer<SignalProxyMessage.InitRequest> {
override val type: Int = 3
override fun serialize(data: SignalProxyMessage.InitRequest) = listOf(
override fun serialize(data: SignalProxyMessage.InitRequest) =
listOf(
qVariant(type, QtType.Int),
qVariant(StringSerializerUtf8.serializeRaw(data.className), QtType.QByteArray),
qVariant(StringSerializerUtf8.serializeRaw(data.objectName), QtType.QByteArray),
)
override fun deserialize(data: QVariantList) = SignalProxyMessage.InitRequest(
override fun deserialize(data: QVariantList) =
SignalProxyMessage.InitRequest(
StringSerializerUtf8.deserializeRaw(data.getOrNull(1).into<ByteBuffer>()),
StringSerializerUtf8.deserializeRaw(data.getOrNull(2).into<ByteBuffer>())
StringSerializerUtf8.deserializeRaw(data.getOrNull(2).into<ByteBuffer>()),
)
}
......@@ -24,13 +24,15 @@ import java.nio.ByteBuffer
object RpcSerializer : SignalProxySerializer<SignalProxyMessage.Rpc> {
override val type: Int = 2
override fun serialize(data: SignalProxyMessage.Rpc) = listOf(
override fun serialize(data: SignalProxyMessage.Rpc) =
listOf(
qVariant(type, QtType.Int),
qVariant(StringSerializerUtf8.serializeRaw(data.slotName), QtType.QByteArray)
qVariant(StringSerializerUtf8.serializeRaw(data.slotName), QtType.QByteArray),
) + data.params
override fun deserialize(data: QVariantList) = SignalProxyMessage.Rpc(
override fun deserialize(data: QVariantList) =
SignalProxyMessage.Rpc(
StringSerializerUtf8.deserializeRaw(data.getOrNull(1).into<ByteBuffer>()),
data.drop(2)
data.drop(2),
)
}
......@@ -24,17 +24,19 @@ import java.nio.ByteBuffer
object SyncSerializer : SignalProxySerializer<SignalProxyMessage.Sync> {
override val type: Int = 1
override fun serialize(data: SignalProxyMessage.Sync) = listOf(
override fun serialize(data: SignalProxyMessage.Sync) =
listOf(
qVariant(type, QtType.Int),
qVariant(StringSerializerUtf8.serializeRaw(data.className), QtType.QByteArray),
qVariant(StringSerializerUtf8.serializeRaw(data.objectName), QtType.QByteArray),
qVariant(StringSerializerUtf8.serializeRaw(data.slotName), QtType.QByteArray)
qVariant(StringSerializerUtf8.serializeRaw(data.slotName), QtType.QByteArray),
) + data.params
override fun deserialize(data: QVariantList) = SignalProxyMessage.Sync(
override fun deserialize(data: QVariantList) =
SignalProxyMessage.Sync(
StringSerializerUtf8.deserializeRaw(data.getOrNull(1).into<ByteBuffer>()),
StringSerializerUtf8.deserializeRaw(data.getOrNull(2).into<ByteBuffer>()),
StringSerializerUtf8.deserializeRaw(data.getOrNull(3).into<ByteBuffer>()),
data.drop(4)
data.drop(4),
)
}
/*
* libquassel
* Copyright (c) 2021 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.protocol.session
import de.justjanne.libquassel.annotations.ProtocolSide
import de.justjanne.libquassel.protocol.models.SignalProxyMessage
import de.justjanne.libquassel.protocol.syncables.ObjectRepository
import de.justjanne.libquassel.protocol.syncables.SyncableStub
import de.justjanne.libquassel.protocol.variant.QVariantList
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
class CommonSyncProxy(
private val protocolSide: ProtocolSide,
private val objectRepository: ObjectRepository,
private val proxyMessageHandler: ProxyMessageHandler
) : SyncProxy {
override fun synchronize(syncable: SyncableStub) {
if (objectRepository.add(syncable) && !syncable.initialized) {
runBlocking(context = Dispatchers.IO) {
proxyMessageHandler.emit(SignalProxyMessage.InitRequest(syncable.className, syncable.objectName))
}
}
}
override fun stopSynchronize(syncable: SyncableStub) {
objectRepository.remove(syncable)
}
override fun sync(
target: ProtocolSide,
className: String,
objectName: String,
function: String,
arguments: QVariantList
) {
if (target != protocolSide) {
runBlocking {
proxyMessageHandler.emit(
SignalProxyMessage.Sync(
className,
objectName,
function,
arguments
)
)
}
}
}
override fun rpc(
target: ProtocolSide,
function: String,
arguments: QVariantList
) {
if (target != protocolSide) {
runBlocking {
proxyMessageHandler.emit(
SignalProxyMessage.Rpc(
function,
arguments
)
)
}
}
}
}
......@@ -13,6 +13,8 @@ import java.nio.ByteBuffer
interface ConnectionHandler {
suspend fun init(channel: MessageChannel): Boolean
suspend fun done()
suspend fun read(buffer: ByteBuffer): Boolean
}
......@@ -13,8 +13,9 @@ import de.justjanne.libquassel.protocol.models.setup.BackendInfo
sealed class CoreState {
object Configured : CoreState()
data class Unconfigured(
val databases: List<BackendInfo>,
val authenticators: List<BackendInfo>
val authenticators: List<BackendInfo>,
) : CoreState()
}
......@@ -30,7 +30,7 @@ interface HandshakeHandler : ConnectionHandler {
/**
* Enabled client features for this connection
*/
featureSet: FeatureSet
featureSet: FeatureSet,
): CoreState
/**
......@@ -45,7 +45,7 @@ interface HandshakeHandler : ConnectionHandler {
/**
* Password of the core account
*/
password: String
password: String,
)
/**
......@@ -76,6 +76,6 @@ interface HandshakeHandler : ConnectionHandler {
/**
* Authenticator backend configuration data
*/
authenticatorConfiguration: QVariantMap
authenticatorConfiguration: QVariantMap,
)
}
......@@ -23,17 +23,19 @@ import java.io.Closeable
import java.nio.ByteBuffer
class MessageChannel(
val channel: CoroutineChannel
val channel: CoroutineChannel,
) : Closeable {
var negotiatedFeatures = FeatureSet.none()
private var handlers = mutableListOf<ConnectionHandler>()
fun register(handler: ConnectionHandler) {
handlers.add(handler)
}
private val sendBuffer = ThreadLocal.withInitial(::ChainedByteBuffer)
private val sizeBuffer = ThreadLocal.withInitial { ByteBuffer.allocateDirect(4) }
suspend fun init() {
setupHandlers()
}
......@@ -76,7 +78,8 @@ class MessageChannel(
}
private suspend fun dispatch(message: ByteBuffer) {
val handlerDone = try {
val handlerDone =
try {
handlers.first().read(message)
} catch (e: Exception) {
logger.warn("Error while handling message: ", e)
......@@ -90,17 +93,22 @@ class MessageChannel(
}
}
suspend fun emit(message: HandshakeMessage) = emit {
suspend fun emit(message: HandshakeMessage) =
emit {
logger.trace { "Writing handshake message $message" }
HandshakeMessageSerializer.serialize(it, message, negotiatedFeatures)
}
suspend fun emit(message: SignalProxyMessage) = emit {
suspend fun emit(message: SignalProxyMessage) =
emit {
logger.trace { "Writing signal proxy message $message" }
SignalProxyMessageSerializer.serialize(it, message, negotiatedFeatures)
}
suspend fun emit(sizePrefix: Boolean = true, f: (ChainedByteBuffer) -> Unit) = coroutineScope {
suspend fun emit(
sizePrefix: Boolean = true,
f: (ChainedByteBuffer) -> Unit,
) = coroutineScope {
val sendBuffer = sendBuffer.get()
val sizeBuffer = sizeBuffer.get()
......
......@@ -24,7 +24,7 @@ import java.nio.channels.ClosedChannelException
import java.util.concurrent.Executors
class MessageChannelReader(
private val channel: MessageChannel
private val channel: MessageChannel,
) : Closeable {
private val executor = Executors.newSingleThreadExecutor()
private val dispatcher = executor.asCoroutineDispatcher()
......@@ -32,7 +32,8 @@ class MessageChannelReader(
private var job: Job? = null
fun start() {
job = scope.launch {
job =
scope.launch {
try {
channel.init()
while (isActive && channel.channel.state().connected) {
......
......@@ -13,5 +13,6 @@ import de.justjanne.libquassel.protocol.models.SignalProxyMessage
interface ProxyMessageHandler : ConnectionHandler {
suspend fun emit(message: SignalProxyMessage)
suspend fun dispatch(message: SignalProxyMessage)
}
/*
* libquassel
* Copyright (c) 2021 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.protocol.session
import de.justjanne.libquassel.annotations.ProtocolSide
import de.justjanne.libquassel.protocol.models.BufferInfo
import de.justjanne.libquassel.protocol.models.ids.IdentityId
import de.justjanne.libquassel.protocol.models.ids.NetworkId
import de.justjanne.libquassel.protocol.syncables.HeartBeatHandler
import de.justjanne.libquassel.protocol.syncables.ObjectRepository
import de.justjanne.libquassel.protocol.syncables.common.AliasManager
import de.justjanne.libquassel.protocol.syncables.common.BacklogManager
import de.justjanne.libquassel.protocol.syncables.common.BufferSyncer
import de.justjanne.libquassel.protocol.syncables.common.BufferViewManager
import de.justjanne.libquassel.protocol.syncables.common.CertManager
import de.justjanne.libquassel.protocol.syncables.common.CoreInfo
import de.justjanne.libquassel.protocol.syncables.common.DccConfig
import de.justjanne.libquassel.protocol.syncables.common.HighlightRuleManager
import de.justjanne.libquassel.protocol.syncables.common.Identity
import de.justjanne.libquassel.protocol.syncables.common.IgnoreListManager
import de.justjanne.libquassel.protocol.syncables.common.IrcListHelper
import de.justjanne.libquassel.protocol.syncables.common.Network
import de.justjanne.libquassel.protocol.syncables.common.NetworkConfig
import de.justjanne.libquassel.protocol.syncables.common.RpcHandler
import de.justjanne.libquassel.protocol.variant.QVariantMap
interface Session {
val side: ProtocolSide
val proxy: SyncProxy
val objectRepository: ObjectRepository
fun init(
identityInfo: List<QVariantMap>,
bufferInfos: List<BufferInfo>,
networkIds: List<NetworkId>
)
fun network(id: NetworkId): Network?
fun addNetwork(id: NetworkId)
fun removeNetwork(id: NetworkId)
fun networks(): Set<Network>
fun identity(id: IdentityId): Identity?
fun addIdentity(properties: QVariantMap)
fun removeIdentity(id: IdentityId)
fun identities(): Set<Identity>
fun certManager(id: IdentityId): CertManager?
fun certManagers(): Set<CertManager>
fun rename(className: String, oldName: String, newName: String)
val heartBeatHandler: HeartBeatHandler
val rpcHandler: RpcHandler
val aliasManager: AliasManager
val backlogManager: BacklogManager
val bufferSyncer: BufferSyncer
val bufferViewManager: BufferViewManager
val highlightRuleManager: HighlightRuleManager
val ignoreListManager: IgnoreListManager
val ircListHelper: IrcListHelper
val coreInfo: CoreInfo
val dccConfig: DccConfig
val networkConfig: NetworkConfig
}
/*
* libquassel
* Copyright (c) 2021 Janne Mareike Koschinski
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* v. 2.0. If a copy of the MPL was not distributed with this file, You can
* obtain one at https://mozilla.org/MPL/2.0/.
*/
package de.justjanne.libquassel.protocol.syncables
import de.justjanne.libquassel.protocol.util.StateHolder
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import org.threeten.bp.Instant
class HeartBeatHandler : StateHolder<Long?> {
private var lastReceived: Instant? = null
/**
* Utility function to recompute the latency value,
* usually should be called by a timer.
*/
fun recomputeLatency() {
recomputeLatency(Instant.now(), force = false)
}
/**
* Utility function to recompute the latency value with a given heartbeat value
*/
fun recomputeLatency(current: Instant, force: Boolean) {
val last = lastReceived?.toEpochMilli() ?: return
val roundtripLatency = current.toEpochMilli() - last
if (force || roundtripLatency > (this.roundtripLatency.value ?: return)) {
this.roundtripLatency.value = roundtripLatency
}
}
override fun flow(): Flow<Long?> = roundtripLatency
override fun state(): Long? = roundtripLatency.value
private val roundtripLatency = MutableStateFlow<Long?>(null)
}