下面来自百度百科
HornetQ 是一个支持 集群 和多种协议,可嵌入、高性能的异步消息系统。 HornetQ 完全支持 JMS , HornetQ 不但支持 JMS1.1 API 同时也定义属于自己的消息 API ,这可以最大限度的提升 HornetQ 的性能和灵活性。在不久的将来更多的协议将被 HornetQ 支持。 [1]
HornetQ 拥有超高的性能, HornetQ 在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然, HornetQ 的非持久化消息的性能会表现的更好!
HornetQ 完全使用 POJO ,纯 POJO 的设计让 HornetQ 可以尽可能少的依赖第三方的包。从设计模式来说, HornetQ 这样的设计入侵性也最小。 HornetQ 既可以独立运行,也可以与其它 Java 应用程序服务器集成使用。
HornetQ 拥有完善的错误处理机制, HornetQ 提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
HornetQ 提供了灵活的集群功能,通过创建 HornetQ 集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
HornetQ 拥有强大的管理功能。 HornetQ 提供了大量的管理 API 和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个 HA 环境中。
应用场景
首先 HornetQ 是一种消息服务中间件,高效可靠的消息传递机制进行平台无关的 数据 交流,并基于数据通信来进行 分布式系统 的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 本文档主要涉及到 HornetQ 的 JMS 功能的使用, HornetQ 的 JMS 只是对 HornetQ 的一种封装,适配了 java 的 JMS 协议。
如何集成到项目
HornetQ 目前大致有三种方式: standalone , embedded , Integrated with JBoss as 。
我个人倾向于 standalone 方式,因为:
1) 可以有更多的资源供 HornetQ 单独使用
2) 管理的话只需要关注 HornetQ 这一个产品的问题就行,而无需引入其他的复杂度。
3) 原项目中也是把消息中间件作为一个单独的模块部署,对原来的流程可以做到无缝承接。
目前我只是关注了 HornetQ standalone 这一模式,其他的暂且没有 深入。
使用 HornetQ 服务端很简单,直接运行 % HornetQ _HOME%/bin 下的 bat/sh 就可以启动(优化问题暂时没有考虑)
客户端推荐用 HornetQ 的 client 和 Spring 做集成, spring 的配置文件内容大致如下所示:
<? xml version = "1.0" encoding = "UTF-8" ?>
< beans xmlns = "http://www.springframework.org/schema/beans"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" >
< bean id = "listener" class = "org.hornetq.jms.example.ExampleListener" />
< bean id = "listenerContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
< property name = "connectionFactory" ref = "ConnectionFactory" />
< property name = "destination" ref = "/queue/exampleQueue" />
< property name = "messageListener" ref = "listener" />
</ bean >
< bean id = "queueTarget" class = "org.springframework.jndi.JndiObjectTargetSource" >
< property name = "jndiName" >
< value > queue/testQueue </ value >
</ property >
< property name = "jndiTemplate" >
< ref local = "jndiTemplate" />
</ property >
</ bean >
< bean id = "jndiTemplate" class = "org.springframework.jndi.JndiTemplate" >
< property name = "environment" >
< props >
< prop key = "java.naming.factory.initial" > org.jnp.interfaces.NamingContextFactory </ prop >
< prop key = "java.naming.provider.url" > jnp :// localhost :1099 </ prop >
< prop key = "java.naming.factory.url.pkgs" > org.jboss.naming:org.jnp.interfaces </ prop >
</ props >
</ property >
</ bean >
</ beans >
因为 HornetQ 的 client 主要是以 JNDI 和服务端进行连接,所以以上我们都是通过 Spring 提供的 JMS 模板类和 JNDI 模板类来对 HornetQ 的 client 进行配置与管理。
使用步骤
具体示例主要是以本地 main 方法为主,用 spring 来管理的话也很简单 .
首先加入 HornetQ 客户端必须使用到的 HornetQ 工程的 jar 包
除了 jboss-client.jar ,其他的都可以在 HornetQ 的下载包里找到, jboss-client.jar 需要单独的下载 JBoss AS ,我下载的是 JBoss AS7 , jboss-client.jar 的目录为 % JBoss AS7_HOME%/bin/client
JMS Queue
1) Queue Provider
public static void main(String[] args) throws Exception{
// 初始化 JNDI
Properties properties = new Properties();
properties.put( "java.naming.factory.initial" ,
"org.jnp.interfaces.NamingContextFactory" );
properties.put( "java.naming.factory.url.pkgs" ,
"org.jboss.naming:org.jnp.interfaces" );
properties.put( "java.naming.provider.url" , "jnp://localhost:1099" );
InitialContext ic = new InitialContext(properties);
// 建立 ConnectionFactory
ConnectionFactory cf = (ConnectionFactory) ic
.lookup( "/ConnectionFactory" );
// 建立到 Queue 连接
Queue orderQueue = (Queue) ic.lookup( "queue/ExpiryQueue" );
// 通过 Queue 建立 Connection
Connection connection = cf.createConnection();
// 通过 Connection 建立 session
Session session = connection.createSession( false ,
Session. AUTO_ACKNOWLEDGE );
// 建立 JMS 生产者
MessageProducer producer = session.createProducer(orderQueue);
// 这一步必须,启动 connection
connection.start();
TextMessage message = session.createTextMessage( "First hornetq" );
producer.send(message);
System. out .println( "send success" );
2) Queue Consumer
// 初始化 JNDI
Properties properties = new Properties();
properties.put( "java.naming.factory.initial" ,
"org.jnp.interfaces.NamingContextFactory" );
properties.put( "java.naming.factory.url.pkgs" ,
"org.jboss.naming:org.jnp.interfaces" );
properties.put( "java.naming.provider.url" , "jnp://localhost:1099" );
InitialContext ic = new InitialContext(properties);
ConnectionFactory cf = (ConnectionFactory) ic
.lookup( "/ConnectionFactory" );
Queue orderQueue = (Queue) ic.lookup( "queue/ExpiryQueue" );
Connection connection = cf.createConnection();
Session session = connection.createSession( false ,
Session. AUTO_ACKNOWLEDGE );
MessageConsumer consumer = session.createConsumer(orderQueue);
connection.start();
上面建立连接的部分的注释参考 Queue Provider
/* Message message = consumer.receive();*/
consumer.setMessageListener( new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage)message;
String text;
try {
text = textMessage.getText();
System. out .println( "Get Text message" + text);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System. out .println( "Get message" + message);
}
}
});
Thread. sleep (30000);
以上可以看到 Consumer 有二种方式
一种是调用 receive ,这样会阻塞,直到有消息为止,第二种是注册一个回调函数,实现 MessageListener 接口,这一种是异步的。
1) Topic Provider
public static void main(String[] args) throws Exception{
Properties properties = new Properties();
properties.put( "java.naming.factory.initial" ,
"org.jnp.interfaces.NamingContextFactory" );
properties.put( "java.naming.factory.url.pkgs" ,
"org.jboss.naming:org.jnp.interfaces" );
properties.put( "java.naming.provider.url" , "jnp://localhost:1099" );
InitialContext initialContext = new InitialContext(properties);
// Step 2. perform a lookup on the topic
// Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic");
// Step 3. perform a lookup on the Connection Factory
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup( "/ConnectionFactory" );
// Step 4. Create a JMS Connection
Connection connection = cf.createConnection();
// Step 11. Start the Connection
connection.start();
// Step 5. Create a JMS Session
Session session = connection.createSession( false , Session. AUTO_ACKNOWLEDGE );
// Topic topic = session.l("/topic/exampleTopic");
Topic topic = (Topic)initialContext.lookup( "/topic/exampleTopic2" );
// Step 6. Create a Message Producer
MessageProducer producer = session.createProducer(topic);
// Step 9. Create a Text Message
TextMessage message = session.createTextMessage( "This is a text message" );
System. out .println( "Sent message: " + message.getText());
// Step 10. Send the Message
producer.send(message);
System. out .println( "Topic send success" );
}
Topic 的 Provider 和 Queue 的 Provider 基本类似,只是一个是获得 Queue ,另外一个是获得 Topic
2) Topic Consumer
public static void main(String[] args) throws Exception{
Properties properties = new Properties();
properties .put( "java.naming.factory.initial" ,
"org.jnp.interfaces.NamingContextFactory" );
properties .put( "java.naming.factory.url.pkgs" ,
"org.jboss.naming:org.jnp.interfaces" );
properties .put( "java.naming.provider.url" , "jnp://localhost:1099" );
InitialContext initialContext = new InitialContext( properties );
// Step 2. perform a lookup on the topic
Topic topic = (Topic)initialContext.lookup( "/topic/exampleTopic2" );
// Step 3. perform a lookup on the Connection Factory
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup( "/ConnectionFactory" );
// Step 4. Create a JMS Connection
Connection connection = cf.createConnection();
// Step 5. Create a JMS Session
Session session = connection.createSession( false , Session. AUTO_ACKNOWLEDGE );
// Step 7. Create a JMS Message Consumer
MessageConsumer messageConsumer1 = session.createConsumer(topic);
// Step 8. Create a JMS Message Consumer
MessageConsumer messageConsumer2 = session.createConsumer(topic);
// Step 11. Start the Connection
connection.start();
// Step 12. Receive the message
TextMessage messageReceived = (TextMessage)messageConsumer1.receive();
System. out .println( "Consumer 1 Received message: " + messageReceived.getText());
// Step 13. Receive the message
messageReceived = (TextMessage)messageConsumer2.receive();
System. out .println( "Consumer 2 Received message: " + messageReceived.getText());
}
Topic 模式的测试必须是 consumer 先启动,然后 provider 再启动, consumer 才能获得消息。这是由于 Topic 的特性而决定的。
后记:到目前为止,我还没有找到方法像 ActiveMQ 那样动态的创建 Queue 或者 Topic 的, ActiveMQ 中,如果向服务端发送请求,如果服务端没有这个 Queue 或者 Topic ,那么服务端会自动创建一个,但是 HornetQ 中没有这个功能,必须在配置文件中配置想要的 Queue 或者 Topic , HornetQ 服务端会热加载配置文件。
如果直接启动的话, HornetQ 默认加载 %HornetQ_HOME%\config\stand-alone\non-clustered\ hornetq-jms.xml
具体配置如下:
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
<queue name="ExpiryQueue">
<entry name="/queue/ExpiryQueue"/>
</queue>
<topic name="exampleTopic">
<entry name="/topic/exampleTopic"/>
</topic>
<topic name="exampleTopic2">
<entry name="/topic/exampleTopic2"/>
</topic>
HornetQ 本身并没有提供想 ActiveMQ 那样的网页管理界面,它必须和 JBoss asapplication server 集成,或者自己写程序调用它提供的接口,但这样无形提高了工作量。
有个简单变通的方法,因为 HornetQ 里面集成了 JMX ,所以可以通过 jdk 的工具 jconsole 来查看里面的一些信息,和操作里面的一些属性,达到管理的目的