2022年activeMQ,JMS学习资料 .pdf
1 JMS JMS 源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。 Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得 Java程序能够和其他消息组件进行通信。1.1 JMS的基本构件1.1.1 连接工厂连接工厂是客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory。1.1.2 连接JMS Connection 封装了客户与 JMS 提供者之间的一个虚拟的连接。1.1.3 会话JMS Session是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer )、消息消费者( consumer)和消息( message )等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。1.1.4 目的地目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点( PTP )消息传递域和发布/订阅消息传递域。点对点消息传递域的特点如下:每个消息只能有一个消费者。消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 1 页,共 27 页 - - - - - - - - - 息的时候是否处于运行状态,它都可以提取消息。发布/ 订阅消息传递域的特点如下:每个消息可以有多个消费者。生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。在点对点消息传递域中,目的地被成为队列(queue) ; 在发布 / 订阅消息传递域中, 目的地被成为主题(topic ) 。1.1.5 消息生产者消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。1.1.6 消息消费者消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:同步消费。通过调用消费者的receive 方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。1.1.7 消息JMS 消息由以下三部分组成:消息头。每个消息头字段都有相应的getter 和setter 方法。消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。消息体。 JMS 定义的消息类型有 TextMessage、MapMessage 、BytesMessage、StreamMessage 和ObjectMessage。名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 2 页,共 27 页 - - - - - - - - - 1.2 JMS的可靠性机制1.2.1 消息确认JMS 消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:Session.AUTO_ACKNOWLEDGE。当客户成功的从 receive 方法返回的时候,或者从MessageListener.onMessage 方法成功返回的时候, 会话自动确认客户收到的消息。Session.CLIENT_ACKNOWLEDGE。客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第 5个消息,那么所有 10个消息都被确认。Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝第确认消息的提交。如果JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true 。1.2.2 持久性JMS 支持以下两种消息提交模式:PERSISTENT。指JMS provider 持久保存消息,以保证消息不会因为JMS provider的失败而丢失。NON_PERSISTENT。不要求 JMS provider 持久保存消息。1.2.3 优先级可以使用消息优先级来指示JMS provider 首先提交紧急的消息。 优先级分 10个级别,从0(最低)到 9(最高)。如果不指定优先级,默认级别是4。需要注意的是, JMS 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 3 页,共 27 页 - - - - - - - - - provider 并不一定保证按照优先级的顺序提交消息。1.2.4 消息过期可以设置消息在一定时间后过期,默认是永不过期。1.2.5 本地事务在一个 JMS 客户端,可以使用本地事务来组合消息的发送和接收。JMS Session接口提供了 commit和rollback方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。事务性的会话总是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。 需要注意的是, 如果使用请求 /回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。2 ActiveMQ ActiveMQ 是 apache 旗下开源消息中间件,是目前最流行的开源的消息中间件。ActiveMQ 功能特点:支持跨语言的客户端,如:Java,C和 C + +,C,Ruby,Perl ,Python 和 PHP 。在 JMS客户端和消息代理都全面支持企业集成模式。支持许多高级功能,如:信息组,虚拟目的地,通配符和复合目的地。完全支持 JMS 1.1 和 J2EE1.4 。支持 Spring , 以便 ActiveMQ可以很容易地嵌入到Spring 应用程序和使用 Spring的 XML配置机制。通过了常见 J2EE服务器 (如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试。支持可插拔传输协议,如:in-VM,TCP, SSL, NIO, UDP, multicast, JGroups and JXTA 传输。名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 4 页,共 27 页 - - - - - - - - - 支持通过 JDBC和 journal提供高速的消息持久化。设计基于高性能集群,客户端服务器,对等通信。Ajax 支持网络流媒体,支持使用纯DHTML 的 Web浏览器,允许Web浏览器成为消息传递结构的一部分。支持 CXF和 Axis , 以便 ActiveMQ 可以很容易地进入这些Web服务并提供可靠的消息传递。可作为一个在内存中的JMS提供者,是 JMS的单元测试的理想选择。2.1 安装 ActiveMQ 可以从官方网站(http:/activemq.apache.org/)下载最新版的ActiveMQ。(最新版的为: 5.4.2 )2.2 启动 ActiveMQ 在下载最新的ActiveMQ 将其解压到相应的目录就可以了。需要启动ActiveMQ 只要找到 $activemq.home/bin目录,双击运行activemq.bat就可以了 (windows 版本 ) 。名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 5 页,共 27 页 - - - - - - - - - 启动后的界面如上图。 (ActiveMQ5.3.0版本启动后的截图,不同版本会有所不同)Listening for connections at: tcp:/10.10.40.174:61616(这里的 IP 显示的是机器名称)tcp:/10.10.40.174:61616表示监听的端口地址,也就是编写程序时,获取连接进用到的URL 。ActiveMQ Console at http:/10.10.40.174:8161/admin http:/10.10.40.174:8161/admin这个是新版本的activeMQ 提供的管理工具访问地址,可以查看队列详情,生产者,消费者等信息。2.3 配置 ActiveMQ 2.3.1 基本配置说明ActiveMQ 默认使用的是XML格式配置, 配置文件在 $activemq.home/conf目录下, 文件名为 activemq.xml 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 6 页,共 27 页 - - - - - - - - - file:$activemq.base/conf/credentials.properties !- The element is used to configure the ActiveMQ broker. - producerFlowControl=true memoryLimit=1mb producerFlowControl=true memoryLimit=1mb !- Use VM cursor for better latency 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 8 页,共 27 页 - - - - - - - - - For more information, see: http:/activemq.apache.org/message-cursors.html - 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 9 页,共 27 页 - - - - - - - - - !- The systemUsage controls the maximum amount of space the broker will use before slowing down producers. For more information, see: http:/activemq.apache.org/producer-flow-control.html 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 10 页,共 27 页 - - - - - - - - - - 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 11 页,共 27 页 - - - - - - - - - 2.3.1.1Kahadb 配置说明配置示例: KahaDB的各个可配置属性:属性默认值描述directoryactivemq-data保存 message store数据文件的目录indexWriteBatchSize1000批量更新索引的阀值,当要更新的索引到达这个索引时,批量更新到 metadata store 中indexCacheSize10000指定 metadata cache 的大小enableIndexWriteAsyncfalse写入索引文件到metadata store中的方式是否采用异步写入journalMaxFileLength32mb消息持久数据文件的大小enableJournalDiskSyncstrue如果为 true ,保证使用同步写入的方式持久化消息到journal 文件中cleanupInterval30000清除(清除或归档)不再使用的journal 文件的时间周期 (毫秒)。checkpointInterval5000写入索引信息到metadata store中的时间周期(毫秒)ignoreMissingJournalfilesfalse是否忽略丢失的journal 文件。如果为 false, 当丢失了 journal 文件时, broker 启动时会抛异常并关闭checkForCorruptJournalFilesfalse如果为 true , broker 在启动的时候会检测 journal 文件是否损坏,若损坏便尝试恢复它。checksumJournalFilesfalse如果为 true 。KahaDB 为 journal文件生产一个checksum,以便能够检测 journal 文件是否损坏。archiveDataLogsfalse如果为true,当达到cleanupInterval周期时,会归档journal 文件而不是删除directoryArchivenull指定归档journal文件存放的路径databaseLockedWaitDelay10000在使用主从数据库备份时,等待获取 DB 上的 lock 的延迟时间。maxAsyncJobs10000等待写入journal文件的任务队列的最大数量。应该大于或等于最大并发 producer 的数量。 配合名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 13 页,共 27 页 - - - - - - - - - 并行存储转发属性使用。concurrentStoreAndDispatchTransactionsfalse如果为 true ,转发消息的时候同时提交事务concurrentStoreAndDispatchTopicsfalse如果为 true , 转发 Topic 消息的时候同时存储消息的message store中。concurrentStoreAndDispatchQueuestrue如果为 true ,转发 Queue 消息的时候同时存储消息到message store 中。2.3.2 安全性配置说明(以 5.3.0 的版本为例,各个版本配置会有所不同,需查阅相应资料)访问 activeMQ 权限配置是在 $activemq.home/conf目录下的 activemq.xml中配置的, 默认是没有进行配置,任何用户都是可以访问activeMQ 。如果需要对访问权限进行控制可以进行相应配置,可以指定到相应队列的访问权限。1.配置 activemq.xml 配置权限需要在,activemq.xml的元素中加入以下部分。 /queue= 对应的是所有队列/ read= ” admins”表示 admins 组的所有用户都可以消费前面queue 属性指定的队列的消息/ write=admins 表示 admins 组的所有用户都可以向前面queue 属性指定的队列发送消息/ admin=admins 表示 admins 组的所有用户可以在前面queue 属性指定的队列不存在的情况创建队列 read=admins write=admins admin=admins / / queue=TEST.表示队列名称所有以” TEST. ” (包含“ .”)开头的队列 ,(这里建议队列名称以“.”作分隔符 ) read=users write=users admin=users / read=users write=users admin=users / 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 14 页,共 27 页 - - - - - - - - - / write=guests,users admin=guests,users 可以同时指定多个组 read=guests write=guests,users admin=guests,users / / 主题相关配置,与队列配置类似 read=admins write=admins admin=admins / read=users write=users admin=users / read=guests write=guests,users admin=guests,users / read=guests,users write=guests,users admin=guests,users/ 2.用户配置在$activemq.home/conf目录下增加login.config、groups.properties 、users.properties 三个文件。login.config 内容activemq-domain org.apache.activemq.jaas.PropertiesLoginModule required debug=true org.apache.activemq.jaas.properties.user=users.properties org.apache.activemq.jaas.properties.group=groups.properties; ; org.apache.activemq.jaas.properties.user=users.properties指定用户对应的配置文件org.apache.activemq.jaas.properties.group=groups.properties;指定用户组对应的配置文件groups.properties 内容admins=system users=zengjun,ll 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 15 页,共 27 页 - - - - - - - - - guests=guest users=zengjun,ll users 表示用户组名,与activemq.xml安全性配置中的read,write,admin属性值对应。zengjun,ll 表示两个用户名zengjun 和 ll,这里表示users 组下有两个用户zengjun 和 ll。users.properties 内容system=manager zengjun=zj ll=ll zengjun=zj zengjun 表示用户名,与groups.properties 配置用户名对应zj 表示等号左边用户对应的密码。注:在配置安全性配置后,在写代码创建连接时需要加上对应的用户名与密码。如: Connection connection = connectionFactory.createConnection(zengjun,zj); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 16 页,共 27 页 - - - - - - - - - 2.4 点对点域2.4.1 生产消息生产消息都步骤如上图的深色部分所示。首先需要从连接工厂中获取到连接,然后通过连接来创建会话,再通过会话来创建目的地,再用会话与目的地来创建生产者。需要发送的消息也是通过会话来创建的。最后通过生产者来发送消息。示例代码:/ 初使化连接工厂ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/10.10.40.174:61616); Connection connection = null; Session session = null; 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 17 页,共 27 页 - - - - - - - - - MessageProducer producer = null; Destination destination = null; / 创建连接connection = connectionFactory.createConnection(); / 创建会话session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); / 创目的地destination = session.createQueue(TEST_QUEUE_ZJ); / 创生产者producer = session.createProducer(destination); / 设置消息的持久模式producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); / 创建消息TextMessage message = session.createTextMessage(); / 设置消息属性 double d = Math.random(); message.setStringProperty(ID, String.valueOf(d); message.setText(tttt111); message.setText(tttt22222222); / 发送消息producer.send(message); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 18 页,共 27 页 - - - - - - - - - 2.4.2 消费消息消费消息如上图深色部分所示。首先需要从连接工厂创建连接,然后再通过连接创建会话,然后通过会话创建目的地,再通过会话与目的地来创消费者,然后消费者调用接口来消费消息。代码示例:/ 初始连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/10.10.40.174:61616); Session session = null; MessageConsumer consumer = null; Connection connection = null; Message message = null; Destination destination=null; 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 19 页,共 27 页 - - - - - - - - - / 创建连接connection = connectionFactory.createConnection(); connection.start(); / 创建会话,指定消息的确认模式,( 确认模式请参1.2.1) session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); / 创建目的地destination = session.createQueue(TEST_QUEUE_ZJ); / 创建消息者consumer = session.createConsumer(destination); / 消息者调用接收消息接口来接收消息。/ 无时间参数表示一直等待,直到收到消息。/ message = consumer.receive(); / 有时间参数表示指定时间后没有消息则结束时, 如果存在消息就在取完消息后结束message = consumer.receive(5 * 1000); / 如果没有收到消息,不会等待,立即往下执行,不建议使用/ message = consumer.receiveNoWait(); / 判断是否收到消息。if (message != null) /判断消息的类型if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(TEXT: + text); / 确认消息textMessage.acknowledge(); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message; String strId = streamMessage.getStringProperty(ID); System.out.println(streammessage ID: + strId); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 20 页,共 27 页 - - - - - - - - - / 确认消息streamMessage.acknowledge(); else System.out.println(没有收到消息 ); 还可以采用监听的方式来消费消息。采用监听的方式, 需要实现MessageListener接口,创建消费者的方式与前面描消费消息的步骤一致, 在建创建好消费者后需要设置实现MessageListener接口的监听器, 当监听器监听到消费者对应的目的地上有消息时,会自动调用onMessage方法,在 onMessage方法中可以得到消息。代码示例:/ 实现 MessageListener接口public class AmqConsumerOnMessage implements MessageListener public static void main(String args) new AmqConsumerOnMessage().createConsumer(TEST_queuE); / 实现 onMessage 方法 public void onMessage(Message message) try if (message != null) if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(TEXT: + text); else if (message instanceof StreamMessage) StreamMessage streamMessage = (StreamMessage) message; String strId = streamMessage.getStringProperty(ID); System.out.println(streammessage ID: + strId); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 21 页,共 27 页 - - - - - - - - - / 确认消息 message.acknowledge(); else System.out.println(没有收到消息); catch (JMSException e) e.printStackTrace(); / 创建消费者 public void createConsumer(String queue) MessageConsumer consumer = null; Session session = null; Connection connection = null; try ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/10.10.40.174:61616); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Destination objDestination = null; objDestination = session.createQueue(queue); consumer = session.createConsumer(objDestination); / 设置监听器,来监听消息 consumer.setMessageListener(this); catch (JMSException e) e.printStackTrace(); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 22 页,共 27 页 - - - - - - - - - finally ConnectionUtil.closeAll(connection, session, consumer); 2.5 发布 / 订阅域2.5.1 发布消息发布消息与点对点域的生产消息类似。只是将队列换成了主题,将生产者换成发布者代码示例:/ 初始化连接工厂ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(tcp:/10.10.40.174:61616); TopicConnection connection = null; ActiveMQTopicSession session = null; ActiveMQTopicPublisher publisher = null; ActiveMQTopic topic = null; / 创建连接connection = connectionFactory.createTopicConnection(zengjun, zj); / 创建会话session = (ActiveMQTopicSession) connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); / 创建主题topic = (ActiveMQTopic) session.createTopic(TEST.topic.zj); / 创建发布者publisher = (ActiveMQTopicPublisher) session.createPublisher(topic); / 设置消息持久方式,如果要实现持久订阅,持久方法必须是DeliveryMode.PERSISTENT publisher.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 23 页,共 27 页 - - - - - - - - - TextMessage message = session.createTextMessage(); message.setText(hello topic); / 发布消息publisher.publish(message);2.5.2 订阅消息订阅消息也与点对点域的消费消息类似。订阅消息分为持久订阅与非持久订阅。1.非持久订阅非持久订阅只有在订阅者处于在线状态时才到收到发布者发布到其订阅主题的消息,当非持久订阅者处于离线状态时,发布者发布到其订阅主题的消息非持久订阅者将无法收到。非持久阅者断开连接时取消其订阅,在其重新连接时将重新订阅。代码示例:public class TopicListener implements MessageListener public static void main(String argv) throws Exception TopicListener l = new TopicListener(); l.createSub(); System.out.println(ttt); public void createSub() throws JMSException ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcp:/10.10.40.174:61616); / 创建连接 Connection connection = factory.createConnection(zengjun, zj); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic(TEST.topic.zj); / 创建非持久订阅者,就相当于点对点的消费者 MessageConsumer consumer = session.createConsumer(topic); / 设置监听器,通过onMessage方法来接收消息 consumer.setMessageListener(this); 名师资料总结 - - -精品资料欢迎下载 - - - - - - - - - - - - - - - - - - 名师精心整理 - - - - - - - 第 24 页,共 27 页 - - - - - - - - - connection.start(); public void onMessage(Message message) System.out.println(收到消息 ); try if (message