I'm trying to test the flow, specifically when an exception is thrown, but for some reason I'm not getting anything in the errorsFromSend
channel.
Here is the gateway:
@MessagingGateway
public interface Send
{
@Gateway(requestChannel = "sending",
headers = @GatewayHeader(name = "errorChannel", expression = "@errorsFromSend"))
void send(final String s);
}
the transformer that throws exception for input = "xyz"
:
public class Transformer {
public String transform(final String s) {
if(s.equals("xyz")) {
throw new RuntimeException("xyz");
}
log.debug(s);
return s;
}
private final Logger log = LoggerFactory.getLogger(this.getClass().getName());
}
and here is the test with context:
@RunWith(SpringRunner.class)
@EnableIntegration
@ComponentScan(basePackageClasses= {sample.Send.class})
@ContextConfiguration(classes = {SendWithFlowTestConfiguration.class})
public class SendWithFlowTest {
@Test
public void testReceiving() throws Exception {
// arrange
final String payload1 = "123";
final String payload2 = "ABC";
final String payload3 = "xyz";
// act and assert
send.send(payload3);
send.send(payload1);
send.send(payload2);
Message<?> fromErrorsFromSend = errorsFromSend.receive(100); // returns null!
Assertions.assertThat(fromErrorsFromSend.getPayload()).isEqualTo(payload3);
fromErrorsFromSend = errorsFromSend.receive(0);
Assertions.assertThat(fromErrorsFromSend).isEqualTo(null);
// verify
}
@Autowired
private QueueChannel errorsFromSend;
@Autowired
private Send send;
}
@Configuration
@EnableIntegration
@IntegrationComponentScan
class SendWithFlowTestConfiguration {
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(1)
.maxMessagesPerPoll(1)
.get();
}
@Bean
public DirectChannel receiving() {
return new DirectChannel();
}
@Bean
public QueueChannel sending() {
return new QueueChannel();
}
@Bean
public QueueChannel errorsFromSend() {
return new QueueChannel();
}
@Bean
public Transformer transformer() {
return new Transformer();
}
@Bean
@Value("2")
public TaskExecutor executor(final int poolSize) {
return new TaskExecutorAdapter(Executors.newFixedThreadPool(poolSize));
}
@Bean
public IntegrationFlow flow(@Qualifier("executor") final TaskExecutor executor) {
return IntegrationFlows.from(sending())
.channel(MessageChannels.executor("receiving", executor).get())
.transform(transformer())
.handle(m -> System.out.println(">> " + m.getPayload()))
.get();
}
}
Can someone let me know why errorsFromSend
channel does not get the exception? Here is the trimmed log, as per Gary's suggestion:
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.annotation.internalConfigurationAnnotationProcessor'
[main ] DEBUG Identified candidate component class: file [C:\stash\sample\bin\main\sample\SampleApplication.class]
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.condition.BeanTypeRegistry'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.AutoConfigurationPackages'
[main ] DEBUG @EnableAutoConfiguration was declared on a class in the package 'sample'. Automatic @Repository and @Entity scanning is enabled.
[main ] DEBUG Identified candidate component class: file [C:\stash\sample\bin\main\sample\Send.class]
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.test.context.ImportsContextCustomizer$ImportsCleanupPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'IntegrationConfigurationBeanFactoryPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'propertySourcesPlaceholderConfigurer'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.test.mock.mockito.MockitoPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.event.internalEventListenerProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'DefaultConfiguringBeanFactoryPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationBeanFactoryMetadata'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.event.internalEventListenerFactory'
[main ] INFO No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
[main ] INFO No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
[main ] DEBUG SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
[main ] INFO No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.annotation.internalAutowiredAnnotationProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.annotation.internalCommonAnnotationProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.test.mock.mockito.MockitoPostProcessor$SpyPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.aop.config.internalAutoProxyCreator'
[main ] DEBUG Creating shared instance of singleton bean 'persistenceExceptionTranslationPostProcessor'
[main ] DEBUG Autowiring by type from bean name 'persistenceExceptionTranslationPostProcessor' via factory method to bean named 'environment'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.internalMessagingAnnotationPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'integrationDisposableAutoCreatedBeans'
[main ] INFO Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[main ] DEBUG Creating shared instance of singleton bean 'integrationManagementConfigurer'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.config.IntegrationManagementConfiguration'
[main ] INFO Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$7474d083] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.dsl.IntegrationFlowDefinition$ReplyProducerCleaner'
[main ] DEBUG Creating shared instance of singleton bean 'globalChannelInterceptorProcessor'
[main ] DEBUG Creating shared instance of singleton bean 'sendWithFlowTestConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'sampleApplication'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.context.defaultPollerMetadata'
[main ] DEBUG Creating shared instance of singleton bean 'receiving'
[main ] DEBUG Creating shared instance of singleton bean 'integrationGlobalProperties'
[main ] DEBUG Creating shared instance of singleton bean 'messageBuilderFactory'
[main ] DEBUG Creating shared instance of singleton bean 'mergedIntegrationGlobalProperties'
[main ] DEBUG Creating shared instance of singleton bean 'datatypeChannelMessageConverter'
[main ] DEBUG Creating shared instance of singleton bean 'sending'
[main ] DEBUG Creating shared instance of singleton bean 'errorsFromSend'
[main ] DEBUG Creating shared instance of singleton bean 'transformer'
[main ] DEBUG Creating shared instance of singleton bean 'executor'
[main ] DEBUG Creating shared instance of singleton bean 'flow'
[main ] DEBUG Autowiring by type from bean name 'flow' via factory method to bean named 'executor'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.dsl.context.IntegrationFlowContext'
[main ] DEBUG Creating shared instance of singleton bean 'flow.bridge#0'
[main ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
[main ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.transformer.MethodInvokingTransformer#0'
[main ] DEBUG Creating shared instance of singleton bean 'flow.transformer#0'
[main ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
[main ] DEBUG Creating shared instance of singleton bean 'flow.channel#0'
[main ] DEBUG Creating shared instance of singleton bean 'flow.p2.SendWithFlowTestConfiguration$$Lambda$123/1338368149#0'
[main ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
[main ] DEBUG Creating shared instance of singleton bean 'channelInitializer'
[main ] DEBUG Creating shared instance of singleton bean '$autoCreateChannelCandidates'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'mbeanExporter'
[main ] DEBUG Creating shared instance of singleton bean 'objectNamingStrategy'
[main ] DEBUG Autowiring by type from bean name 'mbeanExporter' via factory method to bean named 'objectNamingStrategy'
[main ] DEBUG Creating shared instance of singleton bean 'mbeanServer'
[main ] DEBUG Found MBeanServer: com.sun.jmx.mbeanserver.JmxMBeanServer@5778826f
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.aop.AopAutoConfiguration$CglibAutoProxyConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.aop.AopAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.context.ConfigurationPropertiesAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.dao.PersistenceExceptionTranslationAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.info.ProjectInfoAutoConfiguration'
[main ] DEBUG Cannot find '.class' file for class [class org.springframework.boot.autoconfigure.info.ProjectInfoAutoConfiguration$$EnhancerBySpringCGLIB$$457d226d] - unable to determine constructor/method parameter names
[main ] DEBUG Creating shared instance of singleton bean 'spring.info-org.springframework.boot.autoconfigure.info.ProjectInfoProperties'
[main ] DEBUG Autowiring by type from bean name 'org.springframework.boot.autoconfigure.info.ProjectInfoAutoConfiguration' via constructor to bean named 'spring.info-org.springframework.boot.autoconfigure.info.ProjectInfoProperties'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationComponentScanConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'send'
[main ] DEBUG Creating shared instance of singleton bean 'integrationEvaluationContext'
[main ] DEBUG Creating shared instance of singleton bean 'jsonPath'
[main ] DEBUG Creating shared instance of singleton bean 'taskScheduler'
[main ] INFO Initializing ExecutorService 'taskScheduler'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationManagementConfiguration$EnableIntegrationManagementConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationManagementConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'spring.integration-org.springframework.boot.autoconfigure.integration.IntegrationProperties'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.reactor.core.ReactorCoreAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'spring.reactor-org.springframework.boot.autoconfigure.reactor.core.ReactorCoreProperties'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration'
[main ] DEBUG Cannot find '.class' file for class [class org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration$$EnhancerBySpringCGLIB$$9522ae2a] - unable to determine constructor/method parameter names
[main ] DEBUG Creating shared instance of singleton bean 'spring.task.execution-org.springframework.boot.autoconfigure.task.TaskExecutionProperties'
[main ] DEBUG Autowiring by type from bean name 'org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration' via constructor to bean named 'spring.task.execution-org.springframework.boot.autoconfigure.task.TaskExecutionProperties'
[main ] DEBUG Creating shared instance of singleton bean 'taskExecutorBuilder'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'taskSchedulerBuilder'
[main ] DEBUG Creating shared instance of singleton bean 'spring.task.scheduling-org.springframework.boot.autoconfigure.task.TaskSchedulingProperties'
[main ] DEBUG Autowiring by type from bean name 'taskSchedulerBuilder' via factory method to bean named 'spring.task.scheduling-org.springframework.boot.autoconfigure.task.TaskSchedulingProperties'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration'
[main ] DEBUG Creating shared instance of singleton bean 'platformTransactionManagerCustomizers'
[main ] DEBUG Creating shared instance of singleton bean 'spring.transaction-org.springframework.boot.autoconfigure.transaction.TransactionProperties'
[main ] DEBUG Creating shared instance of singleton bean 'nullChannel'
[main ] DEBUG Creating shared instance of singleton bean 'errorChannel'
[main ] DEBUG Creating shared instance of singleton bean '_org.springframework.integration.errorLogger.handler'
[main ] DEBUG Creating shared instance of singleton bean '_org.springframework.integration.errorLogger'
[main ] DEBUG Creating shared instance of singleton bean 'integrationSimpleEvaluationContext'
[main ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
[main ] DEBUG Creating shared instance of singleton bean 'integrationLifecycleRoleController'
[main ] DEBUG Creating shared instance of singleton bean 'integrationHeaderChannelRegistry'
[main ] DEBUG Creating shared instance of singleton bean 'integrationArgumentResolverMessageConverter'
[main ] DEBUG Creating shared instance of singleton bean 'integrationArgumentResolvers'
[main ] DEBUG Creating shared instance of singleton bean 'integrationListArgumentResolvers'
[main ] DEBUG
Spring Integration global properties:
spring.integration.endpoints.noAutoStartup=
spring.integration.taskScheduler.poolSize=10
spring.integration.channels.maxUnicastSubscribers=0x7fffffff
spring.integration.channels.autoCreate=true
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.readOnly.headers=
spring.integration.messagingTemplate.throwExceptionOnLateReply=false
[main ] DEBUG Registering beans for JMX exposure on startup
[main ] DEBUG Autodetecting user-defined JMX MBeans
[main ] DEBUG No global channel interceptors.
[main ] DEBUG Starting beans in phase -2147483648
[main ] INFO Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
[main ] INFO Channel 'org.springframework.context.support.GenericApplicationContext@530612ba.errorChannel' has 1 subscriber(s).
[main ] INFO started _org.springframework.integration.errorLogger
[main ] DEBUG Successfully started bean '_org.springframework.integration.errorLogger'
[main ] INFO Adding {transformer} as a subscriber to the 'receiving' channel
[main ] INFO Channel 'receiving' has 1 subscriber(s).
[main ] INFO started flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
[main ] DEBUG Successfully started bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
[main ] INFO Channel 'org.springframework.context.support.GenericApplicationContext@530612ba.flow.channel#0' has 1 subscriber(s).
[main ] INFO started flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2
[main ] DEBUG Successfully started bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
[main ] DEBUG Starting beans in phase 0
[main ] INFO started send
[main ] INFO started send
[main ] DEBUG Successfully started bean 'send'
[main ] DEBUG Starting beans in phase 1073741823
[main ] INFO started flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
[main ] DEBUG Successfully started bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
[main ] DEBUG preSend on channel 'sending', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main ] DEBUG postSend (sent=true) on channel 'sending', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[cheduler-1] DEBUG postReceive on channel 'sending', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[cheduler-1] DEBUG Poll resulted in Message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main ] DEBUG preSend on channel 'sending', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG flow.bridge#0 received message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main ] DEBUG postSend (sent=true) on channel 'sending', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[main ] DEBUG preSend on channel 'sending', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-1] DEBUG preSend on channel 'receiving', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main ] DEBUG postSend (sent=true) on channel 'sending', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-1] DEBUG postSend (sent=true) on channel 'receiving', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[1-thread-1] DEBUG flow.transformer#0 received message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[cheduler-1] DEBUG postReceive on channel 'sending', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG Poll resulted in Message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG flow.bridge#0 received message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG preSend on channel 'receiving', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG postSend (sent=true) on channel 'receiving', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[1-thread-2] DEBUG flow.transformer#0 received message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-2] DEBUG postReceive on channel 'sending', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG Poll resulted in Message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG flow.bridge#0 received message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG preSend on channel 'receiving', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG postSend (sent=true) on channel 'receiving', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[1-thread-2] DEBUG 123
[1-thread-2] DEBUG preSend on channel 'flow.channel#0', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=821dad29-8be4-5a5f-7369-d1393fb5790a, timestamp=1544719739319}]
>> 123
[1-thread-2] DEBUG postSend (sent=true) on channel 'flow.channel#0', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=821dad29-8be4-5a5f-7369-d1393fb5790a, timestamp=1544719739319}]
[1-thread-2] DEBUG flow.transformer#0 received message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[1-thread-2] DEBUG ABC
>> ABC
[1-thread-2] DEBUG preSend on channel 'flow.channel#0', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=9aca3eed-c695-2054-ff18-5b1c62f60158, timestamp=1544719739319}]
[1-thread-2] DEBUG postSend (sent=true) on channel 'flow.channel#0', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=9aca3eed-c695-2054-ff18-5b1c62f60158, timestamp=1544719739319}]
Exception in thread "pool-1-thread-1"
org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException: nested exception is java.lang.RuntimeException: xyz, failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}], failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:114)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:53)
at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:114)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.RuntimeException: xyz, failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:113)
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:108)
... 9 more
Caused by: java.lang.RuntimeException: xyz
at sample.Transformer.transform(Transformer.java:12)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1087)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:473)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:317)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:104)
... 11 more
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.RuntimeException: xyz, failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=38b35bcb-61f1-7be4-c523-499d8fa13db7, timestamp=1544808482872}]
Since the failed message has the error channel header; it should have been routed properly by the error handler in the poller.
If you can post the complete project (perhaps in the Gist, or the complete project in GitHub), we can take a look.
EDIT
It looks like the DSL has a bug, the executor channel is not initialized properly so your task executor is not wrapped in an ErrorHandlingTaskExecutor
.
Here is a work-around:
@Bean
public IntegrationFlow flow(@Qualifier("executor") final TaskExecutor executor)
{
return IntegrationFlows.from(sending())
.channel(execChannel(executor))
.transform(transformer())
.handle(m -> System.out.println(">> " + m.getPayload()))
.get();
}
@Bean
public ExecutorChannel execChannel(final TaskExecutor executor) {
return MessageChannels.executor("receiving", executor).get();
}
(define the channel as a @Bean
).
Or, just make sending()
an executor channel to begin with.
EDIT2
Actually, it's not really a fundamental bug; you have 2 channels called receiving
and this has confused the DSL because it found an existing bean with that name so didn't initialize the Executor channel.
User contributions licensed under CC BY-SA 3.0