Spring Integration: testing a flow that throws an exception

I'm trying to test the flow, specifically the case where an exception is thrown, but for some reason nothing arrives on the errorsFromSend channel.

Here is the gateway:

@MessagingGateway
public interface Send
{
  @Gateway(requestChannel = "sending", 
           headers = @GatewayHeader(name = "errorChannel", expression = "@errorsFromSend"))
  void send(final String s);
}

Here is the transformer, which throws an exception for the input "xyz":

public class Transformer {
  public String transform(final String s) {
    if(s.equals("xyz")) {
      throw new RuntimeException("xyz");
    }
    log.debug(s);

    return s;
  }

  private final Logger log = LoggerFactory.getLogger(this.getClass().getName());
}

And here is the test, together with its context configuration:

@RunWith(SpringRunner.class)
@EnableIntegration
@ComponentScan(basePackageClasses= {sample.Send.class})
@ContextConfiguration(classes = {SendWithFlowTestConfiguration.class})
public class SendWithFlowTest {
  @Test
  public void testReceiving() throws Exception {
    // arrange
    final String payload1 = "123";
    final String payload2 = "ABC";
    final String payload3 = "xyz";
    // act and assert
    send.send(payload3);
    send.send(payload1);
    send.send(payload2);

    Message<?> fromErrorsFromSend = errorsFromSend.receive(100); // returns null!
    Assertions.assertThat(fromErrorsFromSend.getPayload()).isEqualTo(payload3);
    fromErrorsFromSend = errorsFromSend.receive(0);
    Assertions.assertThat(fromErrorsFromSend).isEqualTo(null);
    // verify
  }

  @Autowired
  private QueueChannel errorsFromSend;

  @Autowired
  private Send send;
}

@Configuration
@EnableIntegration
@IntegrationComponentScan
class SendWithFlowTestConfiguration {
  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {
    return Pollers.fixedRate(1)
                  .maxMessagesPerPoll(1)
                  .get();
  }

  @Bean
  public DirectChannel receiving() {
    return new DirectChannel();
  }

  @Bean
  public QueueChannel sending() {
    return new QueueChannel();
  }

  @Bean
  public QueueChannel errorsFromSend() {
    return new QueueChannel();
  }

  @Bean
  public Transformer transformer() {
    return new Transformer();
  }

  @Bean
  @Value("2")
  public TaskExecutor executor(final int poolSize) {
    return new TaskExecutorAdapter(Executors.newFixedThreadPool(poolSize));
  }

  @Bean
  public IntegrationFlow flow(@Qualifier("executor") final TaskExecutor executor) {
    return IntegrationFlows.from(sending())
                           .channel(MessageChannels.executor("receiving", executor).get())
                           .transform(transformer())
                           .handle(m -> System.out.println(">> " + m.getPayload()))
                           .get();
  }
}

Can someone tell me why the errorsFromSend channel does not receive the exception? Here is the trimmed log, as per Gary's suggestion:

[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.annotation.internalConfigurationAnnotationProcessor'
[main      ] DEBUG Identified candidate component class: file [C:\stash\sample\bin\main\sample\SampleApplication.class]
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.condition.BeanTypeRegistry'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.AutoConfigurationPackages'
[main      ] DEBUG @EnableAutoConfiguration was declared on a class in the package 'sample'. Automatic @Repository and @Entity scanning is enabled.
[main      ] DEBUG Identified candidate component class: file [C:\stash\sample\bin\main\sample\Send.class]
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.test.context.ImportsContextCustomizer$ImportsCleanupPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'IntegrationConfigurationBeanFactoryPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'propertySourcesPlaceholderConfigurer'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.test.mock.mockito.MockitoPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.event.internalEventListenerProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'DefaultConfiguringBeanFactoryPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationBeanFactoryMetadata'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.event.internalEventListenerFactory'
[main      ] INFO  No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
[main      ] INFO  No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
[main      ] DEBUG SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
[main      ] INFO  No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.annotation.internalAutowiredAnnotationProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.context.annotation.internalCommonAnnotationProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.test.mock.mockito.MockitoPostProcessor$SpyPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.aop.config.internalAutoProxyCreator'
[main      ] DEBUG Creating shared instance of singleton bean 'persistenceExceptionTranslationPostProcessor'
[main      ] DEBUG Autowiring by type from bean name 'persistenceExceptionTranslationPostProcessor' via factory method to bean named 'environment'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.internalMessagingAnnotationPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationDisposableAutoCreatedBeans'
[main      ] INFO  Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[main      ] DEBUG Creating shared instance of singleton bean 'integrationManagementConfigurer'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.config.IntegrationManagementConfiguration'
[main      ] INFO  Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$7474d083] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.dsl.IntegrationFlowDefinition$ReplyProducerCleaner'
[main      ] DEBUG Creating shared instance of singleton bean 'globalChannelInterceptorProcessor'
[main      ] DEBUG Creating shared instance of singleton bean 'sendWithFlowTestConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'sampleApplication'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.context.defaultPollerMetadata'
[main      ] DEBUG Creating shared instance of singleton bean 'receiving'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationGlobalProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'messageBuilderFactory'
[main      ] DEBUG Creating shared instance of singleton bean 'mergedIntegrationGlobalProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'datatypeChannelMessageConverter'
[main      ] DEBUG Creating shared instance of singleton bean 'sending'
[main      ] DEBUG Creating shared instance of singleton bean 'errorsFromSend'
[main      ] DEBUG Creating shared instance of singleton bean 'transformer'
[main      ] DEBUG Creating shared instance of singleton bean 'executor'
[main      ] DEBUG Creating shared instance of singleton bean 'flow'
[main      ] DEBUG Autowiring by type from bean name 'flow' via factory method to bean named 'executor'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.dsl.context.IntegrationFlowContext'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.bridge#0'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.transformer.MethodInvokingTransformer#0'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.transformer#0'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.channel#0'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.p2.SendWithFlowTestConfiguration$$Lambda$123/1338368149#0'
[main      ] DEBUG Creating shared instance of singleton bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
[main      ] DEBUG Creating shared instance of singleton bean 'channelInitializer'
[main      ] DEBUG Creating shared instance of singleton bean '$autoCreateChannelCandidates'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'mbeanExporter'
[main      ] DEBUG Creating shared instance of singleton bean 'objectNamingStrategy'
[main      ] DEBUG Autowiring by type from bean name 'mbeanExporter' via factory method to bean named 'objectNamingStrategy'
[main      ] DEBUG Creating shared instance of singleton bean 'mbeanServer'
[main      ] DEBUG Found MBeanServer: com.sun.jmx.mbeanserver.JmxMBeanServer@5778826f
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.aop.AopAutoConfiguration$CglibAutoProxyConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.aop.AopAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.context.ConfigurationPropertiesAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.dao.PersistenceExceptionTranslationAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.info.ProjectInfoAutoConfiguration'
[main      ] DEBUG Cannot find '.class' file for class [class org.springframework.boot.autoconfigure.info.ProjectInfoAutoConfiguration$$EnhancerBySpringCGLIB$$457d226d] - unable to determine constructor/method parameter names
[main      ] DEBUG Creating shared instance of singleton bean 'spring.info-org.springframework.boot.autoconfigure.info.ProjectInfoProperties'
[main      ] DEBUG Autowiring by type from bean name 'org.springframework.boot.autoconfigure.info.ProjectInfoAutoConfiguration' via constructor to bean named 'spring.info-org.springframework.boot.autoconfigure.info.ProjectInfoProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationComponentScanConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'send'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationEvaluationContext'
[main      ] DEBUG Creating shared instance of singleton bean 'jsonPath'
[main      ] DEBUG Creating shared instance of singleton bean 'taskScheduler'
[main      ] INFO  Initializing ExecutorService 'taskScheduler'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationManagementConfiguration$EnableIntegrationManagementConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationManagementConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration$IntegrationConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'spring.integration-org.springframework.boot.autoconfigure.integration.IntegrationProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.reactor.core.ReactorCoreAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'spring.reactor-org.springframework.boot.autoconfigure.reactor.core.ReactorCoreProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration'
[main      ] DEBUG Cannot find '.class' file for class [class org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration$$EnhancerBySpringCGLIB$$9522ae2a] - unable to determine constructor/method parameter names
[main      ] DEBUG Creating shared instance of singleton bean 'spring.task.execution-org.springframework.boot.autoconfigure.task.TaskExecutionProperties'
[main      ] DEBUG Autowiring by type from bean name 'org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration' via constructor to bean named 'spring.task.execution-org.springframework.boot.autoconfigure.task.TaskExecutionProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'taskExecutorBuilder'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'taskSchedulerBuilder'
[main      ] DEBUG Creating shared instance of singleton bean 'spring.task.scheduling-org.springframework.boot.autoconfigure.task.TaskSchedulingProperties'
[main      ] DEBUG Autowiring by type from bean name 'taskSchedulerBuilder' via factory method to bean named 'spring.task.scheduling-org.springframework.boot.autoconfigure.task.TaskSchedulingProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration'
[main      ] DEBUG Creating shared instance of singleton bean 'platformTransactionManagerCustomizers'
[main      ] DEBUG Creating shared instance of singleton bean 'spring.transaction-org.springframework.boot.autoconfigure.transaction.TransactionProperties'
[main      ] DEBUG Creating shared instance of singleton bean 'nullChannel'
[main      ] DEBUG Creating shared instance of singleton bean 'errorChannel'
[main      ] DEBUG Creating shared instance of singleton bean '_org.springframework.integration.errorLogger.handler'
[main      ] DEBUG Creating shared instance of singleton bean '_org.springframework.integration.errorLogger'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationSimpleEvaluationContext'
[main      ] DEBUG Creating shared instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationLifecycleRoleController'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationHeaderChannelRegistry'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationArgumentResolverMessageConverter'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationArgumentResolvers'
[main      ] DEBUG Creating shared instance of singleton bean 'integrationListArgumentResolvers'
[main      ] DEBUG 
Spring Integration global properties:

spring.integration.endpoints.noAutoStartup=
spring.integration.taskScheduler.poolSize=10
spring.integration.channels.maxUnicastSubscribers=0x7fffffff
spring.integration.channels.autoCreate=true
spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.readOnly.headers=
spring.integration.messagingTemplate.throwExceptionOnLateReply=false

[main      ] DEBUG Registering beans for JMX exposure on startup
[main      ] DEBUG Autodetecting user-defined JMX MBeans
[main      ] DEBUG No global channel interceptors.
[main      ] DEBUG Starting beans in phase -2147483648
[main      ] INFO  Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
[main      ] INFO  Channel 'org.springframework.context.support.GenericApplicationContext@530612ba.errorChannel' has 1 subscriber(s).
[main      ] INFO  started _org.springframework.integration.errorLogger
[main      ] DEBUG Successfully started bean '_org.springframework.integration.errorLogger'
[main      ] INFO  Adding {transformer} as a subscriber to the 'receiving' channel
[main      ] INFO  Channel 'receiving' has 1 subscriber(s).
[main      ] INFO  started flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1
[main      ] DEBUG Successfully started bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
[main      ] INFO  Channel 'org.springframework.context.support.GenericApplicationContext@530612ba.flow.channel#0' has 1 subscriber(s).
[main      ] INFO  started flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2
[main      ] DEBUG Successfully started bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
[main      ] DEBUG Starting beans in phase 0
[main      ] INFO  started send
[main      ] INFO  started send
[main      ] DEBUG Successfully started bean 'send'
[main      ] DEBUG Starting beans in phase 1073741823
[main      ] INFO  started flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0
[main      ] DEBUG Successfully started bean 'flow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
[main      ] DEBUG preSend on channel 'sending', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main      ] DEBUG postSend (sent=true) on channel 'sending', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[cheduler-1] DEBUG postReceive on channel 'sending', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[cheduler-1] DEBUG Poll resulted in Message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main      ] DEBUG preSend on channel 'sending', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG flow.bridge#0 received message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main      ] DEBUG postSend (sent=true) on channel 'sending', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[main      ] DEBUG preSend on channel 'sending', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-1] DEBUG preSend on channel 'receiving', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[main      ] DEBUG postSend (sent=true) on channel 'sending', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-1] DEBUG postSend (sent=true) on channel 'receiving', message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[1-thread-1] DEBUG flow.transformer#0 received message: GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
[cheduler-1] DEBUG postReceive on channel 'sending', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG Poll resulted in Message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG flow.bridge#0 received message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG preSend on channel 'receiving', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-1] DEBUG postSend (sent=true) on channel 'receiving', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[1-thread-2] DEBUG flow.transformer#0 received message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=58c31c06-fe11-59cc-df6a-3dd55668998e, timestamp=1544719739312}]
[cheduler-2] DEBUG postReceive on channel 'sending', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG Poll resulted in Message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG flow.bridge#0 received message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG preSend on channel 'receiving', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[cheduler-2] DEBUG postSend (sent=true) on channel 'receiving', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[1-thread-2] DEBUG 123
[1-thread-2] DEBUG preSend on channel 'flow.channel#0', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=821dad29-8be4-5a5f-7369-d1393fb5790a, timestamp=1544719739319}]
>> 123
[1-thread-2] DEBUG postSend (sent=true) on channel 'flow.channel#0', message: GenericMessage [payload=123, headers={errorChannel=errorsFromSend, id=821dad29-8be4-5a5f-7369-d1393fb5790a, timestamp=1544719739319}]
[1-thread-2] DEBUG flow.transformer#0 received message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=b99fbb33-dcc7-b353-8f6a-310fc1b94a47, timestamp=1544719739313}]
[1-thread-2] DEBUG ABC
>> ABC
[1-thread-2] DEBUG preSend on channel 'flow.channel#0', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=9aca3eed-c695-2054-ff18-5b1c62f60158, timestamp=1544719739319}]
[1-thread-2] DEBUG postSend (sent=true) on channel 'flow.channel#0', message: GenericMessage [payload=ABC, headers={errorChannel=errorsFromSend, id=9aca3eed-c695-2054-ff18-5b1c62f60158, timestamp=1544719739319}]
Exception in thread "pool-1-thread-1" 
org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException: nested exception is java.lang.RuntimeException: xyz, failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}], failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:114)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.access$000(UnicastingDispatcher.java:53)
    at org.springframework.integration.dispatcher.UnicastingDispatcher$1.run(UnicastingDispatcher.java:114)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.RuntimeException: xyz, failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=b4856e59-ef18-f9dc-bd24-fcbc4f733d2d, timestamp=1544719739311}]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:113)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:108)
    ... 9 more
Caused by: java.lang.RuntimeException: xyz
    at sample.Transformer.transform(Transformer.java:12)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:170)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1087)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:584)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:473)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:317)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:104)
    ... 11 more
unit-testing
error-handling
spring-integration
asked on Stack Overflow Dec 12, 2018 by xbranko • edited Dec 13, 2018 by xbranko

1 Answer

Caused by: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.RuntimeException: xyz, failedMessage=GenericMessage [payload=xyz, headers={errorChannel=errorsFromSend, id=38b35bcb-61f1-7be4-c523-499d8fa13db7, timestamp=1544808482872}]

Since the failed message has the errorChannel header, it should have been routed properly by the error handler in the poller.

If you can post the complete project (perhaps as a Gist or on GitHub), we can take a look.

EDIT

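It looks like the DSL has a bug: the executor channel is not initialized properly, so your task executor is not wrapped in an ErrorHandlingTaskExecutor. That is why the exception escapes on the pool thread ("Exception in thread "pool-1-thread-1"" in your log) instead of being sent to errorsFromSend.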
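To illustrate why the missing wrapper matters, here is a minimal plain-Java sketch with no Spring involved. The names ErrorRoutingExecutorSketch, ErrorRoutingExecutor and submitFailingTask are hypothetical and this is not Spring's implementation; it only mimics the general idea of an error-handling executor: catch what the task throws and hand it to an error destination (here a queue standing in for the error channel).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ErrorRoutingExecutorSketch {

  /** Hypothetical stand-in for an error-handling executor wrapper. */
  static final class ErrorRoutingExecutor implements Executor {
    private final Executor delegate;
    private final BlockingQueue<Throwable> errors;

    ErrorRoutingExecutor(final Executor delegate, final BlockingQueue<Throwable> errors) {
      this.delegate = delegate;
      this.errors = errors;
    }

    @Override
    public void execute(final Runnable task) {
      // Wrap the task so that a failure is routed to the error queue
      // instead of escaping on the worker thread.
      delegate.execute(() -> {
        try {
          task.run();
        } catch (RuntimeException e) {
          errors.offer(e);
        }
      });
    }
  }

  /**
   * Runs one failing task and returns whatever reached the error queue,
   * or null if nothing did.
   */
  static Throwable submitFailingTask(final boolean wrap) {
    final ExecutorService pool = Executors.newFixedThreadPool(1);
    final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
    final Executor executor = wrap ? new ErrorRoutingExecutor(pool, errors) : pool;
    try {
      executor.execute(() -> { throw new RuntimeException("xyz"); });
      pool.shutdown();                             // already-submitted tasks still run
      pool.awaitTermination(5, TimeUnit.SECONDS);  // wait until the task has finished
      return errors.poll();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(final String[] args) {
    System.out.println("bare pool:    " + submitFailingTask(false));
    System.out.println("wrapped pool: " + submitFailingTask(true));
  }
}
```

With the bare pool the exception is only reported by the thread's default uncaught-exception handler and the error queue stays empty, which is analogous to errorsFromSend.receive(...) returning null. With the wrapper the same exception ends up in the queue.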

Here is a work-around:

  @Bean
  public IntegrationFlow flow(@Qualifier("executor") final TaskExecutor executor)
  {
    return IntegrationFlows.from(sending())
                           .channel(execChannel(executor))
                           .transform(transformer())
                           .handle(m -> System.out.println(">> " + m.getPayload()))
                           .get();
  }

  @Bean
  public ExecutorChannel execChannel(final TaskExecutor executor) {
    return MessageChannels.executor("receiving", executor).get();
  }

(define the channel as a @Bean).

Or, just make sending() an executor channel to begin with.

EDIT2

Actually, it's not really a fundamental bug. You have two channels called receiving: the DirectChannel bean and the executor channel created inline in the flow. This confused the DSL: it found an existing bean with that name, so it didn't register and initialize the executor channel.

answered on Stack Overflow Dec 14, 2018 by Gary Russell • edited Dec 14, 2018 by Gary Russell

User contributions licensed under CC BY-SA 3.0