Spring Cloud Stream и Kafka: ошибка сериализации Kafka Producer

Я новичок в Spring Cloud Stream и Kafka. Я получаю следующую ошибку от производителя kafka при отправке строки в полезной нагрузке. Любая помощь или идеи приветствуются. Я пробовал использовать сериализатор / десериализатор bytearray, а также json вместо обычного текста.

сообщение об ошибке: org.apache.kafka.common.errors.SerializationException: невозможно преобразовать значение класса [B в класс org.apache.kafka.common.serialization.StringSerializer, указанный в value.serializer

стек ошибок:

2019-10-25 16:13:40.762 ERROR 4628 --- [  XNIO-1 task-1] o.z.problem.spring.common.AdviceTraits   : Internal Server Error

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@ebad77c]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:186)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
		....
		
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
        at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:894)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:856)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:470)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:407)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:242)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1095)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:176)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.ll.kafkaservice.greeting.GreetingsService.sendGreeting(GreetingsService.java:30)
        at com.ll.kafkaservice.greeting.GreetingsController.greetings(GreetingsController.java:29)

Здесь перечислены настройки Spring Cloud Stream.

  cloud:
    stream:
      bindings:
          greetings-in:
              destination: greetings
              #content-type: application/json
              content-type: text/plain
          greetings-out:
              destination: greetings
              #content-type: application/json
              content-type: text/plain

Настройки производителя

2019-10-25 16:12:28.346  INFO 4628 --- [  restartedMain] com.ll.kafkaservice.KafkaServiceApp      : Started KafkaServiceApp in 22.248 seconds (JVM running for 23.136)
2019-10-25 16:12:28.366 DEBUG 4628 --- [  restartedMain] c.l.k.aop.logging.LoggingAspect          : Enter: com.ll.kafkaservice.service.KafkaServiceKafkaProducer.init() with argument[s] = []
2019-10-25 16:12:28.377  INFO 4628 --- [  restartedMain] c.l.k.service.KafkaServiceKafkaProducer  : Kafka producer initializing...
2019-10-25 16:12:28.378  INFO 4628 --- [  restartedMain] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id =
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2019-10-25 16:12:28.399  INFO 4628 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.0

Здесь перечислены объекты, содержащие сообщение

package com.ll.kafkaservice.messaging;

import java.io.Serializable;

public class Greeting  implements Serializable {
	private static final long serialVersionUID = 1L;
	
	private String message;

	
	public Greeting() {
	}

	
	public String getMessage() {
		return message;
	}

	public void setMessage(String message) {
		this.message = message;
	}
	
	public String toString() {
		StringBuffer sbuffer = new StringBuffer();
		
		sbuffer.append("{");
		sbuffer.append("message:");
		sbuffer.append(message);
		sbuffer.append("}");
		
		return sbuffer.toString();
	}
}

Define the streams

package com.ll.kafkaservice.greeting;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;


public interface GreetingsStreams {
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";
    
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

Свяжите потоки

package com.ll.kafkaservice.config;

import org.springframework.cloud.stream.annotation.EnableBinding;
import com.ll.kafkaservice.greeting.GreetingsStreams;


@EnableBinding(GreetingsStreams.class)
public class StreamsConfiguration {

}

Создать / отправить сообщение

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

import com.ll.kafkaservice.messaging.Greeting;


@Service
public class GreetingsService {
    private final Logger log = LoggerFactory.getLogger(GreetingsService.class);
    
    private final GreetingsStreams greetingsStreams;
    
    private MessageChannel messageChannel;
    
    
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }
    
    public void sendGreeting(final Greeting greeting) {
        messageChannel = greetingsStreams.outboundGreetings();
        log.info("Before send {}", greeting.toString());
        messageChannel.send(MessageBuilder
        		// Sends a string to payload not the object
                .withPayload(greeting.getMessage())
                // Note:  tried this with and without the header
                //.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .build());
    }
}

Потребление / получение сообщения

package com.ll.kafkaservice.greeting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.ll.kafkaservice.messaging.Greeting;



@Component
public class GreetingsListener {
    private final Logger log = LoggerFactory.getLogger(GreetingsListener.class);

    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greeting greetings) {
        log.info("Received greetings: {}", greetings.getMessage());
    }
    
    //@StreamListener(GreetingsStreams.INPUT)
    //public void handleGreetings(String greetings) {
    //    log.info("Received greetings: {}", greetings);
    //}
}


person fjr    schedule 25.10.2019    source источник
comment
Обратите внимание, что ошибка [B cannot be cast to java.lang.String также может возникнуть, если POJO не определяет общедоступный конструктор без аргументов. Это не так в этом вопросе, но к вашему сведению для будущих читателей. См. здесь.   -  person Tomboyo    schedule 19.07.2021


Ответы (2)


По умолчанию платформа SCSt преобразует полезную нагрузку в byte[] и использует ByteArraySerializers.

Поскольку вы настроили привязку для использования пользовательских сериализаторов, вы должны установить useNativeEncoding в true. См. Свойства производителя < / а>.

useNativeEncoding

Если установлено значение true, исходящее сообщение сериализуется непосредственно клиентской библиотекой, которую необходимо соответствующим образом настроить (например, установить соответствующий сериализатор значений производителя Kafka). Когда используется эта конфигурация, маршаллинг исходящих сообщений не основан на contentType привязки. Когда используется собственное кодирование, потребитель несет ответственность за использование соответствующего декодера (например, десериализатора потребительских значений Kafka) для десериализации входящего сообщения. Кроме того, когда используется собственное кодирование и декодирование, свойство headerMode = embeddedHeaders игнорируется, и заголовки не встраиваются в сообщение. См. Потребительское свойство useNativeDecoding.

Однако вам нужно будет использовать JsonSerializer, а не просто сериализатор String, если вы хотите отправить POJO.

Есть ли причина, по которой вы не полагаетесь на фреймворк для преобразования за вас?

person Gary Russell    schedule 25.10.2019

Вам также необходимо опубликовать pom и файл свойств приложения. Могу поспорить, что в приложении могут быть такие строки:

kafka:
consumer:
  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
  bootstrap.servers: localhost:9092
  group.id: fixed-asset-service
  auto.offset.reset: earliest
producer:
  key.serializer: org.apache.kafka.common.serialization.StringSerializer
  value.serializer: org.apache.kafka.common.serialization.StringSerializer
  bootstrap.servers: localhost:9092

Обычно это происходит потому, что блоги и учебные пособия имеют тенденцию только показать вам, как отправить строку. Так что сразу же, как вы попробуете какой-нибудь нестандартный объект, он почти наверняка потерпит неудачу. С spring-cloud-stream не нужно указывать десериализаторы, как это делается автоматически. Это одна из причин, по которой можно было бы использовать фреймворк, например spring-cloud-stream. И если вы найдете такую ​​строку в свойствах приложения, можете ли вы заменить ее следующим:

key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Он должен работать. Еще лучше оставить линии и сохранить конфигурацию spring.cloud.streams

person njeru    schedule 08.11.2019