HornetQ初体验

系统 1451 0
技术介绍

下面来自百度百科

HornetQ 是一个支持 集群 和多种协议,可嵌入、高性能的异步消息系统。 HornetQ 完全支持 JMS HornetQ 不但支持 JMS1.1 API 同时也定义属于自己的消息 API ,这可以最大限度的提升 HornetQ 的性能和灵活性。在不久的将来更多的协议将被 HornetQ 支持。 [1]

HornetQ 拥有超高的性能, HornetQ 在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然, HornetQ 的非持久化消息的性能会表现的更好!

HornetQ 完全使用 POJO ,纯 POJO 的设计让 HornetQ 可以尽可能少的依赖第三方的包。从设计模式来说, HornetQ 这样的设计入侵性也最小。 HornetQ 既可以独立运行,也可以与其它 Java 应用程序服务器集成使用。

HornetQ 拥有完善的错误处理机制, HornetQ 提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。

HornetQ 提供了灵活的集群功能,通过创建 HornetQ 集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。

HornetQ 拥有强大的管理功能。 HornetQ 提供了大量的管理 API 和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个 HA 环境中。

 

 

应用场景

首先 HornetQ 是一种消息服务中间件,高效可靠的消息传递机制进行平台无关的 数据 交流,并基于数据通信来进行 分布式系统 的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。 本文档主要涉及到 HornetQ JMS 功能的使用, HornetQ JMS 只是对 HornetQ 的一种封装,适配了 java JMS 协议。

 

如何集成到项目

HornetQ 目前大致有三种方式: standalone embedded Integrated   with JBoss as

我个人倾向于 standalone 方式,因为:

1)   可以有更多的资源供 HornetQ 单独使用

2)   管理的话只需要关注 HornetQ 这一个产品的问题就行,而无需引入其他的复杂度。

3)   原项目中也是把消息中间件作为一个单独的模块部署,对原来的流程可以做到无缝承接。

目前我只是关注了 HornetQ standalone 这一模式,其他的暂且没有 深入。

 

使用 HornetQ 服务端很简单,直接运行 % HornetQ _HOME%/bin 下的 bat/sh 就可以启动(优化问题暂时没有考虑)

 

客户端推荐用 HornetQ client Spring 做集成, spring 的配置文件内容大致如下所示:

<? xml version = "1.0" encoding = "UTF-8" ?>

< beans xmlns = "http://www.springframework.org/schema/beans"

       xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation = "http://www.springframework.org/schema/beans

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" >

 

    < bean id = "listener" class = "org.hornetq.jms.example.ExampleListener" />

   

 

    < bean id = "listenerContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

        < property name = "connectionFactory" ref = "ConnectionFactory" />

        < property name = "destination" ref = "/queue/exampleQueue" />

        < property name = "messageListener" ref = "listener" />

    </ bean >

   

   

    < bean id = "queueTarget" class = "org.springframework.jndi.JndiObjectTargetSource" >

   

        < property name = "jndiName" >

   

            < value > queue/testQueue </ value >

   

        </ property >

   

        < property name = "jndiTemplate" >

   

            < ref local = "jndiTemplate" />

   

        </ property >

   

    </ bean >

   

    < bean id = "jndiTemplate" class = "org.springframework.jndi.JndiTemplate" >

   

        < property name = "environment" >

   

            < props >

   

                < prop key = "java.naming.factory.initial" > org.jnp.interfaces.NamingContextFactory </ prop >

   

                < prop key = "java.naming.provider.url" > jnp :// localhost :1099 </ prop >

   

                < prop key = "java.naming.factory.url.pkgs" > org.jboss.naming:org.jnp.interfaces </ prop >

   

            </ props >

   

        </ property >

 

</ bean >

</ beans >

 

因为 HornetQ client 主要是以 JNDI 和服务端进行连接,所以以上我们都是通过 Spring 提供的 JMS 模板类和 JNDI 模板类来对 HornetQ client 进行配置与管理。

 

使用步骤

   具体示例主要是以本地 main 方法为主,用 spring 来管理的话也很简单 .

首先加入 HornetQ 客户端必须使用到的 HornetQ 工程的 jar

 


HornetQ初体验
 

除了 jboss-client.jar ,其他的都可以在 HornetQ 的下载包里找到, jboss-client.jar 需要单独的下载 JBoss AS ,我下载的是 JBoss AS7 jboss-client.jar 的目录为 % JBoss AS7_HOME%/bin/client

  JMS Queue

1)        Queue Provider

 

public static void main(String[] args) throws Exception{

      

           // 初始化 JNDI

            Properties properties = new Properties();  

            properties.put( "java.naming.factory.initial" ,  

                    "org.jnp.interfaces.NamingContextFactory" );  

            properties.put( "java.naming.factory.url.pkgs" ,  

                    "org.jboss.naming:org.jnp.interfaces" );  

            properties.put( "java.naming.provider.url" , "jnp://localhost:1099" );  

            InitialContext ic = new InitialContext(properties);

           

            // 建立 ConnectionFactory

            ConnectionFactory cf = (ConnectionFactory) ic  

                    .lookup( "/ConnectionFactory" );

           

            // 建立到 Queue 连接

            Queue orderQueue = (Queue) ic.lookup( "queue/ExpiryQueue" );

           

            // 通过 Queue 建立 Connection

            Connection connection = cf.createConnection();  

           

            // 通过 Connection 建立 session

            Session session = connection.createSession( false ,  

                    Session. AUTO_ACKNOWLEDGE );

           

            // 建立 JMS 生产者

            MessageProducer producer = session.createProducer(orderQueue);  

            // 这一步必须,启动 connection

            connection.start();  

           

            TextMessage message =   session.createTextMessage( "First hornetq" );

            producer.send(message);

            System. out .println( "send success" );

 

 

2)   Queue Consumer

 

        // 初始化 JNDI

       Properties properties = new Properties();  

        properties.put( "java.naming.factory.initial" ,  

                "org.jnp.interfaces.NamingContextFactory" );  

        properties.put( "java.naming.factory.url.pkgs" ,  

                "org.jboss.naming:org.jnp.interfaces" );  

        properties.put( "java.naming.provider.url" , "jnp://localhost:1099" );  

        InitialContext ic = new InitialContext(properties);  

            ConnectionFactory cf = (ConnectionFactory) ic  

                    .lookup( "/ConnectionFactory" );  

            Queue orderQueue = (Queue) ic.lookup( "queue/ExpiryQueue" );  

            Connection connection = cf.createConnection();  

            Session session = connection.createSession( false ,  

                    Session. AUTO_ACKNOWLEDGE );  

            MessageConsumer consumer = session.createConsumer(orderQueue);  

            connection.start();  

           

上面建立连接的部分的注释参考 Queue Provider

 

/*           Message message =   consumer.receive();*/

           

            consumer.setMessageListener( new MessageListener() {

             

              @Override

              public void onMessage(Message message) {

 

                  if (message instanceof TextMessage) {

                   TextMessage textMessage =   (TextMessage)message;

                   String text;

                     try {

                         text = textMessage.getText();

                      System. out .println( "Get Text message" + text);

                     } catch (JMSException e) {

                         e.printStackTrace();

                     }

 

                  

                   } else {

                   System. out .println( "Get message" + message);

                  }

                 

             

                 

              }

           });

           

            Thread. sleep (30000);

 

以上可以看到 Consumer 有二种方式

一种是调用 receive ,这样会阻塞,直到有消息为止,第二种是注册一个回调函数,实现 MessageListener 接口,这一种是异步的。

 

 

1) Topic Provider

 

    public static void main(String[] args) throws Exception{

       Properties properties = new Properties();  

        properties.put( "java.naming.factory.initial" ,  

                "org.jnp.interfaces.NamingContextFactory" );  

        properties.put( "java.naming.factory.url.pkgs" ,  

                "org.jboss.naming:org.jnp.interfaces" );  

        properties.put( "java.naming.provider.url" , "jnp://localhost:1099" );  

        InitialContext initialContext = new InitialContext(properties);

        // Step 2. perform a lookup on the topic

//         Topic topic = (Topic)initialContext.lookup("/topic/exampleTopic");

 

        // Step 3. perform a lookup on the Connection Factory

        ConnectionFactory cf = (ConnectionFactory)initialContext.lookup( "/ConnectionFactory" );

 

        // Step 4. Create a JMS Connection

        Connection connection = cf.createConnection();

       

        // Step 11. Start the Connection

        connection.start();

 

        // Step 5. Create a JMS Session

        Session session = connection.createSession( false , Session. AUTO_ACKNOWLEDGE );

       

//         Topic topic =   session.l("/topic/exampleTopic");

        Topic topic =   (Topic)initialContext.lookup( "/topic/exampleTopic2" );

 

        // Step 6. Create a Message Producer

        MessageProducer producer = session.createProducer(topic);

 

        // Step 9. Create a Text Message

        TextMessage message = session.createTextMessage( "This is a text message" );

 

        System. out .println( "Sent message: " + message.getText());

 

        // Step 10. Send the Message

        producer.send(message);

       

        System. out .println( "Topic send success" );

 

 

     }

 

Topic Provider Queue Provider 基本类似,只是一个是获得 Queue ,另外一个是获得 Topic

 

2) Topic Consumer

 

    public static void main(String[] args) throws Exception{

       Properties properties = new Properties();  

        properties .put( "java.naming.factory.initial" ,  

                "org.jnp.interfaces.NamingContextFactory" );  

        properties .put( "java.naming.factory.url.pkgs" ,  

                "org.jboss.naming:org.jnp.interfaces" );  

        properties .put( "java.naming.provider.url" , "jnp://localhost:1099" );  

        InitialContext initialContext = new InitialContext( properties );

        // Step 2. perform a lookup on the topic

        Topic topic = (Topic)initialContext.lookup( "/topic/exampleTopic2" );

 

        // Step 3. perform a lookup on the Connection Factory

        ConnectionFactory cf = (ConnectionFactory)initialContext.lookup( "/ConnectionFactory" );

 

        // Step 4. Create a JMS Connection

        Connection connection = cf.createConnection();

 

        // Step 5. Create a JMS Session

        Session session = connection.createSession( false , Session. AUTO_ACKNOWLEDGE );

       

        // Step 7. Create a JMS Message Consumer

        MessageConsumer messageConsumer1 = session.createConsumer(topic);

 

        // Step 8. Create a JMS Message Consumer

        MessageConsumer messageConsumer2 = session.createConsumer(topic);

       

     // Step 11. Start the Connection

        connection.start();

       

        // Step 12. Receive the message

        TextMessage messageReceived = (TextMessage)messageConsumer1.receive();

 

        System. out .println( "Consumer 1 Received message: " + messageReceived.getText());

 

        // Step 13. Receive the message

        messageReceived = (TextMessage)messageConsumer2.receive();

 

        System. out .println( "Consumer 2 Received message: " + messageReceived.getText());

}

Topic 模式的测试必须是 consumer 先启动,然后 provider 再启动, consumer 才能获得消息。这是由于 Topic 的特性而决定的。

 

后记:到目前为止,我还没有找到方法像 ActiveMQ 那样动态的创建 Queue 或者 Topic 的, ActiveMQ 中,如果向服务端发送请求,如果服务端没有这个 Queue 或者 Topic ,那么服务端会自动创建一个,但是 HornetQ 中没有这个功能,必须在配置文件中配置想要的 Queue 或者 Topic HornetQ 服务端会热加载配置文件。

如果直接启动的话, HornetQ 默认加载 %HornetQ_HOME%\config\stand-alone\non-clustered\ hornetq-jms.xml

具体配置如下:

  <queue name="DLQ">

      <entry name="/queue/DLQ"/>

   </queue>

  

   <queue name="ExpiryQueue">

      <entry name="/queue/ExpiryQueue"/>

   </queue>

  

   <topic name="exampleTopic">

      <entry name="/topic/exampleTopic"/>

   </topic>

  

   <topic name="exampleTopic2">

      <entry name="/topic/exampleTopic2"/>

   </topic>

 

HornetQ 本身并没有提供想 ActiveMQ 那样的网页管理界面,它必须和 JBoss asapplication server 集成,或者自己写程序调用它提供的接口,但这样无形提高了工作量。

有个简单变通的方法,因为 HornetQ 里面集成了 JMX ,所以可以通过 jdk 的工具 jconsole 来查看里面的一些信息,和操作里面的一些属性,达到管理的目的


HornetQ初体验
 
HornetQ初体验
 
HornetQ初体验
 

 

 

HornetQ初体验


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论