A Tcp.OutgoingConnection gathers data from an audio mixer and sends it asynchronously to a sourceQueue, which processes the data.
After issuing a command, there is no guarantee that the next piece of data received is the response to it. How can I feed the response back to the code that issued the command?
A 'dirty' way would be to store the processed data in a static variable and pause the thread while waiting for it, but that is very inefficient. Is there an Akka mechanism that can watch for a value to change and return a Future?
This is the current code:
    public Q16SocketThread(ActorSystem system) {
        Logger.debug("Loading Q16SocketThread.");
        this.system = system;
        final Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
                Tcp.get(system).outgoingConnection(ipAddress, port);
        int bufferSize = 10;
        final SourceQueueWithComplete<ByteBuffer> sourceQueue =
                Source.<ByteBuffer>queue(bufferSize, OverflowStrategy.fail())
                        .map(input -> Hex.encodeHexString(input.array()))
                        .to(Sink.foreach(this::startProcessing))
                        .run(system);
        final Flow<ByteString, ByteString, NotUsed> repl =
                Flow.of(ByteString.class)
                        .map(ByteString::toByteBuffer)
                        .map(sourceQueue::offer)
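                        .map(
                                offerResult -> {
                                    // offerResult is the CompletionStage returned by sourceQueue.offer,
                                    // not the received bytes, so it is not used here.
                                    // Send the next queued command if there is one, otherwise "fe".
                                    // poll() returns null instead of blocking when the queue is empty.
                                    String hexCmd = nextCmd.poll();
                                    if (hexCmd == null) {
                                        hexCmd = "fe";
                                    }
                                    return ByteString.fromArray(Hex.decodeHex(hexCmd));
                                }).async();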
        CompletionStage<Tcp.OutgoingConnection> connectionCS = connection.join(repl).run(system);
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, message -> {
                    if (message.equalsIgnoreCase("start")) {
                        Logger.debug("Q16 thread started.");
                        nextCmd.put(sysExHeaderAllCall + "1201F7");
                    } else if (message.equalsIgnoreCase("stop")) {
                        Logger.debug("Stopping of data gathering");
                        nextCmd.put(sysExHeaderAllCall + "1200F7");
                        //self().tell(PoisonPill.getInstance(), ActorRef.noSender());
                    } else if (message.equalsIgnoreCase("version")) {
                        Logger.debug("Requesting version.");
                        nextCmd.put(sysExHeaderAllCall + "1001F7");
                    }
                }).build();
    }
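
To make the "watch for a value and get a Future" idea concrete, here is a minimal sketch of what I have in mind, using only java.util.concurrent rather than anything Akka-specific. The class name `ResponseCorrelator`, the methods `expect` and `onMessage`, and the string keys are all hypothetical; it also assumes that a response can be recognized by some key (for example a command id in the SysEx payload), which my code above does not do yet.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: hands out a future per issued command and completes it
// when a matching response is seen by the processing stage.
final class ResponseCorrelator {
    // Waiting futures per response key (assumption: responses carry a recognizable key).
    private final Map<String, Queue<CompletableFuture<String>>> pending =
            new ConcurrentHashMap<>();

    // Called by the code that issues a command; returns a future for its response.
    CompletableFuture<String> expect(String key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(future);
        return future;
    }

    // Called for every decoded message; returns true if a waiting future was completed.
    boolean onMessage(String key, String payloadHex) {
        Queue<CompletableFuture<String>> waiters = pending.get(key);
        if (waiters == null) {
            return false;
        }
        CompletableFuture<String> next = waiters.poll();
        return next != null && next.complete(payloadHex);
    }

    public static void main(String[] args) {
        ResponseCorrelator correlator = new ResponseCorrelator();
        CompletableFuture<String> version = correlator.expect("version");
        correlator.onMessage("meter", "0a0b");   // unrelated data, nobody is waiting
        correlator.onMessage("version", "0102"); // completes the pending future
        System.out.println(version.join());
    }
}
```

In this sketch, `startProcessing` would call `onMessage` for each message it decodes, and the "version" branch in `createReceive` would call `expect("version")` before putting the command on `nextCmd`. No thread has to sleep or poll a static variable, but a future is never completed if the mixer does not answer, so a timeout would probably be needed as well.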