I am working on a POC for implementing a Kafka cluster in my project. I have set up a Kafka cluster on my local machine with 3 brokers. I am sending messages to the Kafka cluster through a Spring MVC REST service, which internally uses Spring Kafka to produce and consume messages. Now I need to send files (containing XML data) to the Kafka cluster. I tried using Kafka's ByteArraySerializer/ByteArrayDeserializer to do this: I am converting the XML file to a byte array and sending it to the Kafka cluster, but my byte array is getting converted to an array of numbers. Below is the code I am using to send the message to the Kafka cluster.
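To illustrate the conversion step in isolation, here is a minimal plain-Java sketch (no Spring, and with a hypothetical temp file standing in for my real XML file) of reading a file into the `byte[]` that `ByteArraySerializer` expects, and the two ways the same bytes can be printed:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class FileToBytes {
    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for my real XML file.
        Path file = Files.createTempFile("sample", ".xml");
        Files.write(file, "<AdvanceShipmentNotification/>".getBytes(StandardCharsets.UTF_8));

        // Read the whole file into a byte[] in one call.
        byte[] b = Files.readAllBytes(file);

        // Printed as an array, the bytes show up as numbers (e.g. 60 for '<')...
        System.out.println(Arrays.toString(b));
        // ...but they decode back to the original XML text unchanged.
        System.out.println(new String(b, StandardCharsets.UTF_8));
    }
}
```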
@Async
    public String sendMessageFromFile(String filePath) {
        byte[] b;
        try {
            // Read the whole file in one call; a single FileInputStream.read(byte[])
            // is not guaranteed to fill the buffer, and this also avoids leaking
            // the stream if an exception is thrown before close().
            b = Files.readAllBytes(Paths.get(filePath));
        } catch (NoSuchFileException e) {
            LOGGER.error("Invalid file path");
            return "Invalid file path";
        } catch (IOException e) {
            LOGGER.error("An error occurred while reading the file: " + e.getMessage());
            return "An error occurred while reading the file";
        }
        // Debug output: print the file content as text
        System.out.print(new String(b, StandardCharsets.UTF_8));
        // Send the byte array to the Kafka cluster
        ListenableFuture<SendResult<Integer, byte[]>> future = kafkaTemplate.send("TESTQUEUE", b);
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, byte[]>>() {
            @Override
            public void onSuccess(final SendResult<Integer, byte[]> message) {
                LOGGER.info("sent message= " + Arrays.toString(message.getProducerRecord().value())
                        + " with offset= " + message.getRecordMetadata().offset());
            }
            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message from file with path= " + filePath, throwable);
            }
        });
        return "File is being sent";
    }
Below is my configuration.
public class MessagingServiceConfiguration {

    private final String bootstrapServersProducer = "localhost:9092";
    private final String bootstrapServersConsumer = "localhost:9093";

    @Autowired
    private Environment env;

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, byte[]> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, byte[]> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConsumer);
        // Key deserializer matches the Integer key type of the consumer factory
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return props;
    }

    @Bean
    public ListenerServiceImpl listener() {
        return new ListenerServiceImpl();
    }

    @Bean
    public ProducerFactory<Integer, byte[]> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersProducer);
        // Key serializer matches the Integer key type of the producer factory
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Below is my consumer code:
@KafkaListener(id = "TEST_CONSUMER_ID", topics = "TESTQUEUE")
    public void listenMessageInQueue(byte[] msg) {
        // Decode the bytes back to text; calling toString() on a byte[] would only
        // return the array's identity (e.g. "[B@1d2e3f"), not the XML content.
        String payload = new String(msg, StandardCharsets.UTF_8);
        LOGGER.info("receiving payload='{}'", payload);
        messageDao.saveMessage(payload);
    }
Below is the message I am getting back from Kafka.
[60, 65, 100, 118, 97, 110, 99, 101, 83, 104, 105, 112, 109, 101, 110, 116, 78, 111, 116, 105, 102, 105, 99, 97, 116, 105, 111, 110, 32, 120, 109, 108, 110, 115, 58, 100, 115, 102, 114, 61, 34, 117, 114, 110, 58, 114, 111, 115, 101, 116, 116, 97, 110, 101, 116, 58, 115, 112, 101, 99, 105, 102, 105, 99, 97, 116, 105, 111, 110, 58, 100, 111, 109, 97, 105, 110, 58, 80, 114, 111, 99, 117, 114, 101,......
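As a sanity check on the output above (assuming the logged numbers are the raw byte values of my payload), decoding the first few of them back into characters does yield the start of my XML:

```java
import java.nio.charset.StandardCharsets;

public class DecodeCheck {
    public static void main(String[] args) {
        // First few numbers from the logged output above
        int[] logged = {60, 65, 100, 118, 97, 110, 99, 101};
        byte[] bytes = new byte[logged.length];
        for (int i = 0; i < logged.length; i++) {
            bytes[i] = (byte) logged[i];
        }
        // Decoding the byte values gives back the start of the XML
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // prints "<Advance"
    }
}
```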
I am unable to figure out what I am missing here. Could someone point me in the right direction for accomplishing this task?
