Spring kafka: failed to send Message to channel payload must not be null


Spring kafka: failed to send Message to channel payload must not be null



I have two project microservices with spring boot 1.5.9.RELEASE, the first microservice sends a user to a topic and the second microservice consumes this message, when i send the user it shows the error below in the second microservice



Stack trace:


org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'verification-token-in'; nested exception is java.lang.IllegalArgumentException: payload must not be null
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:54) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:288) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:279) ~[spring-integration-kafka-2.1.0.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:79) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.1.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.1.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_112]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_112]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_112]
Caused by: java.lang.IllegalArgumentException: payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:134) ~[spring-core-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:57) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.support.MutableMessage.<init>(MutableMessage.java:53) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.support.MutableMessageBuilder.withPayload(MutableMessageBuilder.java:86) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:35) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.support.MutableMessageBuilderFactory.withPayload(MutableMessageBuilderFactory.java:26) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:265) ~[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:540) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:417) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
... 31 common frames omitted



kafka dependency:


<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>



Output (First Microservice):


spring:
cloud:
stream:
bindings:
verification-token-out:
destination: verification.token
contentType: application/json
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181

public interface UserStreams {

String OUTPUT_VERIFICATION_TOKEN = "verification-token-out";

@Output(UserStreams.OUTPUT_VERIFICATION_TOKEN)
public MessageChannel outputVerificationToken();

}

@EnableBinding(UserStreams.class)
public class UserServiceImpl {

@Autowired
private UserStreams source;

@Override
public void createVerificationTokenForUser(UserDto userSender) {
source.outputVerificationToken().send(MessageBuilder.withPayload(userSender).build());
}

}



Input (Second Microservice):


spring:
cloud:
stream:
bindings:
verification-token-in:
destination: verification.token
contentType: application/octet-stream
originalContentType: application/json
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
zkNodes: localhost
defaultZkPort: 2181

public interface MailStreams {

String INPUT_VERIFICATION_TOKEN = "verification-token-in";

@Input(MailStreams.INPUT_VERIFICATION_TOKEN)
SubscribableChannel inputVerificationToekn();

}

@EnableBinding(MailStreams.class)
public class Receiver {

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

@StreamListener(MailStreams.INPUT_VERIFICATION_TOKEN)
public void verificationToken(final UserDto payload) {
LOGGER.info("Received payload='{}'", payload);
}

}





Would you mind to show more stack trace? And what is the Spring Cloud Stream version do you use? And what Spring Kafka and Apache Kafka per se? More over it would really be much better if you can share a simple project with us on GitHub to let us to reproduce it on our side. Sounds like a bug, although I believer this was fixed before in some version...
– Artem Bilan
Jun 29 at 14:11





[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE] it is [fixed in 1.3.x ](github.com/spring-cloud/spring-cloud-stream/issues/993).
– Gary Russell
Jun 29 at 16:06


[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]




1 Answer
1



[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]


[spring-cloud-stream-1.2.2.RELEASE.jar:1.2.2.RELEASE]



The problem was due to an improper content type; this is fixed in 1.3.x. (Ditmars).





what is the version of spring boot that is compatible with spring-cloud-stream-1.3.x ?
– Aymen Kanzari
Jun 29 at 16:43





1.5.x. Currently 1.5.14.
– Gary Russell
Jun 29 at 18:14





Try removing the originalContentType and contentType configs from the consumer.
– Gary Russell
Jun 29 at 18:15


originalContentType


contentType






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

Comments

Popular posts from this blog

paramiko-expect timeout is happening after executing the command

Opening a url is failing in Swift

Export result set on Dbeaver to CSV