Skip to content
Snippets Groups Projects
Commit b61559d8 authored by Janne Mareike Koschinski's avatar Janne Mareike Koschinski
Browse files

Cleanup, reorganization

parent f3249dca
Branches
Tags
No related merge requests found
Showing
with 210 additions and 140 deletions
......@@ -8,6 +8,7 @@ import java.nio.CharBuffer
import java.nio.charset.Charset
import java.nio.charset.CharsetDecoder
import java.nio.charset.CharsetEncoder
import java.util.concurrent.atomic.AtomicReference
abstract class StringSerializer(
private val encoder: CharsetEncoder,
......@@ -25,25 +26,37 @@ abstract class StringSerializer(
}
)
private val charBuffer = ThreadLocal<CharBuffer>()
private val thread = AtomicReference<Thread>()
private val charBuffer = CharBuffer.allocate(1024)
object UTF16 : StringSerializer(Charsets.UTF_16BE)
object UTF8 : StringSerializer(Charsets.UTF_8)
object C : StringSerializer(Charsets.ISO_8859_1, trailingNullByte = true)
private inline fun charBuffer(len: Int): CharBuffer {
if (charBuffer.get() == null)
charBuffer.set(CharBuffer.allocate(1024))
val buf = if (len >= 1024)
CharBuffer.allocate(len)
else
charBuffer.get()
charBuffer
buf.clear()
buf.limit(len)
return buf
}
override fun serialize(buffer: ChainedByteBuffer, data: String?, features: Quassel_Features) {
private inline fun <T> preventThreadRaces(f: () -> T): T {
val currentThread = Thread.currentThread()
if (!thread.compareAndSet(null, currentThread)) {
throw RuntimeException("Illegal Thread access!")
}
val result: T = f()
if (!thread.compareAndSet(currentThread, null)) {
throw RuntimeException("Illegal Thread access!")
}
return result
}
override fun serialize(buffer: ChainedByteBuffer, data: String?, features: Quassel_Features) =
preventThreadRaces {
try {
if (data == null) {
IntSerializer.serialize(buffer, -1, features)
......@@ -63,37 +76,36 @@ abstract class StringSerializer(
}
}
fun serialize(data: String?): ByteBuffer {
fun serialize(data: String?): ByteBuffer = preventThreadRaces {
try {
if (data == null) {
return ByteBuffer.allocate(0)
ByteBuffer.allocate(0)
} else {
val charBuffer = charBuffer(data.length)
charBuffer.put(data)
charBuffer.flip()
encoder.reset()
return encoder.encode(charBuffer)
encoder.encode(charBuffer)
}
} catch (e: Throwable) {
throw RuntimeException(data, e)
}
}
fun deserializeAll(buffer: ByteBuffer): String? {
fun deserializeAll(buffer: ByteBuffer): String? = preventThreadRaces {
try {
val len = buffer.remaining()
return if (len == -1) {
if (len == -1) {
null
} else {
val limit = buffer.limit()
buffer.limit(buffer.position() + len - trailingNullBytes)
//val charBuffer = charBuffer(len)
//decoder.reset()
val charBuffer = decoder.charset().decode(buffer)
//decoder.decode(buffer, charBuffer, true)
val charBuffer = charBuffer(len)
decoder.reset()
decoder.decode(buffer, charBuffer, true)
buffer.limit(limit)
buffer.position(buffer.position() + trailingNullBytes)
//charBuffer.flip()
charBuffer.flip()
charBuffer.toString()
}
} catch (e: Throwable) {
......@@ -102,20 +114,20 @@ abstract class StringSerializer(
}
}
override fun deserialize(buffer: ByteBuffer, features: Quassel_Features): String? {
override fun deserialize(buffer: ByteBuffer, features: Quassel_Features): String? =
preventThreadRaces {
try {
val len = IntSerializer.deserialize(buffer, features)
return if (len == -1) {
if (len == -1) {
null
} else {
val limit = buffer.limit()
buffer.limit(buffer.position() + Math.max(0, len - trailingNullBytes))
//val charBuffer = charBuffer(len)
val charBuffer = decoder.charset().decode(buffer)
//decoder.decode(buffer, charBuffer, true)
val charBuffer = charBuffer(len)
decoder.decode(buffer, charBuffer, true)
buffer.limit(limit)
buffer.position(buffer.position() + trailingNullBytes)
//charBuffer.flip()
charBuffer.flip()
charBuffer.toString()
}
} catch (e: Throwable) {
......
......@@ -5,5 +5,6 @@ enum class ConnectionState {
CONNECTING,
HANDSHAKE,
INIT,
CONNECTED
CONNECTED,
CLOSED
}
......@@ -11,7 +11,8 @@ import de.kuschku.libquassel.quassel.ProtocolFeature
import de.kuschku.libquassel.util.compatibility.CompatibilityUtils
import de.kuschku.libquassel.util.compatibility.HandlerService
import de.kuschku.libquassel.util.compatibility.LoggingHandler.Companion.log
import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.*
import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.DEBUG
import de.kuschku.libquassel.util.compatibility.LoggingHandler.LogLevel.WARN
import de.kuschku.libquassel.util.hasFlag
import de.kuschku.libquassel.util.helpers.hexDump
import de.kuschku.libquassel.util.helpers.write
......@@ -63,13 +64,12 @@ class CoreConnection(
}
fun setState(value: ConnectionState) {
log(INFO, TAG, value.name)
log(DEBUG, TAG, value.name)
state.onNext(value)
}
private fun sendHandshake() {
setState(ConnectionState.HANDSHAKE)
IntSerializer.serialize(
chainedBuffer,
0x42b33f00 or clientData.protocolFeatures.toInt(),
......@@ -122,16 +122,14 @@ class CoreConnection(
override fun close() {
try {
interrupt()
handlerService.quit()
setState(ConnectionState.DISCONNECTED)
setState(ConnectionState.CLOSED)
} catch (e: Throwable) {
log(WARN, TAG, "Error encountered while closing connection", e)
}
}
fun dispatch(message: HandshakeMessage) {
handlerService.parse {
handlerService.serialize {
try {
val data = HandshakeMessage.serialize(message)
handlerService.write(
......@@ -147,7 +145,7 @@ class CoreConnection(
}
fun dispatch(message: SignalProxyMessage) {
handlerService.parse {
handlerService.serialize {
try {
val data = SignalProxyMessage.serialize(message)
handlerService.write(
......@@ -167,7 +165,7 @@ class CoreConnection(
connect()
sendHandshake()
readHandshake()
while (!isInterrupted) {
while (!isInterrupted && state != ConnectionState.CLOSED) {
sizeBuffer.clear()
if (channel?.read(sizeBuffer) == -1)
break
......@@ -180,25 +178,32 @@ class CoreConnection(
while (dataBuffer.position() < dataBuffer.limit() && channel?.read(dataBuffer) ?: -1 > 0) {
}
dataBuffer.flip()
handlerService.parse {
handlerService.serialize {
when (state.value) {
ConnectionState.HANDSHAKE -> processHandshake(dataBuffer)
else -> processSigProxy(dataBuffer)
ConnectionState.CLOSED ->
// Connection closed, do nothing
Unit
ConnectionState.CONNECTING,
ConnectionState.HANDSHAKE ->
processHandshake(dataBuffer)
else ->
processSigProxy(dataBuffer)
}
}
}
} catch (e: Throwable) {
log(WARN, TAG, "Error encountered in connection", e)
setState(ConnectionState.DISCONNECTED)
close()
}
}
private fun processSigProxy(dataBuffer: ByteBuffer) {
private fun processSigProxy(dataBuffer: ByteBuffer) = handlerService.deserialize {
try {
val msg = SignalProxyMessage.deserialize(
VariantListSerializer.deserialize(dataBuffer, features.negotiated)
)
handlerService.handle {
handlerService.backend {
try {
handler.handle(msg)
} catch (e: Throwable) {
......@@ -206,14 +211,14 @@ class CoreConnection(
log(WARN, TAG, msg.toString())
}
}
} catch (e: Throwable) {
log(WARN, TAG, "Error encountered while parsing sigproxy message", e)
dataBuffer.hexDump()
}
}
private fun processHandshake(dataBuffer: ByteBuffer) {
try {
private fun processHandshake(dataBuffer: ByteBuffer) = try {
val msg = HandshakeMessage.deserialize(
HandshakeVariantMapSerializer.deserialize(dataBuffer, features.negotiated)
)
......@@ -224,9 +229,9 @@ class CoreConnection(
log(WARN, TAG, msg.toString())
}
} catch (e: Throwable) {
log(WARN, TAG, "Error encountered while parsing handshake message", e)
dataBuffer.hexDump()
}
log(
WARN, TAG, "Error encountered while parsing handshake message", e
)
}
val sslSession
......
......@@ -3,6 +3,7 @@ package de.kuschku.libquassel.session
import de.kuschku.libquassel.protocol.IdentityId
import de.kuschku.libquassel.protocol.NetworkId
import de.kuschku.libquassel.protocol.Quassel_Features
import de.kuschku.libquassel.protocol.message.HandshakeMessage
import de.kuschku.libquassel.quassel.syncables.*
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject
......@@ -29,10 +30,13 @@ interface ISession : Closeable {
val rpcHandler: RpcHandler?
val initStatus: Observable<Pair<Int, Int>>
val error: Observable<HandshakeMessage>
val lag: Observable<Long>
companion object {
val NULL = object : ISession {
override val error = BehaviorSubject.create<HandshakeMessage>()
override val state = BehaviorSubject.createDefault(ConnectionState.DISCONNECTED)
override val features: Features = Features(Quassel_Features.of(), Quassel_Features.of())
override val sslSession: SSLSession? = null
......
......@@ -18,7 +18,7 @@ class Session(
clientData: ClientData,
trustManager: X509TrustManager,
address: SocketAddress,
handlerService: HandlerService,
private val handlerService: HandlerService,
backlogStorage: BacklogStorage,
private val userData: Pair<String, String>
) : ProtocolHandler(), ISession {
......@@ -32,6 +32,8 @@ class Session(
)
override val state = coreConnection.state
override val error = BehaviorSubject.create<HandshakeMessage>()
override val aliasManager = AliasManager(this)
override val backlogManager = BacklogManager(this, backlogStorage)
override val bufferViewManager = BufferViewManager(this, this)
......@@ -57,20 +59,48 @@ class Session(
override fun handle(f: HandshakeMessage.ClientInitAck): Boolean {
features.core = f.coreFeatures ?: Quassel_Feature.NONE
if (f.coreConfigured == true) {
login()
} else {
error.onNext(f)
}
return true
}
private fun login() {
dispatch(
HandshakeMessage.ClientLogin(
user = userData.first,
password = userData.second
)
)
}
dispatch(SignalProxyMessage.HeartBeat(Instant.now()))
override fun handle(f: HandshakeMessage.CoreSetupAck): Boolean {
login()
return true
}
override fun handle(f: HandshakeMessage.ClientInitReject): Boolean {
error.onNext(f)
return true
}
override fun handle(f: HandshakeMessage.CoreSetupReject): Boolean {
error.onNext(f)
return true
}
override fun handle(f: HandshakeMessage.ClientLoginReject): Boolean {
error.onNext(f)
return true
}
override fun handle(f: HandshakeMessage.SessionInit): Boolean {
coreConnection.setState(ConnectionState.INIT)
handlerService.backend {
bufferSyncer.initSetBufferInfos(f.bufferInfos)
f.networkIds?.forEach {
......@@ -105,6 +135,7 @@ class Session(
synchronize(backlogManager)
dispatch(SignalProxyMessage.HeartBeat(Instant.now()))
}
return true
}
......
......@@ -3,6 +3,7 @@ package de.kuschku.libquassel.session
import de.kuschku.libquassel.protocol.ClientData
import de.kuschku.libquassel.protocol.IdentityId
import de.kuschku.libquassel.protocol.NetworkId
import de.kuschku.libquassel.protocol.message.HandshakeMessage
import de.kuschku.libquassel.quassel.syncables.*
import de.kuschku.libquassel.quassel.syncables.interfaces.invokers.Invokers
import de.kuschku.libquassel.util.compatibility.HandlerService
......@@ -15,7 +16,11 @@ import io.reactivex.subjects.BehaviorSubject
import javax.net.ssl.SSLSession
import javax.net.ssl.X509TrustManager
class SessionManager(offlineSession: ISession, val backlogStorage: BacklogStorage) : ISession {
class SessionManager(offlineSession: ISession,
val backlogStorage: BacklogStorage,
val handlerService: HandlerService) : ISession {
override val error: Observable<HandshakeMessage>
get() = session.or(lastSession).error
override val features: Features
get() = session.or(lastSession).features
override val sslSession: SSLSession?
......@@ -54,7 +59,6 @@ class SessionManager(offlineSession: ISession, val backlogStorage: BacklogStorag
private var lastClientData: ClientData? = null
private var lastTrustManager: X509TrustManager? = null
private var lastAddress: SocketAddress? = null
private var lastHandlerService: (() -> HandlerService)? = null
private var lastUserData: Pair<String, String>? = null
private var lastShouldReconnect = false
......@@ -90,16 +94,17 @@ class SessionManager(offlineSession: ISession, val backlogStorage: BacklogStorag
}
fun ifDisconnected(closure: () -> Unit) {
if (state.or(ConnectionState.DISCONNECTED) == ConnectionState.DISCONNECTED) {
state.or(ConnectionState.DISCONNECTED).let {
if (it == ConnectionState.DISCONNECTED || it == ConnectionState.CLOSED) {
closure()
}
}
}
fun connect(
clientData: ClientData,
trustManager: X509TrustManager,
address: SocketAddress,
handlerService: () -> HandlerService,
userData: Pair<String, String>,
shouldReconnect: Boolean = false
) {
......@@ -107,7 +112,6 @@ class SessionManager(offlineSession: ISession, val backlogStorage: BacklogStorag
lastClientData = clientData
lastTrustManager = trustManager
lastAddress = address
lastHandlerService = handlerService
lastUserData = userData
lastShouldReconnect = shouldReconnect
inProgressSession.onNext(
......@@ -115,7 +119,7 @@ class SessionManager(offlineSession: ISession, val backlogStorage: BacklogStorag
clientData,
trustManager,
address,
handlerService(),
handlerService,
backlogStorage,
userData
)
......@@ -127,14 +131,13 @@ class SessionManager(offlineSession: ISession, val backlogStorage: BacklogStorag
val clientData = lastClientData
val trustManager = lastTrustManager
val address = lastAddress
val handlerService = lastHandlerService
val userData = lastUserData
if (clientData != null && trustManager != null && address != null && handlerService != null && userData != null) {
if (state.or(
ConnectionState.DISCONNECTED
) == ConnectionState.DISCONNECTED || forceReconnect) {
connect(clientData, trustManager, address, handlerService, userData, forceReconnect)
if (clientData != null && trustManager != null && address != null && userData != null) {
state.or(ConnectionState.DISCONNECTED).let {
if (it == ConnectionState.DISCONNECTED || it == ConnectionState.CLOSED || forceReconnect) {
connect(clientData, trustManager, address, userData, forceReconnect)
}
}
}
}
......
package de.kuschku.libquassel.util.compatibility
interface HandlerService {
fun parse(f: () -> Unit)
fun serialize(f: () -> Unit)
fun deserialize(f: () -> Unit)
fun write(f: () -> Unit)
fun handle(f: () -> Unit)
fun backend(f: () -> Unit)
fun backendDelayed(delayMillis: Long, f: () -> Unit)
fun quit()
......
......@@ -4,12 +4,25 @@ import de.kuschku.libquassel.util.compatibility.HandlerService
import java.util.concurrent.Executors
class JavaHandlerService : HandlerService {
private val parseExecutor = Executors.newSingleThreadExecutor()
override fun backendDelayed(delayMillis: Long, f: () -> Unit) = backend(f)
private val serializeExecutor = Executors.newSingleThreadExecutor()
private val deserializeExecutor = Executors.newSingleThreadExecutor()
private val writeExecutor = Executors.newSingleThreadExecutor()
private val backendExecutor = Executors.newSingleThreadExecutor()
override fun parse(f: () -> Unit) {
parseExecutor.submit {
override fun serialize(f: () -> Unit) {
serializeExecutor.submit {
try {
f()
} catch (e: Throwable) {
exceptionHandler?.uncaughtException(Thread.currentThread(), e)
}
}
}
override fun deserialize(f: () -> Unit) {
deserializeExecutor.submit {
try {
f()
} catch (e: Throwable) {
......@@ -28,7 +41,7 @@ class JavaHandlerService : HandlerService {
}
}
override fun handle(f: () -> Unit) {
override fun backend(f: () -> Unit) {
backendExecutor.submit {
try {
f()
......@@ -39,7 +52,7 @@ class JavaHandlerService : HandlerService {
}
override fun quit() {
parseExecutor.shutdownNow()
serializeExecutor.shutdownNow()
writeExecutor.shutdownNow()
backendExecutor.shutdownNow()
}
......
......@@ -13,6 +13,7 @@ import de.kuschku.libquassel.quassel.syncables.IrcUser
import de.kuschku.libquassel.quassel.syncables.Network
import de.kuschku.libquassel.session.Backend
import de.kuschku.libquassel.session.ISession
import de.kuschku.libquassel.session.SessionManager
import de.kuschku.libquassel.util.and
import de.kuschku.libquassel.util.hasFlag
import de.kuschku.quasseldroid.util.helper.*
......@@ -26,11 +27,7 @@ class QuasselViewModel : ViewModel() {
this.backendWrapper.value = backendWrapper
}
private val buffer = MutableLiveData<BufferId>()
fun getBuffer(): LiveData<BufferId> = buffer
fun setBuffer(buffer: BufferId) {
this.buffer.value = buffer
}
val buffer = MutableLiveData<BufferId>()
private val bufferViewConfigId = MutableLiveData<Int?>()
fun getBufferViewConfigId(): LiveData<Int?> = bufferViewConfigId
......@@ -49,10 +46,10 @@ class QuasselViewModel : ViewModel() {
}
val backend = backendWrapper.switchMap { it }
val sessionManager = backend.map { it.sessionManager() }
val session = sessionManager.switchMapRx { it.session }
val sessionManager = backend.map(Backend::sessionManager)
val session = sessionManager.switchMapRx(SessionManager::session)
val connectionProgress = sessionManager.switchMapRx { it.connectionProgress }
val connectionProgress = sessionManager.switchMapRx(SessionManager::connectionProgress)
private val bufferViewManager = session.map(ISession::bufferViewManager)
......@@ -62,6 +59,8 @@ class QuasselViewModel : ViewModel() {
}
}
val errors = session.switchMapRx(ISession::error)
private var lastMarkerLine = -1
/**
* An observable of the changes of the markerline, as pairs of `(old, new)`
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment