I am getting started with reactive websockets using Spring Boot 2.1.3. I created a WebSocketHandler implementation like this:
@Override
public Mono<Void> handle(WebSocketSession session) {
    Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(1);
    var publisher = flux.map( o -> {
        try {
            return objectMapper.writeValueAsString(o);
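        } catch (JsonProcessingException e) {
            // Reactor's map() must not return null, so propagate the failure instead
            throw new IllegalStateException("Could not serialize EfficiencyData", e);
        }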
    }).map(session::textMessage)
      .delayElements(Duration.ofSeconds(1));
    return session.send(publisher);
}
This works: when I connect, my websocket client receives serialized EfficiencyData every second.
However, I want to react to a request coming in over the websocket that tells the service which id I want data for. I managed to read the request like this:
@Override
public Mono<Void> handle(WebSocketSession session) {
    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
        return session.textMessage("Subscribing with id " + id);
    }));
}
Now I have no clue how to combine these two implementations.
I was hoping to do something like this:
@Override
public Mono<Void> handle(WebSocketSession session) {
    return session.send(session.receive().map(webSocketMessage -> {
        int id = Integer.parseInt(webSocketMessage.getPayloadAsText());
        Flux<EfficiencyData> flux = service.subscribeToEfficiencyData(id);
        var publisher = flux.map( o -> {
            try {
                return objectMapper.writeValueAsString(o);
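            } catch (JsonProcessingException e) {
                // Reactor's map() must not return null, so propagate the failure instead
                throw new IllegalStateException("Could not serialize EfficiencyData", e);
            }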
        }).map(session::textMessage)
          .delayElements(Duration.ofSeconds(1));
        return publisher; // Does not compile
    }));
}
But that does not compile: the lambda passed to map returns a Flux<WebSocketMessage>, so the result is a Flux<Flux<WebSocketMessage>>, whereas session.send expects a Publisher<WebSocketMessage>. How should this be handled?
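For what it's worth, the nesting can be reproduced without Spring. Below is a minimal sketch that uses java.util.stream as a stand-in for Reactor. The class NestedStreamSketch and the subscribe helper are hypothetical names that only mimic service.subscribeToEfficiencyData. With map, a lambda that returns a stream yields a stream of streams. With flatMap, the inner streams are flattened into one. Flux offers comparable flattening operators (flatMap, concatMap, switchMap), so one of those is presumably what the attempt above needs in place of map.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestedStreamSketch {

    // Hypothetical stand-in for service.subscribeToEfficiencyData(id):
    // returns a small stream of messages for the given id.
    static Stream<String> subscribe(int id) {
        return Stream.of("data-" + id + "-a", "data-" + id + "-b");
    }

    // map: every request turns into a whole stream, so the result is nested.
    static Stream<Stream<String>> withMap(Stream<String> requests) {
        return requests.map(text -> subscribe(Integer.parseInt(text)));
    }

    // flatMap: the inner streams are flattened into a single stream of messages.
    static List<String> withFlatMap(Stream<String> requests) {
        return requests
                .flatMap(text -> subscribe(Integer.parseInt(text)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [data-1-a, data-1-b, data-2-a, data-2-b]
        System.out.println(withFlatMap(Stream.of("1", "2")));
    }
}
```

The analogy only covers the types. Unlike a Stream, a Flux is asynchronous, so the choice between flatMap, concatMap and switchMap also decides what happens to the previous subscription when a new id arrives.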
EDIT:
Following the Javadoc example of WebSocketHandler, I tried this:
@Override
public Mono<Void> handle(WebSocketSession session) {
    Flux<EfficiencyData> flux =
            session.receive()
                   .map(webSocketMessage -> Integer.parseInt(webSocketMessage.getPayloadAsText()))
                   .concatMap(service::subscribeToEfficiencyData);
    Mono<Void> input = flux.then();
    Mono<Void> output = session.send(flux.map(data -> session.textMessage(data.toString()))).then();
    return Mono.zip(input, output).then();
}
But that just disconnects the websocket client immediately without doing anything.
 
    