`
Donald_Draper
  • 浏览: 950605 次
社区版块
存档分类
最新评论

JMS(ActiveMQ) PTP和PUB/SUB模式实例

阅读更多
深入浅出JMS(一)——JMS简介 :http://blog.csdn.net/aking21alinjuju/article/details/6051421
深入浅出JMS(二)——JMS的组成 :http://blog.csdn.net/aking21alinjuju/article/details/6071123
JMS消息详解 :http://blog.csdn.net/buoymp/article/details/784696
深入掌握JMS(一):JSM基础 :http://blog.csdn.net/zhangxs_3/article/details/4034713
深入掌握JMS(二):一个JMS例子:http://blog.csdn.net/zhangxs_3/article/details/4034775
深入掌握JMS(三):MessageListener:http://blog.csdn.net/zhangxs_3/article/details/4034788
深入掌握JMS(四):实战Queue:http://blog.csdn.net/zhangxs_3/article/details/4034801
深入掌握JMS(五):实战Topic :http://blog.csdn.net/zhangxs_3/article/details/4034811
深入掌握JMS(六):消息头 :http://blog.csdn.net/zhangxs_3/article/details/4034834
深入掌握JMS(七):DeliveryMode例子:http://blog.csdn.net/zhangxs_3/article/details/4034837
深入掌握JMS(八):JMSReplyTo :http://blog.csdn.net/zhangxs_3/article/details/4034847
JMS应用示例教程:http://qidaoxp.iteye.com/blog/480047
JMS与MQ详解:http://www.fx114.net/qa-48-91234.aspx
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在做下面的测试之前要先安装ActiveMQ,下载apache-activemq-5.12.1-bin.tar.gz包,解压后
目录如下:
[root@zabbix apache-activemq-5.12.1]# ls
activemq-all-5.12.1.jar  bin  conf  data  docs  examples  lib  LICENSE  NOTICE  README.txt  tmp  webapps  webapps-demo
[root@zabbix apache-activemq-5.12.1]# cd bin


启动ActiveMQ
[root@zabbix bin]# ./activemq start
INFO: Loading '/acivemq/apache-activemq-5.12.1//bin/env'
INFO: Using java '/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/acivemq/apache-activemq-5.12.1//data/activemq.pid' (pid ' 4047')

查看ActiveMQ状态
[root@zabbix bin]# ./activemq status
INFO: Loading '/acivemq/apache-activemq-5.12.1//bin/env'
INFO: Using java '/bin/java'
ActiveMQ is running (pid '4047')
[root@zabbix bin]# 

ActiveMQ相关进程8161控制台用户admin,密码admin(http://192.168.126.128:8161),
ActiveMq 监听端口61616
[root@zabbix bin]# netstat -ntlp | grep 4047
tcp6       0      0 :::5672                 :::*                    LISTEN      4047/java           
tcp6       0      0 :::61613                :::*                    LISTEN      4047/java           
tcp6       0      0 :::61614                :::*                    LISTEN      4047/java           
tcp6       0      0 :::61616                :::*                    LISTEN      4047/java           
tcp6       0      0 :::42614                :::*                    LISTEN      4047/java           
tcp6       0      0 :::1883                 :::*                    LISTEN      4047/java           
tcp6       0      0 :::8161                 :::*                    LISTEN      4047/java   

关闭ActiveMq,原ActiveMq配置,这个命令会报错
[root@zabbix bin]# ./activemq stop

修改conf/activemq.xml文件的如下配置即可:
 
  <!--   
         The managementContext is used to configure how ActiveMQ is exposed in   
         JMX. By default, ActiveMQ uses the MBean server that is started by   
         the JVM. For more information, see:   
      
             http://activemq.apache.org/jmx.html   
     -->  
     <managementContext>  
       <managementContext createConnector="false"/>  
     </managementContext>  

修改 createConnector="true" 然后重新启动activemq,即可


需要引入的jar包如下:




PTP生产者

package mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  生存者Producer 
 * @author donald
 *
 */
public class QueueProducer {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url =  "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
 
   public static void main(String[] args)throws Exception {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       // Connection 启动  
       connection.start();  
       System.out.println("Connection is start...");  
      //创建一个session
//第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
//第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
//Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
//Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
//DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
 
       Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
       // Queue :消息的目的地;消息发送给谁.  
       Queue  destination = session.createQueue(qname);  
       // MessageProducer:消息发送者  
       MessageProducer producer = session.createProducer(destination);  
       //设置生产者的模式,有两种可选
//DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
//DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        // 构造消息,此处写死,项目就是参数,或者方法获取  
       sendMessage(session, producer);  
       session.commit();  
 
       connection.close();  
       System.out.println("send text ok.");  
   }  
     
   public static void sendMessage(Session session, MessageProducer producer)  
           throws Exception {  
       for (int i = 1; i <= 5; i++) {//有限制,达到1000就不行  
           TextMessage message = session.createTextMessage("向ActiveMq发送的Queue消息" + i);  
           // 发送消息到目的地方  
           System.out.println("发送消息:" + "ActiveMq 发送的Queue消息" + i);  
           producer.send(message);  
       }  
   }  
} 

PTP消费者1:
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Queue(点对点)方式  消费这Consumer
 * @author donald
 *
 */
public class QueueConsumer {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String qname =  "testQueue";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Queue destination=session.createQueue(qname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
                   TextMessage textMessage=(TextMessage)message;  
                   System.out.println("消费消息:"+textMessage.getText());  
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
}  

PTP消费者2:

public class QueueConsumer2 {  
      .... 除了下面这一句,其他与QueueConsumer相同
      System.out.println("2消费消息:"+textMessage.getText()); 
      ....
}  


控制台输出

PTP生产者:
Connection is start...
发送消息:ActiveMq 发送的Queue消息1
发送消息:ActiveMq 发送的Queue消息2
发送消息:ActiveMq 发送的Queue消息3
发送消息:ActiveMq 发送的Queue消息4
发送消息:ActiveMq 发送的Queue消息5
send text ok.

PTP消费者1:

消费消息:向ActiveMq发送的Queue消息1
消费消息:向ActiveMq发送的Queue消息3
消费消息:向ActiveMq发送的Queue消息5

PTP消费者2:
2消费消息:向ActiveMq发送的Queue消息2
2消费消息:向ActiveMq发送的Queue消息4

从上面可以看出,Queue的消息,只要被消费者,消费一次


PUB/SUB生产者
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import mq.enity.Order;
import mq.enity.User;

/**
 * Topic(发布/订阅)方式  发布者Publisher  
 * @author donald
 *
 */
public class TopicPublisher {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url =  "tcp://192.168.126.128:61616";  
   private static String tname =  "testTopic";
   public static void main(String[] args)throws Exception {  
        // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       // Connection 启动  
       connection.start();  
       System.out.println("Connection is start...");  
       // Session: 一个发送或接收消息的线程  
       Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);  
       // Topicr :消息的目的地;消息发送给谁.  
       Topic  destination = session.createTopic(tname);  
       // MessageProducer:消息发送者  
       MessageProducer producer = session.createProducer(destination);  
       // 设置持久化,此处学习,实际根据项目决定  
       producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
        // 构造消息,此处写死,项目就是参数,或者方法获取  
       sendMessage(session, producer);  
       session.commit();  
 
       connection.close();  
       System.out.println("send Order ok.");  
   }  
   /**
    * 
    * @param session
    * @param producer
    * @throws Exception
    */
   public static void sendMessage(Session session, MessageProducer producer)  
           throws Exception {  
       Order order = new Order();
       order.setId(1);
       order.setAmount(150.62);
       order.setGoodsId(15);
       order.setGoodsAmount(2);
       order.setShopId(5656);
       //我们也可以将Object转换为Json String,作为TextMessage来传送,在消费再反Json String 为Obejct
       ObjectMessage orderMess = session.createObjectMessage(order);
       System.out.println("向ActiveMq:"+tname+"发送订单信息:" + "ActiveMq 发送的Topic消息"); 
       producer.send(orderMess); 
   }  
 
}  

PUB/SUB 订阅者1
package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import mq.enity.Order;
import mq.enity.User;

/**
 * Topic(发布/订阅)方式  订阅者TopicSubscriber  
 * @author donald
 *
 */
public class TopicSubscriber {  
   private static String user = ActiveMQConnection.DEFAULT_USER;  
   private static String password =ActiveMQConnection.DEFAULT_PASSWORD;  
   private static String url = "tcp://192.168.126.128:61616";  
   private static String tname =  "testTopic";
   public static void main(String[] args) throws Exception{  
       // ConnectionFactory :连接工厂,JMS 用它创建连接  
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user,password,url);  
       // Connection :JMS 客户端到JMS Provider 的连接  
       Connection connection = connectionFactory.createConnection();  
       connection.start();  
       // Session: 一个发送或接收消息的线程  
       final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
       // Destination :消息的目的地;消息发送给谁.  
       Topic destination=session.createTopic(tname);  
       // 消费者,消息接收者  
       MessageConsumer consumer = session.createConsumer(destination);  
       consumer.setMessageListener(new MessageListener(){//有事务限制  
           @Override  
           public void onMessage(Message message) {  
               try {  
            	   ObjectMessage objMessage=(ObjectMessage)message;  
                   Order order = (Order)objMessage.getObject();
                   System.out.println("消费订单信息:"+order.toString()); 
                   
               } catch (JMSException e1) {  
                   e1.printStackTrace();  
               }  
               try {  
                   session.commit();  
               } catch (JMSException e) {  
                   e.printStackTrace();  
               }  
           }  
       });  
         
/*  另外一种接受方式 
*    while (true) { 
             //设置接收者接收消息的时间,为了便于测试,这里谁定为100s 
             TextMessage message = (TextMessage) consumer.receive(100000); 
             if (null != message) { 
                 System.out.println("收到消息" + message.getText()); 
             } else { 
                 break; 
             } 
         }*/  
   }  
}  


PUB/SUB 订阅者2

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import mq.enity.Order;
import mq.enity.User;

/**
 * Topic(发布/订阅)方式  订阅者TopicSubscriber  
 * @author donald
 *
 */
public class TopicSubscriber2 {  
     ...其他与TopicSubscriber相同
     System.out.println("2消费订单信息:"+order.toString());  
     ...
              
}  


订单信息类:
package mq.enity;

import java.io.Serializable;

public class Order implements Serializable{
	
	/**
	 * 
	 */
	private static final long serialVersionUID = -343247274477730446L;
	private Integer id;//订单id
	private Double amount;
	private Integer goodsId;//商品id
	private Integer goodsAmount;//商品数量
	private Integer shopId;//店铺名
	public Integer getId() {
		return id;
	}
	public void setId(Integer id) {
		this.id = id;
	}
	public Double getAmount() {
		return amount;
	}
	public void setAmount(Double amount) {
		this.amount = amount;
	}
	public Integer getGoodsId() {
		return goodsId;
	}
	public void setGoodsId(Integer goodsId) {
		this.goodsId = goodsId;
	}
	public Integer getGoodsAmount() {
		return goodsAmount;
	}
	public void setGoodsAmount(Integer goodsAmount) {
		this.goodsAmount = goodsAmount;
	}
	public Integer getShopId() {
		return shopId;
	}
	public void setShopId(Integer shopId) {
		this.shopId = shopId;
	}
	public String toString(){
		return "订单id:"+this.id+","+"金额(元):"+this.amount+","+"商品id:"+
	          this.shopId+","+"商品数量:"+this.goodsAmount+","+"店铺id:"+this.shopId;
	}
	
}


控制台输出:

PUB/SUB生产者:

Connection is start...
向ActiveMq:testTopic发送订单信息:ActiveMq 发送的Topic消息
send Order ok.


PUB/SUB 订阅者1:

消费订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656

PUB/SUB 订阅者2:
2消费订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
  • 大小: 4.2 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics