Spring 提供了一个 JmsTransactionManager 用于对 JMS ConnectionFactory 做事务管理。这将允许 JMS 应用利用 Spring 的事务管理特性。 JmsTransactionManager 在执行本地资源事务管理时将从指定的 ConnectionFactory 绑定一个 ConnectionFactory/Session 这样的配对到线程中。 JmsTemplate 会自动检测这样的事务资源,并对它们进行相应操作。
在 Java EE 环境中, ConnectionFactory 会池化 Connection 和 Session ,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用 Spring 的 SingleConnectionFactory 时所有的事务将公用一个 Connection ,但是每个事务将保留自己独立的 Session 。
JmsTemplate 可以利用 JtaTransactionManager 和能够进行分布式的 JMS ConnectionFactory 处理分布式事务。
在 Spring 整合 JMS 的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其 sessionTransacted 属性为 true ,如:
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="sessionTransacted" value="true"/> </bean>
该属性值默认为 false ,这样 JMS 在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时 JMS 就会对接收到的消息进行回滚,对于 SessionAwareMessageListener 在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。
这里我们可以来做一个这样的测试:我们如上配置监听在 queueDestination 的消息监听容器的 sessionTransacted 属性为 true ,然后把我们前面提到的消息监听器 ConsumerMessageListener 改成这样:
public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage TextMessage textMsg = (TextMessage) message; System.out.println("接收到一个纯文本消息。"); try { System.out.println("消息内容是:" + textMsg.getText()); if (1 == 1) { throw new RuntimeException("Error"); } } catch (JMSException e) { e.printStackTrace(); } } }
我们可以看到在上述代码中我们的 ConsumerMessageListener 在进行消息接收的时候抛出了一个 RuntimeException ,根据我们上面说的,因为我们已经在对应的监听容器上定义了其 sessionTransacted 属性为 true ,所以当这里抛出异常的时候 JMS 将对接收到的消息进行回滚,即下次进行消息接收的时候该消息仍然能够被接收到。为了验证这一点,我们先执行一遍测试代码,往 queueDestination 发送一个文本消息,这个时候 ConsumerMessageListener 在进行接收的时候将会抛出一个 RuntimeException ,已经接收到的纯文本消息将进行回滚;接着我们去掉上面代码中抛出异常的语句,即 ConsumerMessageListener 能够正常的进行消息接收,这个时候我们再运行一次测试代码,往 ConsumerMessageListener 监听的 queueDestination 发送一条消息。如果之前在接手时抛出了异常的那条消息已经回滚了的话,那么这个时候将能够接收到两条消息,控制台将输出接收到的两条消息的内容。具体结果有兴趣的朋友可以自己验证一下。
如果想接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如 DefaultMessageListenerContainer )。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个 JtaTransactionManager ,当然底层的 JMS ConnectionFactory 需要能够支持分布式事务管理,并正确地注册我们的 JtaTransactionManager 。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="consumerMessageListener" /> <property name="transactionManager" ref="jtaTransactionManager"/> </bean> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
当给消息监听容器指定了 transactionManager 时,消息监听容器将忽略 sessionTransacted 的值。
关于使用 JtaTransactionManager 来管理上述分布式事务,我们这里也可以来做一个试验。
首先:往 Spring 配置文件 applicationContext.xml 中添加如下配置:
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="dataSource"/> </bean> <jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/> <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
我们可以看到,在这里我们引入了一个 jndi 数据源,定义了一个 JtaTransactionManager ,定义了 Spring 基于注解的声明式事务管理,定义了一个 Spring 提供的进行 Jdbc 操作的工具类 jdbcTemplate 。
接下来把我们的 ConsumerMessageListener 改为如下形式:
public class ConsumerMessageListener implements MessageListener { @Autowired private TestDao testDao; private int count = 0; public void onMessage(Message message) { //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage TextMessage textMsg = (TextMessage) message; System.out.println(new Date().toLocaleString() + "接收到一个纯文本消息。"); try { String text = textMsg.getText(); System.out.println("消息内容是:" + text); System.out.println("当前count的值是:" + count); testDao.insert(text + count); if (count == 0) { count ++; throw new RuntimeException("Error! 出错啦!"); } } catch (JMSException e) { e.printStackTrace(); } } }
我们可以看到,在 ConsumerMessageListener 中我们定义了一个实例变量 count ,其初始值为 0 ;在 onMessage 里面,我们可以看到我们把接收到的消息内容作为参数调用了 testDao 的 insert 方法;当 count 值为 0 ,也就是进行第一次消息接收的时候会将 count 的值加 1 ,同时抛出一个运行时异常。那么我们这里要测试的就是进行第一次接收的时候 testDao 已经把相关内容插入数据库了,接着在 onMessage 里面抛出了一个异常同时 count 加 1 ,我们预期的结果应该是此时数据库进行回滚,同时 JMS 也回滚,这样 JMS 将继续尝试接收该消息,此时同样会调用 testDao 的 insert 方法将内容插入数据库,再接着 count 已经不为 0 了,所以此时将不再抛出异常, JMS 成功进行消息的接收, testDao 也成功的将消息内容插入到了数据库。要证明这个预期我们除了看数据库中插入的数据外,还可以看控制台的输出,正常情况控制台将输出两次消息接收的内容,且第一次时 count 为 0 ,第二次 count 为 1 。
TestDao 是一个接口,其 TestDaoImpl 对 insert 的方法实现如下:
@Transactional(readOnly=false) public void insert(final String name) { jdbcTemplate.update("insert into test(name) values(?)", name); }
这里我们使用支持 JtaTransactionManager 的 Weblogic 来进行测试,因为是 Web 容器,所以我们这里定义了一个 Controller 来进行消息的发送,具体代码如下:
@Controller @RequestMapping("test") public class TestController { @Autowired @Qualifier("queueDestination") private Destination destination; @Autowired private ProducerService producerService; @RequestMapping("first") public String first() { producerService.sendMessage(destination, "你好,现在是:" + new Date().toLocaleString()); return "/test/first"; } }
接下来就是启用 Weblogic 服务器,进入其控制台,定义一个名叫“ jdbc/mysql ”的 JNDI 数据源,然后把该项目部署到 Weblogic 服务器上并进行启动。接下来我们就可以访问 /test/first.do 访问到上述 first 方法。之后控制台会输出如下信息:
我们可以看到当
count
为
0
时接收了一次,并随后抛出了异常,之后
count
为
1
又接收了一次,这说明在
count
为
0
时抛出异常后我们的
JMS
进行回滚了,那么我们的数据库是否有进行回滚呢?接着我们来看数据库中的内容:
我们可以看到数据库表中只有一条记录,而且最后一位表示
count
的值的为
1
,这说明在
JMS
进行消息接收抛出异常时我们的数据库也回滚了。关于使用
JtaTransactionManager
进行分布式事务管理的问题就说到这里了,有兴趣的朋友可以自己试验一下。