Skip to content
Snippets Groups Projects
Select Git revision
  • a6d8ae38a5af3220e76832ee898dd22e18d850ff
  • main default protected
  • v0.1.0
3 results

deploy.sh

Blame
  • DatastreamPeer.java 12.34 KiB
    package de.kuschku.libquassel.protocols;
    
    import android.support.annotation.NonNull;
    import android.support.annotation.Nullable;
    
    import com.google.common.base.Function;
    import com.google.common.collect.Lists;
    
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.BufferOverflowException;
    import java.nio.BufferUnderflowException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import de.kuschku.libquassel.BusProvider;
    import de.kuschku.libquassel.CoreConnection;
    import de.kuschku.libquassel.events.ConnectionChangeEvent;
    import de.kuschku.libquassel.events.GeneralErrorEvent;
    import de.kuschku.libquassel.functions.FunctionType;
    import de.kuschku.libquassel.functions.serializers.HeartbeatReplySerializer;
    import de.kuschku.libquassel.functions.serializers.HeartbeatSerializer;
    import de.kuschku.libquassel.functions.serializers.InitDataFunctionSerializer;
    import de.kuschku.libquassel.functions.serializers.InitRequestFunctionSerializer;
    import de.kuschku.libquassel.functions.serializers.PackedInitDataFunctionSerializer;
    import de.kuschku.libquassel.functions.serializers.PackedRpcCallFunctionSerializer;
    import de.kuschku.libquassel.functions.serializers.PackedSyncFunctionSerializer;
    import de.kuschku.libquassel.functions.serializers.UnpackedRpcCallFunctionSerializer;
    import de.kuschku.libquassel.functions.serializers.UnpackedSyncFunctionSerializer;
    import de.kuschku.libquassel.functions.types.HandshakeFunction;
    import de.kuschku.libquassel.functions.types.Heartbeat;
    import de.kuschku.libquassel.functions.types.HeartbeatReply;
    import de.kuschku.libquassel.functions.types.InitDataFunction;
    import de.kuschku.libquassel.functions.types.InitRequestFunction;
    import de.kuschku.libquassel.functions.types.RpcCallFunction;
    import de.kuschku.libquassel.functions.types.SyncFunction;
    import de.kuschku.libquassel.objects.MessageTypeRegistry;
    import de.kuschku.libquassel.primitives.QMetaType;
    import de.kuschku.libquassel.primitives.serializers.IntSerializer;
    import de.kuschku.libquassel.primitives.serializers.PrimitiveSerializer;
    import de.kuschku.libquassel.primitives.serializers.VariantVariantListSerializer;
    import de.kuschku.libquassel.primitives.types.QVariant;
    import de.kuschku.util.niohelpers.Helper;
    import de.kuschku.util.niohelpers.WrappedChannel;
    
    import static de.kuschku.util.AndroidAssert.assertFalse;
    import static de.kuschku.util.AndroidAssert.assertNotNull;
    
    /**
     * A helper class processing incoming and outgoing messages.
     * Implements the Qt Datastream protocol.
     *
     * @author Janne Koschinski
     */
    @SuppressWarnings({"unchecked", "unused"})
    public class DatastreamPeer implements RemotePeer {
        @NonNull
        private final CoreConnection connection;
        @NonNull
        private final BusProvider busProvider;
        @NonNull
        private final ExecutorService parseExecutor;
        @NonNull
        private ByteBuffer buffer = ByteBuffer.allocate(0);
    
        public DatastreamPeer(@NonNull CoreConnection connection, @NonNull BusProvider busProvider) {
            this.connection = connection;
            this.busProvider = busProvider;
            this.busProvider.dispatch.register(this);
            this.parseExecutor = Executors.newCachedThreadPool();
        }
    
        @NonNull
        public static List<QVariant<Object>> mapToList(@NonNull Map<String, QVariant> data) {
            final List<QVariant<Object>> list = new ArrayList<>(data.size() * 2);
            for (Map.Entry<String, QVariant> entry : data.entrySet()) {
                list.add(new QVariant<>(QMetaType.Type.QByteArray, entry.getKey()));
                list.add(entry.getValue());
            }
            return list;
        }
    
        @NonNull
        public static Map<String, QVariant> listToMap(@NonNull List<QVariant> data) {
            final Map<String, QVariant> map = new HashMap<>(data.size() / 2);
            for (int i = 0; i < data.size(); i += 2) {
                map.put((String) data.get(i).data, data.get(i + 1));
            }
            return map;
        }
    
        @NonNull
        public static <T> Map<T, T> unboxedListToMap(@NonNull List<T> data) {
            final Map<T, T> map = new HashMap<>(data.size() / 2);
            for (int i = 0; i < data.size(); i += 2) {
                map.put(data.get(i), data.get(i + 1));
            }
            return map;
        }
    
        @NonNull
        public static List unboxList(@NonNull List packedFunc) {
            return Lists.transform(packedFunc, new Function<QVariant, Object>() {
                @Override
                public Object apply(@Nullable QVariant input) {
                    assertNotNull(input);
    
                    return input.data;
                }
            });
        }
    
        public void onEventBackgroundThread(@NonNull SyncFunction func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.<SyncFunction>get(),
                    UnpackedSyncFunctionSerializer.get().serialize(func)
            ));
        }
    
        public void onEventBackgroundThread(@NonNull RpcCallFunction func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.<RpcCallFunction>get(),
                    UnpackedRpcCallFunctionSerializer.get().serialize(func)
            ));
        }
    
        public void onEventBackgroundThread(@NonNull InitRequestFunction func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.<InitRequestFunction>get(),
                    InitRequestFunctionSerializer.get().serializePacked(func)
            ));
        }
    
        public void onEventBackgroundThread(@NonNull InitDataFunction func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.<InitDataFunction>get(),
                    InitDataFunctionSerializer.get().serialize(func)
            ));
        }
    
        public void onEventBackgroundThread(@NonNull Heartbeat func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.<InitDataFunction>get(),
                    HeartbeatSerializer.get().serialize(func)
            ));
        }
    
        public void onEventBackgroundThread(@NonNull HeartbeatReply func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.<InitDataFunction>get(),
                    HeartbeatReplySerializer.get().serialize(func)
            ));
        }
    
        public void onEventBackgroundThread(@NonNull HandshakeFunction func) {
            assertNotNull(connection.getOutputExecutor());
            assertFalse(connection.getOutputExecutor().isShutdown());
            Map<String, QVariant> variantMap = MessageTypeRegistry.toVariantMap(func.data).data;
            assertNotNull(variantMap);
            connection.getOutputExecutor().submit(new OutputRunnable<>(
                    VariantVariantListSerializer.get(),
                    DatastreamPeer.mapToList(variantMap)
            ));
        }
    
        private void handleHandshakeMessage(@NonNull List data) {
            busProvider.handle(MessageTypeRegistry.from(DatastreamPeer.listToMap(data)));
        }
    
        private void handlePackedFunc(@NonNull List<QVariant> data) {
            final FunctionType type = FunctionType.fromId((int) data.remove(0).data);
            switch (type) {
                case SYNC:
                    busProvider.handle(PackedSyncFunctionSerializer.get().deserialize(data));
                    break;
                case RPCCALL:
                    busProvider.handle(PackedRpcCallFunctionSerializer.get().deserialize(data));
                    break;
                case INITREQUEST:
                    busProvider.handle(InitRequestFunctionSerializer.get().deserialize(data));
                    break;
                case INITDATA:
                    busProvider.handle(PackedInitDataFunctionSerializer.get().deserialize(data));
                    break;
                case HEARTBEAT:
                    busProvider.handle(HeartbeatSerializer.get().deserialize(data));
                    break;
                case HEARTBEATREPLY:
                    busProvider.handle(HeartbeatReplySerializer.get().deserialize(data));
                    break;
                default:
                    busProvider.sendEvent(new GeneralErrorEvent("Unknown package received: " + data));
                    break;
            }
        }
    
        public void processMessage() throws IOException {
            buffer = ByteBuffer.allocate(4);
            connection.getChannel().read(buffer);
    
            final int size = IntSerializer.get().deserialize(buffer);
    
            if (size == 0) return;
    
            buffer = ByteBuffer.allocate(size);
            connection.getChannel().read(buffer);
    
            parseExecutor.submit(new ParseRunnable(buffer));
        }
    
        @NonNull
        @Override
        public ByteBuffer getBuffer() {
            return buffer;
        }
    
        /**
         * A special runnable that serializes an object into a buffer, writes the size of the buffer, and, if necessary,
         * compresses it with deflate.
         *
         * @param <T>
         */
        private class OutputRunnable<T> implements Runnable {
            @NonNull
            private final T data;
            @NonNull
            private final PrimitiveSerializer<T> serializer;
    
            public OutputRunnable(@NonNull PrimitiveSerializer<T> serializer, @NonNull T data) {
                this.data = data;
                this.serializer = serializer;
            }
    
            @Override
            public void run() {
                assertNotNull(connection.getChannel());
    
                try {
                    // TODO: Reuse buffer
    
                    // Create a buffer
                    final ByteArrayOutputStream out = new ByteArrayOutputStream();
                    // Wrap it in a fake channel
                    final WrappedChannel fakeChannel = WrappedChannel.ofStreams(null, new DataOutputStream(out));
                    // Serialize the object into the buffer-channel
                    serializer.serialize(fakeChannel, data);
                    // Write the size of the buffer over the network
                    IntSerializer.get().serialize(connection.getChannel(), out.size());
                    // Write the content of the buffer over the network
                    connection.getChannel().write(ByteBuffer.wrap(out.toByteArray()));
                    // Flush the deflater, if existing
                    connection.getChannel().flush();
                    // Close the buffer
                    fakeChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private class ParseRunnable implements Runnable {
            ByteBuffer buffer;
    
            public ParseRunnable(ByteBuffer buffer) {
                this.buffer = buffer;
            }
    
            @Override
            public void run() {
                try {
                    // TODO: Put this into a future with a time limit, and parallelize it.
                    final List data = VariantVariantListSerializer.get().deserialize(buffer);
                    if (connection.getStatus() == ConnectionChangeEvent.Status.CONNECTING
                            || connection.getStatus() == ConnectionChangeEvent.Status.HANDSHAKE
                            || connection.getStatus() == ConnectionChangeEvent.Status.CORE_SETUP_REQUIRED
                            || connection.getStatus() == ConnectionChangeEvent.Status.USER_SETUP_REQUIRED
                            || connection.getStatus() == ConnectionChangeEvent.Status.LOGIN_REQUIRED) {
                        handleHandshakeMessage(data);
                    } else {
                        handlePackedFunc(data);
                    }
                } catch (@NonNull BufferUnderflowException | BufferOverflowException e) {
                    Helper.printHexDump(buffer.array());
                    busProvider.sendEvent(new GeneralErrorEvent(e));
                } catch (Exception e) {
                    busProvider.sendEvent(new GeneralErrorEvent(e));
                }
            }
        }
    }