Spring Boot application using the Spring Cloud Stream Kafka Binder + Kafka Streams Binder not working - Producer doesn't send messages

My Spring Boot 2.3.1 app with Spring Cloud Stream (Spring Cloud Hoxton.SR6) was using the Kafka Streams binder. I needed to add a Kafka producer to be used in another part of the application, so I added the Kafka binder. The problem is that the producer doesn't work; creating its binding throws an exception:

19:49:40.082 [scheduling-1] [900cdeb11106e199] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:332)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:79)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:222)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:336)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:368)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:342)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:319)

This is my configuration:

spring:
  cloud:
    function:
      definition: myProducer
    stream:
      bindings:
        myKStream-in-0:
          destination: my-kstream-topic
          producer:
            useNativeEncoding: true
        myProducer-out-0:
          destination: producer-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          min-partition-count: 3
          replication-factor: 3
          producerProperties:
            enable:
              idempotence: true
            retries: 0x7fffffff
            acks: all
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            schema:
              registry:
                url: ${schema-registry.url:http://localhost:8081}
            request:
              timeout:
                ms: 5000
        streams:
          binder:
            brokers: ${kafka.brokers:localhost}
            configuration:
              application:
                id: ${spring.application.name}
                server: ${POD_IP:localhost}:${local.server.port:8080}
              schema:
                registry:
                  url: ${schema-registry.url}
              key:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              processing:
                guarantee: exactly_once
              replication:
                factor: 3
              group:
                id: kpi
            deserialization-exception-handler: logandcontinue
            min-partition-count: 3
            replication-factor: 3
            state-store-retry:
              max-attempts: 20
              backoff-period: 1500

What could be the problem here?

UPDATE

I tweaked the config as follows:

spring:
  cloud:
    function:
      definition: myProducer
    stream:
      function:
        definition: myKStream

Now the exception is gone, but the messages still don't reach the topic.

In another application that only uses the Kafka binder, the same producer code works perfectly:

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun myProducerProcessor(): EmitterProcessor<Message<XXX>> {
        return EmitterProcessor.create()
    }

    @Bean
    fun myProducer(): Supplier<Flux<Message<XXX>>> {
        return Supplier { myProducerProcessor() }
    }

}

...

@Component
class XXXProducer(@Qualifier("myProducerProcessor") private val myProducerProcessor: EmitterProcessor<Message<XXX>>) {

    fun send(...): Mono<Void> {
        return Mono.defer {
            // Push the message into the processor that backs the myProducer supplier
            myProducerProcessor.onNext(message)
            Mono.empty()
        }
    }

}

UPDATE 2

I set logging.level.org.springframework.cloud.stream: debug

In the logs the following trace shows up:

o.s.c.s.binder.DefaultBinderFactory - Creating binder: kstream

However, there is no corresponding "Creating binder: kafka" entry.

spring-boot
apache-kafka-streams
spring-cloud-stream
spring-cloud-stream-binder-kafka
asked on Stack Overflow Jul 26, 2020 by codependent • edited Jul 26, 2020 by codependent

1 Answer

I was missing the multiple-binder configuration that is needed when the Kafka binder and the Kafka Streams binder are used together (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder)

Thus, I had to list both functions in a single property: spring.cloud.stream.function.definition=myProducer;myKStream
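
In the YAML form used in the question, that property would look roughly like the sketch below. The binding names and destinations are copied from the question's configuration. Leaving the rest of the binder settings (under spring.cloud.stream.kafka) unchanged is an assumption, not something verified here.

```yaml
spring:
  cloud:
    stream:
      function:
        # Both functions in one definition, separated by ';':
        # myProducer is the Supplier bean, myKStream is the Kafka Streams function.
        definition: myProducer;myKStream
      bindings:
        myKStream-in-0:
          destination: my-kstream-topic
        myProducer-out-0:
          destination: producer-topic
```

This single entry replaces the two separate definition entries from the first UPDATE, where each property named only one of the functions.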

answered on Stack Overflow Jul 26, 2020 by codependent

User contributions licensed under CC BY-SA 3.0