I am using Kafka Parallel Consumer to consume and process messages, and I would now also like to produce new events to a Kafka topic. This already works with ParallelStreamProcessor, but I cannot get it to work with ReactorProcessor.

Here is the code that works for me:

    pConsumer = createPConsumer()
    pConsumer.subscribe(UniLists.of(kafkaConsumerConfig.kafkaTopic))
    pConsumer.pollAndProduceMany({ context ->
        // Map every consumed record in the batch to one outgoing record
        val records = context.stream().toList()
        records.map { record ->
            println("Consuming ${record.partition()}:${record.offset()}")
            ProducerRecord<String, JsonObject>("output", record.key(),
                JsonObject(mapOf("someTest" to record.offset())))
        }
    }, { consumeProduceResult ->
        // Callback that reports each produced record
        println(
            "Message ${consumeProduceResult.getOut()} saved to broker at offset " +
                "${consumeProduceResult.getMeta().offset()}"
        )
    })

    private fun createPConsumer(): ParallelStreamProcessor<String, JsonObject> {
        val producer = KafkaProducerBuilder.getProducer(kafkaConsumerConfig)
        val options = ParallelConsumerOptions.builder<String, JsonObject>()
            .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
            .maxConcurrency(parallelConsumerConfig.maxConcurrency)
            .batchSize(parallelConsumerConfig.batchSize)
            .consumer(buildConsumer(kafkaConsumerConfig))
            .producer(producer)
            .build()
        return ParallelStreamProcessor.createEosStreamProcessor(options)
    }

I expected the ReactorProcessor version below to send events as well, but nothing is produced:

    pConsumer.react { context ->
        val events = context.stream().toList()
        // do something with events
        val results = events.map { record ->
            ProducerRecord<String, JsonObject>("output", record.key(),
                JsonObject(mapOf("someTest" to record.offset())))
        }
        Mono.just(results)
    }
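
For reference, this is the direction I am considering. It is only a sketch and rests on an assumption I have not verified: that `react` simply waits for the returned Publisher to complete and does not send the values it emits, so the ProducerRecords inside `Mono.just(results)` are never handed to a producer. `sendMono` and `sendAll` are hypothetical helpers of my own, not part of Parallel Consumer; they only wrap the standard Kafka `Producer.send(record, callback)` call in Reactor types.

```kotlin
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical helper (not a Parallel Consumer API): adapts the
// callback-based Producer.send to a Mono that completes with the
// record metadata, or fails with the exception reported by the producer.
fun <K, V> Producer<K, V>.sendMono(record: ProducerRecord<K, V>): Mono<RecordMetadata> =
    Mono.create<RecordMetadata> { sink ->
        send(record) { metadata, exception ->
            if (exception != null) sink.error(exception) else sink.success(metadata)
        }
    }

// Hypothetical helper: sends every record and emits one RecordMetadata
// per successful send. The send is triggered only when the Flux is subscribed.
fun <K, V> Producer<K, V>.sendAll(records: List<ProducerRecord<K, V>>): Flux<RecordMetadata> =
    Flux.fromIterable(records).flatMap { record -> sendMono(record) }
```

Inside `react` I would then return `producer.sendAll(results)` instead of `Mono.just(results)`, where `producer` is the same instance that is passed to the options builder. I am not sure whether this is the intended way, or how it interacts with the EOS processor setup above.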

I would appreciate any advice.