Tuesday, March 28, 2017

Integration-Testing JMS MessageListeners with Spring

The Goal

  • Send a JMS Message into a  JMS queue
  • Verify that the expected work has been performed

Why is this difficult?

  • JMS MessageListeners are asynchronous
  • When testing with a queue on a remote server there will almost always be other consumers stealing your messages
  • Between sending the JMS Message into the queue and asserting the result the testing code would have to wait an unknown amount of time which usually results in code like the following:

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = {Config.class, BadTestConfig.class})
@DirtiesContext
public class BadTest {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    private RemoteClient remoteClient; // mocked in BadTestConfig

    @Test
    @SneakyThrows
    public void testThatMySlowServiceIsCalledTheBadWay() {
        jmsTemplate.send(session -> session.createTextMessage("hello!"));
        Thread.sleep(5000L);
        verify(remoteClient, times(1)).remoteCall("hello!");
    }
}

Now, what's wrong with that?

Well, if there is just one test like this in your whole codebase you might not mind waiting those few extra seconds if the MessageListener is actually faster than 5 seconds. But how about if you have 15 tests like this? Or 200?

Also, if the code inside the MessageListener gets refactored and now returns in 15ms your test will still sit there for 5 seconds doing absolutely nothing, making the build needlessly slow.

Another drawback is, that codebases are usually built, tested and integrated on build servers such as jenkins or bamboo. If the build server is very busy or the database is slow, your build might fail unnecessarily and seemingly randomly because of too much load on one of those systems.

Ok, so how to do it better?

We will use three concepts to help us build more robust and faster tests:
  • AOP
  • CountDownLatch
  • A local JMS Broker

First, we implement an AOP Aspect (i will call it JmsListenerJoiner), that will wrap around any JMS MessageListener. It will use a CountDownLatch inside the Aspect to allow callers to block until the expected number of invocations has occurred. In our example, this number is hard-coded to 1, but it can easily be exposed to allow customization.

Then we will use a local ActiveMQ BrokerService, so that we don't have to write to a remote JMS server where other clients could steal our JMS Messages.

Implementing the AOP Aspect

Caveats

JmsListenerJoiner uses state to achieve its functionality, so if you want to use it on multiple JMS Listeners you need to tweak the @Around expression. Also, since it uses dynamic subclassing it can only be used to wrap non-final JMS Listener classes.

"Talk is cheap, show me the code!" i hear you say, so here we go:

Implementation

@Aspect
@NotThreadSafe
public class JmsListenerJoiner implements Ordered {
    private CountDownLatch countDown;

    public JmsListenerJoiner() {
        reinitialize(1);
    }

    @Around("!within(is(FinalType)) && @annotation(org.springframework.jms.annotation.JmsListener)")
    public void decorateAnnotatedListener(final ProceedingJoinPoint pjp) {
        doCountdown(pjp);
    }

    @Around("!within(is(FinalType)) && within(javax.jms.MessageListener+) execution(* onMessage(..)) && args(javax.jms.Message)")
    public void decorateMessageListener(final ProceedingJoinPoint pjp) {
        doCountdown(pjp);
    }

    @SneakyThrows
    private void doCountdown(final ProceedingJoinPoint pjp) {
        pjp.proceed();
        countDown.countDown();
    }

    @SneakyThrows
    public void await() {
        countDown.await();
        reinitialize(1);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

    private void reinitialize(final int count) {
        countDown = new CountDownLatch(count);
    }
}

Using an ActiveMQ BrokerService instance for testing

See below the JMS Spring Configuration with explanatory comments:
@Slf4j
@Configuration
public class Config {

    // this is the BrokerService. it is only started when the INTEGRATION_TEST profile is active.
    // In production environments this is not needed and there will be a separate server providing the Broker Service.
    @SneakyThrows
    @Bean(name = Conf.BROKER_NAME)
    @Profile(Profiles.INTEGRATION_TEST)
    public BrokerService brokerService() {
        final BrokerService broker = new BrokerService();
        broker.setPersistent(false);
        broker.addConnector(Conf.activeMqUri());
        return broker;
    }

    // we use a separate ConnectionFactory for testing because the brokerUrl needs to 
    // point at our local MQ broker and it needs to depend on the BrokerService, so that it is started
    // before the ConnectionFactory tries to use it
    @Bean
    @DependsOn(Conf.BROKER_NAME)
    @Profile(Profiles.INTEGRATION_TEST)
    public ConnectionFactory testConnectionFactory() {
        return new ActiveMQConnectionFactory(Conf.activeMqUri());
    }

    // this is the production ConnectionFactory. note the "!" before the Profile name.
    // usually the broker url passed in the constructor would be set to your production JMS Broker's uri.
    @Bean
    @Profile("!" + Profiles.INTEGRATION_TEST)
    public ConnectionFactory productionConnectionFactory() {
        return new ActiveMQConnectionFactory(Conf.activeMqUri());
    }

    // more code omitted for brevity.
}

And the TestConfig that will enable the JmsListenerJoiner and replace some RemoteClient that we will be testing for with a mock.
@Configuration
@EnableAspectJAutoProxy
public class GoodTestConfig {

    @Bean
    public JmsListenerJoiner jmsListenerJoiner() {
        return new JmsListenerJoiner();
    }

    @Bean
    @Primary
    public RemoteClient testRemoteClient() {
        return mock(RemoteClient.class);
    }
}

Putting it together

With the JmsListenerJoiner Aspect in place we can just inject it into the test and call its await() method.

As soon as the JMS Message is sent to the JMS Queue
  • The actual JMS Listener will pick up the Message (that is, the proxy class generated by the Aspect)
  • The call to await() will block until the MessageListener's execution (one execution) is finished
  • The test will only ever take as long as the MessageListener's execution takes or die with an exception after 30 seconds
  • @RunWith(SpringRunner.class)
    @SpringBootTest
    @ActiveProfiles(Config.Profiles.INTEGRATION_TEST)
    @ContextConfiguration(classes = {Config.class, GoodTestConfig.class})
    @DirtiesContext
    public class GoodTest {
        @Autowired
        private JmsTemplate jmsTemplate;
        @Autowired
        private JmsListenerJoiner jmsListenerJoiner;
        @Autowired
        private RemoteClient remoteClient;  // mocked in GoodTestConfig
    
        @Test(timeout = 30000L) // this is just a safety measure in case the MessageListener never gets called.
        @SneakyThrows
        public void testThatMySlowServiceIsCalledTheGoodWay() {
            jmsTemplate.send(session -> session.createTextMessage("hello!"));
            jmsListenerJoiner.await();
            verify(remoteClient, times(1)).remoteCall("hello!");
        }
    }
    

    Summary and source code repository

    This approach has some very convincing advantages over the first example in this article:
    • The test will never take longer than the absolute minimum time the listener's execution takes
    • When you make the listener's execution faster you make the test run faster
    • This approach is much less likely to fail if the build server, a remote service or a database are under load and react unexpectedly slowly
    • JmsListenerJoiner can be customized to wait for any number of consumed messages
    • With a local BrokerService your test does not depend on an outside JMS Server where other clients could steal your Messages
    • Despite being relatively easy to implement, the solution does not interfere with production code through the awesomeness that is AOP


    Since a picture says more than a thousand words:



    Take a peek at our GitHub repository to check out the full runnable example used in this blog post.