I was talking recently with an MQ customer who said they were considering a solution that used Spring Boot for an MQ application that would move messages reliably from one queue manager to another, doing some processing on the way. “Can we do that?” they asked. “Of course” was my reply. But naturally I had to then try it out myself.
To implement the idea, I had to dig into two aspects. I’d consider these as reasonably advanced features of Spring Boot. One part was being able to configure more than one queue manager’s connection in the resource properties file. The other was working with global JTA/XA transactions.
The sample programs
What I’ve ended up with is a pair of sample programs that both implement the first part, but have different flow patterns, and hence different ways of managing the XA transactions. The two samples are in the s4.jms3 and s4a.jms3 subdirectories of the repository. I’ll refer to them as s4
and s4a
in places, for simplicity.
Multiple Connection Definitions
The MQ Spring Boot starter, like other starters, makes it easy to configure the managed resource. In this case, connections to queue managers. Attributes like the channel name can be set externally through environment variables, properties files, yaml files etc. The Spring Boot libraries convert the different external formats into a common set of Java properties that can be used, in our case, to build a ConnectionFactory (CF).
But that simplicity comes from being able to make certain assumptions such as there only being a single definition in the properties. You set various things under the ibm.mq
tree prefix, such as a userid, and Spring drives the auto-configuration to create the necessary Beans. No user-written code is necessary for this creation. But if you want to work with more than one queue manager from the same configuration file, in the same process, then it’s not quite so simple.
Instead, we have to tell Spring a bit more about the ConnectionFactories we need.
Multiple CF classes
In these samples I know that the applications are going to connect to two different queue managers. So I’ve written two classes that nominate the configuration attributes to be pulled in.
Looking at QM1Config.java, we can see:
@Bean @ConfigurationProperties("qm1") MQConfigurationProperties qm1ConfigProperties() { return new MQConfigurationProperties(); }
That makes Spring parse the MQConfiguration attributes that are understood by the MQ Boot Starter to create a CF, but to use qm1
instead of ibm.mq
as the starting point. In the application.yml
file you can see the required details to reach this queue manager:
qm1: queueManager: "QM1" channel: "SYSTEM.DEF.SVRCONN" connName: "localhost(1414)"
The other part of the QM1Config class shows how to get a CF that is based on these properties. The two samples do slightly different things with the CF creation because of the different transaction flow model. But that will show up in the next section.
The QM2Config class is essentially the same, but referring to the other prefix for attributes.
When the samples run with DEBUG logging, we can see both sets of configuration being used:
c.i.m.s.b.MQConfigurationProperties - constructor c.i.m.s.b.MQConnectionFactoryFactory - constructor c.i.m.s.b.MQConnectionFactoryFactory - createConnectionFactory for class MQXAConnectionFactory c.i.m.s.b.MQConnectionFactoryFactory - configuring TLS Store system properties c.i.m.s.b.MQConnectionFactoryFactory - createConnectionFactoryInstance for class MQXAConnectionFactory c.i.m.s.b.MQConnectionFactoryFactory - configureConnectionFactory c.i.m.s.b.MQConfigurationProperties - queueManager : QM1 c.i.m.s.b.MQConfigurationProperties - applicationName : null .... c.i.m.s.b.MQConfigurationProperties - constructor c.i.m.s.b.MQConnectionFactoryFactory - constructor c.i.m.s.b.MQConnectionFactoryFactory - createConnectionFactory for class MQXAConnectionFactory c.i.m.s.b.MQConnectionFactoryFactory - configuring TLS Store system properties c.i.m.s.b.MQConnectionFactoryFactory - createConnectionFactoryInstance for class MQXAConnectionFactory c.i.m.s.b.MQConnectionFactoryFactory - configureConnectionFactory c.i.m.s.b.MQConfigurationProperties - queueManager : Q2 c.i.m.s.b.MQConfigurationProperties - applicationName : null ....
IDE Validation
One thing I noticed when editing the programs and configuration file in Eclipse, was a set of warnings. The Eclipse editors know about valid attributes in the Spring environment, and said that “qm1” and “qm2” were not valid elements in the application.yml file. We just have to live with those warnings or tell Eclipse to ignore them for this project.
Adding more queue managers to the application flow
There are approaches to permit definition of an arbitrary set of queue manager configurations, using something like a list structure in the YAML file. That would mean we don’t have to create an explicit number of classes to match the number of queue managers for this application. But I felt that was going too far, and even more advanced than was needed for these samples.
Global Transactions
The basic flow in both of these samples is to receive a message from a queue on one queue manager, do some processing, and then send it to another queue on a different queue manager. It must be reliable – capable of dealing with errors at any point. We don’t want to lose or duplicate messages during that flow. And that implies the use of a global transaction – also known as two-phase, XA or (as we’re in a Java world) a JTA transaction.
A JEE environment, such as the Liberty application server, includes a JTA coordinator capability. In a standalone Spring environment, we have to include a coordinator as part of the application. There are several possible choices, but I picked Atomikos as the coordinator. It’s one system I’ve previously seen people wanting to use with MQ, so it seemed a good selection.
Atomikos
Adding Atomikos is just a matter of adding a Spring Boot starter component to the build definition. This is how it’s done in build.gradle:
dependencies { implementation(group:"com.ibm.mq", name:"mq-jms-spring-boot-starter", version:starterVersion) implementation(group:"com.atomikos", name:"transactions-spring-boot3-starter", version:"6.0.0") }
Note that this Atomikos library is not part of the Spring-provided components (it used to be) but comes direct from the company that also offers commercial offerings around the technology. For our purposes, we can use the “free” package in this sample. Though it will always print out a message about being unregistered unless you follow the instructions to register it.
Two samples
The two sample programs have different flows.
The first one, s4, is based around having a loop that does an MQGET(wait) followed by an MQPUT. Each loop starts a global transaction and then either commits or rolls back. Apart from using Spring to create the MQXAConnectionFactory objects, it basically uses standard JMS operations with none of the additional Spring simplifications. I chose not to use some of the Atomikos-provided Spring analogue capabilities such as its own JmsTemplate class. It’s easier to understand and control what’s happening with the basic JMS operations.
The second sample, s4a, uses the Spring JmsListener pattern: each message causes an asynchronous method invocation. That method then copies the message to the other queue manager. In this pattern, we do not explicitly create or resolve transactions; successful completion of the method causes a commit. Throwing an exception in the callback automatically causes the rollback.
In both cases, the samples exit after doing a rollback. That’s so we don’t get into an infinite loop of reprocessing the same message. In a real application, you would have a better approach to dealing with that – perhaps using the MQ JMS client’s capability of automatically sending messages to another queue. That uses the BOTHRESH and BOQNAME attributes of the queue you are reading from.
For the transactions to work, the Atomikos coordinator has to know about the CFs. We do that with the AtomikosConnectionFactoryBean.setXaConnectionFactory
call. The Spring Boot startup automatically initialises the overall Atomikos framework. The CFs themselves are built within the corresponding QMxConfig class. For s4a, we also need aJmsListenerContainerFactory
Bean for one of the queue manager connections.
At that point, we have everything ready.
Application testing
The RUNME.sh
scripts in each sample directory build and execute the programs. In each case, what we want to see at the end is one message on the original input queue with a non-zero backout count, and one message on the other queue manager’s queue. The output from the s4a sample looks like this:
======================================== MQ JMS XA Transaction Sample started. ======================================== Received message: COMMIT Message for Spring Fri 14 Jun 09:30:04 BST 2024 Executing: Commit Received message: ROLLBACK Message for Spring Fri 14 Jun 09:30:05 BST 2024 Executing: Rollback In Error Handler: [1] Listener method 'public void sample4a.Listener.receiveMessage(jakarta.jms.Session,jakarta.jms.TextMessage) throws jakarta.jms.JMSException,java.lang.RuntimeException' threw exception [2] Doing XA rollback Received message: ROLLBACK Message for Spring Fri 14 Jun 09:30:05 BST 2024 Exiting because delivery count indicates repeated delivery. Count: 2 Executing: Rollback In Error Handler: [1] Listener method 'public void sample4a.Listener.receiveMessage(jakarta.jms.Session,jakarta.jms.TextMessage) throws jakarta.jms.JMSException,java.lang.RuntimeException' threw exception [2] Doing XA rollback Stopping listener Done.
The final stage of the output can vary depending on timing and the precise state of the flow (we might see the rolledback message more than once). But it does eventually exit cleanly. And checking that messages have ended up how we want:
$ cat checkOutput echo "=== QM1 ===" amqsbcg DEV.QUEUE.1 QM1 | grep messages | grep -v No echo "=== QM2 ===" amqsbcg DEV.QUEUE.1 QM2 | grep messages | grep -v No $ ./checkOutput === QM1 === 1 messages browsed. === QM2 === 1 messages browsed.
Fully testing that transaction coordination works can be difficult without being able to do things like interrupt the coordinator at interesting places, such as half-way through the PREPARE sequence. I know that tests run for the MQ Java client and the queue manager to ensure reliability. So that’s not something I am trying to exercise here. But I do want to verify that the XA flows happen at all, just to ensure the Atomikos framework has properly registered the CFs.
The easiest way to do that is to take take a queue manager trace and see if XA function appear. You don’t really need to know the MQ internals to get that information from the trace output. Running grep -i xaopen /var/mqm/trace/*
gives the structure of the function names to look for. And so I search for zlaXA
functions:
---{ zlaXAOpen ---} zlaXAOpen rc=OK FunctionTime=59 ---{ zlaXAStart ---} zlaXAStart rc=OK FunctionTime=1047 ---{ zlaXAEnd ---} zlaXAEnd rc=OK FunctionTime=284 ---{ zlaXAPrepare ---} zlaXAPrepare rc=OK FunctionTime=335235 ---{ zlaXACommit ---} zlaXACommit rc=OK FunctionTime=109165 ---{ zlaXAStart ---} zlaXAStart rc=OK FunctionTime=756 ---{ zlaXAEnd ---} zlaXAEnd rc=OK FunctionTime=388 ---{ zlaXARollback ---} zlaXARollback rc=OK FunctionTime=351397
I can see here that the coordinator has indeed managed a two-phase transaction to this queue manager. The right operations have run in the right sequence to first commit a transaction and then roll back a second operation.
Summary
As I find so often with Spring applications, it can be hard to find documentation or samples on how to use some of these advanced features. Hopefully this will help.
One thought on “MQ Spring Boot: Advanced Configuration and Transactions”