I have the following consumer in my Spring Boot (WebFlux/R2DBC/Reactor Kafka) application:
    @EventListener(ApplicationStartedEvent::class)
    fun onMyEvent() {
        kafkaReceiver
            .receive()
            .doOnNext { record ->
                val myEvent = record.value()
                myService.deleteSomethingFromDbById(myEvent.myId)
                    .thenEmpty {
                        record.receiverOffset().acknowledge()
                    }.subscribe()
            }
            .subscribe()
    }
I would like to synchronise the Kafka and DB transactions. After reading the docs and some Stack Overflow questions
- Transaction Synchronization in spring boot with Database+ kafka example
- Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with JPA Spring-data transaction
- Transaction Synchronization in Spring Kafka
- Spring @Transactional with a transaction across multiple data sources
it seems like ChainedKafkaTransactionManager would be the way to go.
But the following code does not compile, because ChainedKafkaTransactionManager expects transaction managers of type PlatformTransactionManager, whereas R2dbcTransactionManager is a ReactiveTransactionManager. So the r2dbcTransactionManager parameter is not accepted.
    @Bean(name = ["chainedTransactionManager"])
    fun chainedTransactionManager(
        r2dbcTransactionManager: R2dbcTransactionManager,
        kafkaTransactionManager: KafkaTransactionManager<*, *>
    ) = ChainedKafkaTransactionManager(kafkaTransactionManager, r2dbcTransactionManager)
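To make the type mismatch concrete, here is a minimal sketch that checks which of Spring's two transaction-manager contracts each class implements. It assumes Spring Framework 5.3 or later (where R2dbcTransactionManager lives in org.springframework.r2dbc.connection) and spring-kafka on the classpath; the describe helper is a hypothetical name used only for illustration.

```kotlin
import org.springframework.kafka.transaction.KafkaTransactionManager
import org.springframework.r2dbc.connection.R2dbcTransactionManager
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.ReactiveTransactionManager

// Hypothetical helper: reports which transaction-manager contract a class implements.
fun describe(type: Class<*>): String {
    val imperative = PlatformTransactionManager::class.java.isAssignableFrom(type)
    val reactive = ReactiveTransactionManager::class.java.isAssignableFrom(type)
    return "${type.simpleName}: " +
        "PlatformTransactionManager=$imperative, " +
        "ReactiveTransactionManager=$reactive"
}

fun main() {
    // KafkaTransactionManager belongs to the imperative (PlatformTransactionManager) hierarchy.
    println(describe(KafkaTransactionManager::class.java))
    // R2dbcTransactionManager belongs to the reactive (ReactiveTransactionManager) hierarchy.
    println(describe(R2dbcTransactionManager::class.java))
}
```

The two managers sit in separate hierarchies, which is why they cannot be passed together to the ChainedKafkaTransactionManager constructor.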
Is there another way to achieve this?