I want to implement a Kafka consumer and producer that send and receive Java objects. This is the full source I tried:
Producer:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
}
}
Send object:
/**
 * REST controller under {@code /checkout} that stores a transaction and then performs a Kafka
 * request/reply round trip with a {@link SaleRequestFactory} payload.
 */
@RestController
@RequestMapping("/checkout")
public class CheckoutController {

    /** Topic the sale request is published to. */
    private static final String TOPIC = "tp-sale";

    private final TransactionService transactionService;
    private final KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private final ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;

    /**
     * Creates the controller with its collaborators.
     *
     * @param validationMessage               accepted for injection but not stored by this class
     * @param transactionService              service used to persist the transaction
     * @param saleRequestFactoryKafkaTemplate plain template for sale requests
     * @param requestReplyKafkaTemplate       template used for the request/reply exchange
     */
    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
            KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
            ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate) {
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    /**
     * Saves an in-progress transaction, sends a sale request and prints the unique id of the reply.
     *
     * <p>Declared {@code public} (it was {@code private}) so the handler is an ordinary,
     * externally invocable method rather than one reached only through reflection.
     *
     * @throws ExecutionException   if the send or the reply future completes exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the send or the reply does not complete within 10 seconds
     */
    @PostMapping("test")
    public void performPayment() throws ExecutionException, InterruptedException, TimeoutException {
        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());
        transactionService.save(transaction);

        SaleRequestFactory request = new SaleRequestFactory();
        request.setId(100);
        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>(TOPIC, request);

        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture =
                requestReplyKafkaTemplate.sendAndReceive(record);
        // Wait for the send to complete first; the result itself is not needed, only the
        // success/failure of the send.
        replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    }
}
Consumer:
/**
 * Consumer-side Kafka configuration: a consumer factory and the listener container factory
 * built on top of it.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /** Broker address list, read from the {@code kafka.bootstrapAddress} property. */
    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /** Consumer group id applied to consumers created by {@link #consumerFactory()}. */
    private String groupId = "test";

    /**
     * Consumer factory with {@code String} keys, using {@link SaleResponseFactoryDeserializer}
     * for values.
     *
     * @return the configured consumer factory
     */
    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        ConsumerFactory<String, SaleResponseFactory> consumers = new DefaultKafkaConsumerFactory<>(settings);
        return consumers;
    }

    /**
     * Listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the configured container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> containers =
                new ConcurrentKafkaListenerContainerFactory<>();
        containers.setConsumerFactory(consumerFactory());
        return containers;
    }
}
Receive Object
/**
 * Kafka listener that receives {@link SaleRequestFactory} messages and returns a
 * {@link SaleResponseFactory}.
 */
@Component
public class ProcessingSaleListener {

    /** Topic this listener subscribes to; a compile-time constant so the annotation can use it. */
    private static final String TOPIC = "tp-sale";

    /**
     * Prints the id of the incoming request and returns a response with a fixed unique id.
     *
     * @param tf      the request payload
     * @param headers the message headers (not read by this method)
     * @return a new response whose unique id is {@code "123123"}
     */
    @KafkaListener(topics = TOPIC)
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) {
        System.out.println(tf.getId());
        SaleResponseFactory response = new SaleResponseFactory();
        response.setUnique_id("123123");
        return response;
    }
}
Custom objects
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Serializable request payload holding a single integer id.
 *
 * <p>Accessors, the no-args and all-args constructors, and a builder (with {@code toBuilder})
 * are generated by the Lombok annotations on this class.
 */
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
public class SaleRequestFactory implements Serializable {

    /** Explicit serialization version id. */
    private static final long serialVersionUID = 1744050117179344127L;

    /** Identifier carried by the request. */
    private int id;
}
Serializer
import org.apache.kafka.common.serialization.Serializer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
 * Kafka {@link Serializer} that encodes a {@link SaleRequestFactory} with Java object
 * serialization.
 */
public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {

    /**
     * Serializes {@code data} into a byte array using an {@link ObjectOutputStream}.
     *
     * @param topic the topic the record is destined for (not used by this implementation)
     * @param data  the object to serialize
     * @return the Java-serialized bytes of {@code data}
     * @throws RuntimeException wrapping the {@link IOException} if writing fails
     */
    @Override
    public byte[] serialize(String topic, SaleRequestFactory data) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes (and thereby flushes) the object stream before the bytes
        // are read; previously only the underlying byte buffer was closed.
        try (ObjectOutputStream objectStream = new ObjectOutputStream(buffer)) {
            objectStream.writeObject(data);
        } catch (IOException e) {
            throw new RuntimeException("Unhandled", e);
        }
        return buffer.toByteArray();
    }
}
Response Object
import java.io.Serializable;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Serializable response payload holding a single unique id string.
 *
 * <p>Accessors, the no-args and all-args constructors, and a builder (with {@code toBuilder})
 * are generated by the Lombok annotations on this class.
 */
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
public class SaleResponseFactory implements Serializable {

    /** Explicit serialization version id. */
    private static final long serialVersionUID = 1744050117179344127L;

    /** Unique identifier carried by the response. */
    private String unique_id;
}
Deserializer
import org.apache.kafka.common.serialization.Deserializer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
/**
 * Kafka {@link Deserializer} that decodes Java-serialized bytes into a {@link SaleRequestFactory}.
 *
 * <p>NOTE(review): the class name says "Response" but the declared and cast type is
 * {@link SaleRequestFactory} - confirm which type is intended.
 */
public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    /**
     * Deserializes {@code data} using an {@link ObjectInputStream}.
     *
     * @param topic the topic the record came from (not used by this implementation)
     * @param data  the serialized bytes; may be {@code null}
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} or {@link ClassNotFoundException}
     *                          if decoding fails
     */
    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data) {
        // A null payload has nothing to decode; return null instead of failing with a
        // NullPointerException inside ByteArrayInputStream.
        if (data == null) {
            return null;
        }
        // NOTE(security): native Java deserialization of bytes read from a broker is unsafe if
        // producers are not fully trusted; consider a data format such as JSON or an
        // ObjectInputFilter.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (SaleRequestFactory) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Unhandled", e);
        }
    }
}
I want to send and receive different serialized Java objects depending on the object type. For example, sometimes I want to send a SaleRequestFactory and receive a SaleResponseFactory, and at other times send an AuthRequestFactory and receive an AuthResponseFactory. Is it possible to send and receive different Java objects using one topic?
Full example code