I have been trying to integrate kafka within spring boot but doesn't seem to work. I am getting an exception while consuming the message. I think the publisher works fine but the consumer fails to deserialize the message.
Caused by: java.lang.ClassCastException: class com.example.schema.avro.Event cannot be cast to 
    class com.example.schema.avro.Event (com.example.schema.avro.Event is in unnamed module of loader
     'app'; com.example.schema.avro.Event is in unnamed module of loader
     org.springframework.boot.devtools.restart.classloader.RestartClassLoader @58da768e)
I searched for the exception and the solution mentioned was to specific.avro.reader: true but didn't seem to work for me.
This is my pom file
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.0</version>
        </dependency>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <id>schemas</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
yaml file
spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    bootstrap-servers: localhost:9092
    binder:
      producer-properties:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        schema.registry.url: http://localhost:8081
      consumer-properties:
        key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        schema.registry.url: http://localhost:8081
        specific.avro.reader: true
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      bindings:
        event-out-0:
          destination: event-details
          contentType: application/*+avro
        event-in-0:
          destination: event-details
          contentType: application/*+avro
      function:
        definition: event
And in the code to publish an event
    public Event saveEvent(final EventDto eventDto) {
        final boolean send = streamBridge.send("event-out-0", new com.example.schema.avro.Event(eventDto.eventId()));
        log.info(String.valueOf(send));
    }
Configuration
@Configuration
@Slf4j
public class Config {
    @Bean
    public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endPoint) {
        final ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endPoint);
        return client;
    }
    @Bean
    public MessageConverter avroSchemaMessageConverterAnotherBean() {
        return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
    }
    @Bean
    public Function<EventDto, Event> eventOut() {
        return e -> new Event(e.eventId());
    }
    @Bean
    public Consumer<Event> event() {
        return e -> log.info("event captured {}", e.getEventId().toString());
    }
}
