本文共 10277 字,大约阅读时间需要 34 分钟。
通过上一篇文章 《消息队列深入解析》,我们已经消息队列是什么、使用消息队列的好处以及常见消息队列的简单介绍。
这一篇文章,主要带大家详细了解一下消息队列ActiveMQ的使用。
学习消息队列ActiveMQ的使用之前,我们先来搞清JMS。
JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
参考:
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
安装过程很简单这里就不贴安装过程了,可以自行google.
添加Maven依赖
org.apache.activemq activemq-all 5.15.3
生产者发送消息测试方法:
@Test public void testQueueProducer() throws Exception { // 1、创建一个连接工厂对象,需要指定服务的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 2、使用工厂对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 3、开启连接,调用Connection对象的start方法。 connection.start(); // 4、创建一个Session对象。 // 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。 // 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用queue Queue queue = session.createQueue("test-queue"); // 6、使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(queue); // 7、创建一个Message对象,可以使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一个ActiveMQ队列目的地的消息"); // 8、发送消息 producer.send(textMessage); } // 9、关闭资源 producer.close(); session.close(); connection.close(); }
消费者消费消息测试方法
@Test public void testQueueConsumer() throws Exception { // 创建一个ConnectionFactory对象连接MQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 创建一个连接对象 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 使用Connection对象创建一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个Destination对象。queue对象 Queue queue = session.createQueue("test-queue"); // 使用Session对象创建一个消费者对象。 MessageConsumer consumer = session.createConsumer(queue); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印结果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("这是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); // 等待接收消息 System.in.read(); // 关闭资源 consumer.close(); session.close(); connection.close(); }
我们开启两个消费者进程来监听(运行两次testQueueConsumer()方法)。
然后我们运行运行生产者测试方法发送消息.先发送消息还是先监听消息一般不会不影响。效果如下:
两个消费者各自消费一半消息,而且还是按照消息发送到消息队列的顺序,这也验证了我们上面的说法。
第一个消费者第二个消费者
生产者发送消息测试方法:
@Test public void testTopicProducer() throws Exception { // 1、创建一个连接工厂对象,需要指定服务的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 2、使用工厂对象创建一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 3、开启连接,调用Connection对象的start方法。 connection.start(); // 4、创建一个Session对象。 // 第一个参数:是否开启事务。如果true开启事务,第二个参数无意义。一般不开启事务false。 // 第二个参数:应答模式。自动应答或者手动应答。一般自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、使用Session对象创建一个Destination对象。两种形式queue、topic,现在应该使用topic Topic topic = session.createTopic("test-topic"); // 6、使用Session对象创建一个Producer对象。 MessageProducer producer = session.createProducer(topic); // 7、创建一个Message对象,可以使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一个ActiveMQ队列目的地的消息"); // 8、发送消息 producer.send(textMessage); } // 9、关闭资源 producer.close(); session.close(); connection.close(); }
消费者消费消息测试方法:
@Test public void testTopicConsumer() throws Exception { // 创建一个ConnectionFactory对象连接MQ服务器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 创建一个连接对象 Connection connection = connectionFactory.createConnection(); // 开启连接 connection.start(); // 使用Connection对象创建一个Session对象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个Destination对象。topic对象 Topic topic = session.createTopic("test-topic"); // 使用Session对象创建一个消费者对象。 MessageConsumer consumer = session.createConsumer(topic); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印结果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("这是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消费者启动。。。。"); // 等待接收消息 System.in.read(); // 关闭资源 consumer.close(); session.close(); connection.close(); }
先运行两个消费者进程(提前订阅,不然收不到发送的消息),然后运行生产者测试方法发送消息。
结果是:
两个消费者进程都可以接收到生产者发送过来的所有消息,我这里就不贴图片了,这样验证了我们上面的说法。我们从上面代码就可以看出,点对点通信和发布订阅通信模式的区别就是创建生产者和消费者对象时提供的Destination对象不同,如果是点对点通信创建的Destination对象是Queue,发布订阅通信模式通信则是Topic。
整合spring除了我们上面依赖的Jar包还要依赖
org.springframework spring-jms 4.2.7.RELEASE org.springframework spring-context-support 4.2.7.RELEASE
比如我们在我们的系统中现在有两个服务,第一个服务发送消息,第二个服务接收消息,我们下面看看这是如何实现的。
发送消息的配置文件:
spring-queue
发送消息的测试方法:
@Test public void testSpringActiveMq() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //从spring容器中获得JmsTemplate对象 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); //从spring容器中取Destination对象 Destination destination = (Destination) applicationContext.getBean("queueDestination"); //使用JmsTemplate对象发送消息。 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //创建一个消息对象并返回 TextMessage textMessage = session.createTextMessage("spring activemq queue message"); return textMessage; } }); }
我们上面直接ApplicationContext的getBean方法获取的对象,实际在项目使用依赖注入即可。
创建一个MessageListener的实现类。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息内容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } }}
接收消息的配置文件:
spring-queue
测试接收消息的代码
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
欢迎关注我的微信公众号:"Java面试通关手册"(一个有温度的微信公众号,期待与你共同进步~~~坚持原创,分享美文,分享各种Java学习资源):。
转载地址:http://ubmwo.baihongyu.com/