`
Donald_Draper
  • 浏览: 950626 次
社区版块
存档分类
最新评论

ActiveMQ生产者详解

阅读更多
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
上一篇我们讲到会话的初始化
Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); 


会话的创建主要做的工作是:初始化消费者、生产者id产生器,会话消费者、生产者队列,
消息确认模式,是否异步分发,设置连接事务上下文,异步发送会话信息,新建消息会话执行器,将会话添加到ActiveMQConnection的会话队列CopyOnWriteArrayList;然后启动消费者,主要是启动消息分发通道,唤醒会话执行器ActiveMQSessionExecutor;最后启动会话执行器ActiveMQSessionExecutor,在启动会话执行器时,如果消息分发通道处于未启动状态,则启动消息分发通道,如果有未消费的消息,唤醒消息执行器,唤醒主要做的工作是:
ActiveMQConnection创建任务执行工厂TaskRunnerFactory,由任务执行工厂创建执行任务PooledTaskRunner,PooledTaskRunner是ActiveMQSessionExecutor的包装,PooledTaskRunner执行就是执行ActiveMQSessionExecutor的
iterate函数,这个过程主要是ActiveMQSessionExecutor从ActiveMQSession获取会话消费者consumer,然后遍历消费者,消费者通过MessageListener消费消息。ActiveMQConnection与ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer的关系:连接管理会话(1-n),会话管理消费者与生产者(1-n)。ActiveMQSession关联一个
ActiveMQSessionExecutor,由会话执行器驱动消费者消费消息。

今天我们往下看,看看消息队列,生产者,以及发送消息

Queue :消息的目的地;消息发送给谁.  
Queue  destination = session.createQueue(qname);  
MessageProducer:消息发送者  
MessageProducer producer = session.createProducer(destination);  
//设置生产者的模式,有两种可选
//DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
//DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 
producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
//构造并发送消息,此处内容写死,实际项目中可通过参数或方法获取
sendMessage(session, producer);  
session.commit();  
connection.close();  



1.先看消息队列,从下一句开始
Queue  destination = session.createQueue(qname);
 /**
  * Creates a queue destination for the given name.
  * A name starting with the prefix "ID:" produces an ActiveMQTempQueue;
  * any other name produces a regular ActiveMQQueue.
  *
  * @param queueName name of the queue to create
  * @return the queue destination matching the name
  * @throws JMSException propagated from checkClosed() (presumably when the
  *         session is already closed - confirm against checkClosed())
  */
 public Queue createQueue(String queueName) throws JMSException {
     checkClosed();
     boolean temporary = queueName.startsWith("ID:");
     if (temporary) {
         return new ActiveMQTempQueue(queueName);
     }
     return new ActiveMQQueue(queueName);
 }

来看ActiveMQQueue
//ActiveMQQueue
/**
 * Queue flavour of {@link ActiveMQDestination}.
 * It differs from the topic flavour only in the destination type, the
 * qualified-name prefix and the data structure type it reports.
 */
public class ActiveMQQueue extends ActiveMQDestination
    implements Queue
{
    public static final byte DATA_STRUCTURE_TYPE = 100;
    private static final long serialVersionUID = -3885260014960795889L;

    /**
     * @param name physical name of the queue, handed to the parent constructor
     */
    public ActiveMQQueue(String name) {
        super(name);
    }

    /**
     * Reports the data structure type of this destination, mirroring
     * ActiveMQTopic#getDataStructureType(). Without this accessor the
     * DATA_STRUCTURE_TYPE constant above would never be used.
     *
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * Counterpart of ActiveMQTopic#isTopic().
     *
     * @return always {@code true}
     */
    public boolean isQueue() {
        return true;
    }

    /**
     * @return the physical name of this queue
     * @throws JMSException declared by the JMS Queue contract
     */
    public String getQueueName() throws JMSException {
        return getPhysicalName();
    }

    /**
     * @return {@link ActiveMQDestination#QUEUE_TYPE}
     */
    public byte getDestinationType() {
        return QUEUE_TYPE;
    }

    /**
     * @return the "queue://" prefix used for qualified queue names
     */
    protected String getQualifiedPrefix() {
        return QUEUE_QUALIFIED_PREFIX;
    }
}

/**
 * Common base of all ActiveMQ destinations (queues, topics and their
 * temporary variants). This excerpt shows the type constants, the
 * name-based factory and the Externalizable read/write pair.
 */
public abstract class ActiveMQDestination extends JNDIBaseStorable
    implements DataStructure, Destination, Externalizable, Comparable
{
    public static final String PATH_SEPERATOR = ".";
    public static final char COMPOSITE_SEPERATOR = ',';
    /** Destination type code for queues. */
    public static final byte QUEUE_TYPE = 1;
    /** Destination type code for topics. */
    public static final byte TOPIC_TYPE = 2;
    public static final byte TEMP_MASK = 4;
    public static final byte TEMP_TOPIC_TYPE = 6;
    public static final byte TEMP_QUEUE_TYPE = 5;
    /** Prefix of a qualified queue name. */
    public static final String QUEUE_QUALIFIED_PREFIX = "queue://";
    /** Prefix of a qualified topic name. */
    public static final String TOPIC_QUALIFIED_PREFIX = "topic://";
    public static final String TEMP_QUEUE_QUALIFED_PREFIX = "temp-queue://";
    public static final String TEMP_TOPIC_QUALIFED_PREFIX = "temp-topic://";
    public static final String IS_DLQ = "isDLQ";
    public static final String TEMP_DESTINATION_NAME_PREFIX = "ID:";
    private static final long serialVersionUID = -3885260014960795889L;
    protected String physicalName;
    protected transient ActiveMQDestination compositeDestinations[];
    protected transient String destinationPaths[];
    protected transient boolean isPattern;
    protected transient int hashValue;
    protected Map options;
    protected static UnresolvedDestinationTransformer unresolvableDestinationTransformer = new DefaultUnresolvedDestinationTransformer();

    /**
     * @param name physical name, stored through setPhysicalName(String)
     */
    protected ActiveMQDestination(String name) {
        setPhysicalName(name);
    }

    /**
     * Maps the numeric destination type to a readable label.
     *
     * @return "Queue", "Topic", "TempQueue" or "TempTopic"
     * @throws IllegalArgumentException for any other type code
     */
    public String getDestinationTypeAsString() {
        switch (getDestinationType()) {
        case QUEUE_TYPE:
            return "Queue";
        case TOPIC_TYPE:
            return "Topic";
        case TEMP_QUEUE_TYPE:
            return "TempQueue";
        case TEMP_TOPIC_TYPE:
            return "TempTopic";
        default:
            throw new IllegalArgumentException("Invalid destination type: " + getDestinationType());
        }
    }

    /**
     * Creates a destination from a possibly qualified name. A recognised
     * prefix ("queue://", "topic://", "temp-queue://", "temp-topic://")
     * decides the kind and is stripped from the name; otherwise
     * {@code defaultType} decides and the name is used unchanged.
     *
     * @param name        qualified or plain destination name
     * @param defaultType type code applied when the name carries no prefix
     * @return the new destination
     * @throws IllegalArgumentException if no prefix matches and
     *         {@code defaultType} is not one of the four known type codes
     */
    public static ActiveMQDestination createDestination(String name, byte defaultType) {
        if (name.startsWith(QUEUE_QUALIFIED_PREFIX)) {
            return new ActiveMQQueue(name.substring(QUEUE_QUALIFIED_PREFIX.length()));
        }
        if (name.startsWith(TOPIC_QUALIFIED_PREFIX)) {
            return new ActiveMQTopic(name.substring(TOPIC_QUALIFIED_PREFIX.length()));
        }
        if (name.startsWith(TEMP_QUEUE_QUALIFED_PREFIX)) {
            return new ActiveMQTempQueue(name.substring(TEMP_QUEUE_QUALIFED_PREFIX.length()));
        }
        if (name.startsWith(TEMP_TOPIC_QUALIFED_PREFIX)) {
            return new ActiveMQTempTopic(name.substring(TEMP_TOPIC_QUALIFED_PREFIX.length()));
        }

        switch (defaultType) {
        case QUEUE_TYPE:
            return new ActiveMQQueue(name);
        case TOPIC_TYPE:
            return new ActiveMQTopic(name);
        case TEMP_QUEUE_TYPE:
            return new ActiveMQTempQueue(name);
        case TEMP_TOPIC_TYPE:
            return new ActiveMQTempTopic(name);
        default:
            throw new IllegalArgumentException("Invalid default destination type: " + defaultType);
        }
    }

    /**
     * Writes the physical name followed by the options map.
     *
     * @param out target stream
     * @throws IOException if writing fails
     */
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeUTF(getPhysicalName());
        out.writeObject(options);
    }

    /**
     * Reads back the physical name and the options map, in the order
     * written by {@link #writeExternal(ObjectOutput)}.
     *
     * @param in source stream
     * @throws IOException            if reading fails
     * @throws ClassNotFoundException if the options object cannot be resolved
     */
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        setPhysicalName(in.readUTF());
        options = (Map) in.readObject();
    }

}


Topic :消息的目的地;消息发送给谁. 
Topic  destination = session.createTopic(tname);  

/**
 * Creates a topic destination for the given name.
 * A name starting with the prefix "ID:" produces an ActiveMQTempTopic;
 * any other name produces a regular ActiveMQTopic.
 *
 * @param topicName name of the topic to create
 * @return the topic destination matching the name
 * @throws JMSException propagated from checkClosed() (presumably when the
 *         session is already closed - confirm against checkClosed())
 */
public Topic createTopic(String topicName) throws JMSException {
    checkClosed();
    boolean temporary = topicName.startsWith("ID:");
    if (temporary) {
        return new ActiveMQTempTopic(topicName);
    }
    return new ActiveMQTopic(topicName);
}

/**
 * Topic flavour of {@link ActiveMQDestination}: reports the topic
 * destination type, the "topic://" prefix and data structure type 101.
 */
public class ActiveMQTopic extends ActiveMQDestination
    implements Topic
{
    public static final byte DATA_STRUCTURE_TYPE = 101;
    private static final long serialVersionUID = 7300307405896488588L;

    /**
     * @param name physical name of the topic, handed to the parent constructor
     */
    public ActiveMQTopic(String name) {
        super(name);
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return always {@code true}
     */
    public boolean isTopic() {
        return true;
    }

    /**
     * @return the physical name of this topic
     * @throws JMSException declared by the JMS Topic contract
     */
    public String getTopicName() throws JMSException {
        return getPhysicalName();
    }

    /**
     * @return {@link ActiveMQDestination#TOPIC_TYPE}
     */
    public byte getDestinationType() {
        return TOPIC_TYPE;
    }

    /**
     * @return the "topic://" prefix used for qualified topic names
     */
    protected String getQualifiedPrefix() {
        return TOPIC_QUALIFIED_PREFIX;
    }
}

从分析ActiveMQTopic和ActiveMQQueue可以看出,两者本质上都是ActiveMQDestination,
只是目的地类型(getDestinationType)、限定名前缀(queue://与topic://)和数据结构类型(DATA_STRUCTURE_TYPE)不同


2.再来看会话创建生产者
MessageProducer producer = session.createProducer(destination); 

先看destination为ActiveMQQueue,再看ActiveMQTopic

destination为ActiveMQQueue
//根据队列目的地创建消息生产者
/**
 * Creates a message producer bound to the given destination.
 * A CustomDestination builds its own producer; every other destination is
 * transformed into an ActiveMQDestination and wrapped in a new
 * ActiveMQMessageProducer that uses the connection's send timeout.
 *
 * @param destination target destination of the producer
 * @return the new producer
 * @throws JMSException propagated from checkClosed() or from producer creation
 */
public MessageProducer createProducer(Destination destination) throws JMSException {
    checkClosed();
    if (!(destination instanceof CustomDestination)) {
        int sendTimeout = connection.getSendTimeout();
        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination), sendTimeout);
    }
    CustomDestination custom = (CustomDestination) destination;
    return custom.createProducer(this);
}

//ActiveMQMessageProducer
public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport
    implements StatsCapable, Disposable
{
   protected ProducerInfo info;//生产者信息
    protected boolean closed;
    private final JMSProducerStatsImpl stats;//生产者状态管理器
    private AtomicLong messageSequence;//消息序列号
    private final long startTime = System.currentTimeMillis();
    private MessageTransformer transformer;//消息转化器
    private MemoryUsage producerWindow;//生产者窗口
     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination, int sendTimeout)
        throws JMSException
    {
        super(session);
	//创建生产者信息
        info = new ProducerInfo(producerId);
	//从连接获取生产者窗口信息
        info.setWindowSize(session.connection.getProducerWindowSize());
        if(destination != null && destination.getOptions() != null)
        {
            Map options = IntrospectionSupport.extractProperties(new HashMap(destination.getOptions()), "producer.");
            IntrospectionSupport.setProperties(info, options);
            if(options.size() > 0)
            {
                String msg = (new StringBuilder()).append("There are ").append(options.size()).append(" producer options that couldn't be set on the producer.").append(" Check the options are spelled correctly.").append(" Unknown parameters=[").append(options).append("].").append(" This producer cannot be started.").toString();
                LOG.warn(msg);
                throw new ConfigurationException(msg);
            }
        }
        info.setDestination(destination);
        if(session.connection.getProtocolVersion() >= 3 && info.getWindowSize() > 0)
        {
            producerWindow = new MemoryUsage((new StringBuilder()).append("Producer Window: ").append(producerId).toString());
            producerWindow.setExecutor(session.getConnectionExecutor());
            producerWindow.setLimit(info.getWindowSize());
            producerWindow.start();
        }
	//默认为消息持久化
        defaultDeliveryMode = 2;//
        defaultPriority = 4;//默认优先级为r4
        defaultTimeToLive = 0L;
        messageSequence = new AtomicLong(0L);//新建消息序列号
        stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
        try
        {
	   //将消息生产者添加到会话,通过会话将生产者信息发送给Server
            this.session.addProducer(this);
            this.session.syncSendPacket(info);
        }
        setSendTimeout(sendTimeout);
	//设置消息转换器
        setTransformer(session.getTransformer());
    }
}


//ActiveMQMessageProducerSupport
/**
 * Base class of the ActiveMQ producers; holds the owning session and the
 * per-producer send defaults.
 */
public abstract class ActiveMQMessageProducerSupport
    implements MessageProducer, Closeable
{
    /** Session this producer belongs to. */
    protected ActiveMQSession session;
    protected boolean disableMessageID;
    protected boolean disableMessageTimestamp;
    /** Delivery mode used when a send call does not specify one. */
    protected int defaultDeliveryMode;
    /** Priority used when a send call does not specify one. */
    protected int defaultPriority;
    protected long defaultTimeToLive;
    protected int sendTimeout;

    /**
     * Stores the session, takes the timestamp default from the session's
     * connection and starts with a send timeout of 0.
     *
     * @param session owning session
     */
    public ActiveMQMessageProducerSupport(ActiveMQSession session) {
        this.session = session;
        this.disableMessageTimestamp = session.connection.isDisableTimeStampsByDefault();
        this.sendTimeout = 0;
    }
}


从上面来看,会话创建消息生产者就是,初始化ActiveMQMessageProducer的生产者信息,生产者状态管理器
,消息序列号,传输模式(默认持久化),消息转化器,生产者窗口,并将生产者信息通过会话发送给Server
再来看destination为ActiveMQTopic的情况
 /**
  * Creates a publisher for the given topic.
  * A CustomDestination builds its own publisher; every other topic is
  * transformed into an ActiveMQDestination and wrapped in a new
  * ActiveMQTopicPublisher that uses the connection's send timeout.
  *
  * @param topic topic to publish to
  * @return the new publisher
  * @throws JMSException propagated from checkClosed() or from publisher creation
  */
 public TopicPublisher createPublisher(Topic topic) throws JMSException {
     checkClosed();
     if (!(topic instanceof CustomDestination)) {
         int sendTimeout = connection.getSendTimeout();
         return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic), sendTimeout);
     }
     CustomDestination custom = (CustomDestination) topic;
     return custom.createPublisher(this);
 }

//ActiveMQTopicPublisher
/**
 * TopicPublisher built on ActiveMQMessageProducer: every publish(...)
 * overload forwards to the matching send(...) of the superclass.
 */
public class ActiveMQTopicPublisher extends ActiveMQMessageProducer
    implements TopicPublisher
{
    /**
     * @param session     owning session; also supplies the next producer id
     * @param destination topic destination of the publisher
     * @param sendTimeout send timeout passed on to the producer
     * @throws JMSException if the underlying producer cannot be created
     */
    protected ActiveMQTopicPublisher(ActiveMQSession session, ActiveMQDestination destination, int sendTimeout)
        throws JMSException {
        super(session, session.getNextProducerId(), destination, sendTimeout);
    }

    /**
     * @return the producer's destination, cast to Topic
     * @throws JMSException propagated from getDestination()
     */
    public Topic getTopic() throws JMSException {
        Topic topic = (Topic) super.getDestination();
        return topic;
    }

    /** Forwards to {@code send(message)}. */
    public void publish(Message message) throws JMSException {
        super.send(message);
    }

    /** Forwards to {@code send(message, deliveryMode, priority, timeToLive)}. */
    public void publish(Message message, int deliveryMode, int priority, long timeToLive)
        throws JMSException {
        super.send(message, deliveryMode, priority, timeToLive);
    }

    /** Forwards to {@code send(topic, message)}. */
    public void publish(Topic topic, Message message) throws JMSException {
        super.send(topic, message);
    }

    /** Forwards to {@code send(topic, message, deliveryMode, priority, timeToLive)}. */
    public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive)
        throws JMSException {
        super.send(topic, message, deliveryMode, priority, timeToLive);
    }
}

从上面可以看出,ActiveMQTopicPublisher本质上是ActiveMQMessageProducer,TopicPublisher的消息的发送都是委托给ActiveMQMessageProducer

3.发送消息
sendMessage(session, producer);
 /**
  * Demo helper: creates five text messages on the session and sends them
  * through the producer, printing each payload before it is sent.
  *
  * The text is built once per message so the console output always matches
  * the payload that is actually sent (the two literals used to differ).
  *
  * NOTE(review): the original comment claimed sending stops working at 1000
  * messages; nothing in this method shows such a limit - confirm or drop.
  *
  * @param session  session used to create the text messages
  * @param producer producer used to send them
  * @throws Exception propagated from message creation or sending
  */
 public static void sendMessage(Session session, MessageProducer producer)
         throws Exception {
     final int messageCount = 5;
     for (int i = 1; i <= messageCount; i++) {
         String text = "向ActiveMq发送的Queue消息" + i;
         TextMessage message = session.createTextMessage(text);
         System.out.println("发送消息:" + text);
         // Hand the message to the producer's destination.
         producer.send(message);
     }
 }



先看会话创建消息,再看生产者发送消息

会话创建消息
//ActiveMQSession
 /**
  * Creates a text message with the given body and attaches this session's
  * connection to it through configureMessage(...).
  *
  * @param text body of the message
  * @return the configured text message
  * @throws JMSException propagated from setText(...) or configureMessage(...)
  */
 public TextMessage createTextMessage(String text) throws JMSException {
     ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
     // The body is set before the message is configured.
     textMessage.setText(text);
     configureMessage(textMessage);
     return textMessage;
 }


创建文本消息
ActiveMQTextMessage message = new ActiveMQTextMessage();

/**
 * Text message implementation; keeps the body in {@link #text} and reports
 * data structure type 28.
 */
public class ActiveMQTextMessage extends ActiveMQMessage
    implements TextMessage
{
    public static final byte DATA_STRUCTURE_TYPE = 28;
    /** Message body. */
    protected String text;

    /**
     * Copies this message's state, including the text, into {@code copy}.
     *
     * @param copy target of the copy
     */
    private void copy(ActiveMQTextMessage copy) {
        super.copy(copy);
        copy.text = this.text;
    }

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the MIME type label "jms/text-message"
     */
    public String getJMSXMimeType() {
        return "jms/text-message";
    }

    /**
     * Replaces the body after checkReadOnlyBody() has passed, then clears
     * the content via setContent(null).
     *
     * @param text new body
     * @throws MessageNotWriteableException propagated from checkReadOnlyBody()
     */
    public void setText(String text) throws MessageNotWriteableException {
        checkReadOnlyBody();
        this.text = text;
        setContent(null);
    }
}

配置消息
configureMessage(message);

/**
 * Prepares a freshly created message by attaching this session's connection.
 *
 * @param message message to configure
 * @throws IllegalStateException propagated from checkClosed()
 */
protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
    checkClosed();
    // Give the message a reference to the connection of this session.
    message.setConnection(connection);
}

/**
 * Base command class for messages. This excerpt lists only the fields that
 * carry message metadata (ids, destinations, timing, flags) and payload.
 */
public abstract class Message extends BaseCommand
    implements MarshallAware, MessageReference
{
    public static final String ORIGINAL_EXPIRATION = "originalExpiration";
    public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
    protected MessageId messageId;// message id
    protected ActiveMQDestination originalDestination;// original destination of the message
    protected TransactionId originalTransactionId;// original transaction id
    protected ProducerId producerId;// id of the producer that sent the message
    protected ActiveMQDestination destination;// destination of the message
    protected TransactionId transactionId;// transaction id
    protected long expiration;// expiration time
    protected long timestamp;// timestamp
    protected long arrival;
    protected long brokerInTime;
    protected long brokerOutTime;
    protected String correlationId;
    protected ActiveMQDestination replyTo;// destination for replies
    protected boolean persistent;// whether the message is persistent
    protected String type;
    protected byte priority;// priority
    protected String groupID;
    protected int groupSequence;
    protected ConsumerId targetConsumerId;// id of the targeted consumer
    protected boolean compressed;
    protected String userID;
    protected ByteSequence content;
    protected ByteSequence marshalledProperties;
    protected DataStructure dataStructure;// attached data structure
    protected int redeliveryCounter;// redelivery counter
    protected int size;
    protected Map properties;
    protected boolean readOnlyProperties;// whether the properties are read-only
    protected boolean readOnlyBody;
    protected transient boolean recievedByDFBridge;
    protected boolean droppable;
    protected boolean jmsXGroupFirstForConsumer;
    private transient short referenceCount;
    private transient ActiveMQConnection connection;// connection attached to the message
    transient MessageDestination regionDestination;
    transient MemoryUsage memoryUsage;// memory usage tracker
    private BrokerId brokerPath[];// presumably the brokers the message passed through - TODO confirm
    private BrokerId cluster[];// presumably the brokers of the cluster - TODO confirm
}

再来看一下ObjectMessage
/**
 * Creates an object message, attaches this session's connection through
 * configureMessage(...) and then stores the payload.
 *
 * @param object serializable payload
 * @return the configured object message
 * @throws JMSException propagated from configureMessage(...) or setObject(...)
 */
public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
    ActiveMQObjectMessage objectMessage = new ActiveMQObjectMessage();
    // The message is configured before the payload is set.
    configureMessage(objectMessage);
    objectMessage.setObject(object);
    return objectMessage;
}

//ActiveMQObjectMessage
/**
 * Object message implementation; keeps the payload in {@link #object} and
 * reports data structure type 26.
 */
public class ActiveMQObjectMessage extends ActiveMQMessage
    implements ObjectMessage
{
    public static final byte DATA_STRUCTURE_TYPE = 26;
    // Class loader that loaded this class. The decompiler printed the class
    // literal as a slash-separated path, which is not valid Java.
    static final ClassLoader ACTIVEMQ_CLASSLOADER = ActiveMQObjectMessage.class.getClassLoader();
    /** Payload; transient, so it is not part of default Java serialization. */
    protected transient Serializable object;

    /**
     * @return {@link #DATA_STRUCTURE_TYPE}
     */
    public byte getDataStructureType() {
        return DATA_STRUCTURE_TYPE;
    }

    /**
     * @return the MIME type label "jms/object-message"
     */
    public String getJMSXMimeType() {
        return "jms/object-message";
    }
}

生产者发送消息



//ActiveMQMessageProducerSupport
 /**
  * Sends a message to this producer's own destination using the producer's
  * default delivery mode, priority and time-to-live.
  *
  * @param message message to send
  * @throws JMSException propagated from the full send(...) overload
  */
 public void send(Message message) throws JMSException {
     send(getDestination(), message, defaultDeliveryMode, defaultPriority, defaultTimeToLive);
 }



//ActiveMQMessageProducer发送消息
 /**
  * Sends a message with explicit delivery settings and no completion
  * callback; forwards to the six-argument overload with a {@code null}
  * callback.
  *
  * @param destination  target destination
  * @param message      message to send
  * @param deliveryMode delivery mode to apply
  * @param priority     priority to apply
  * @param timeToLive   time-to-live to apply
  * @throws JMSException propagated from the six-argument overload
  */
 public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
     throws JMSException {
     send(destination, message, deliveryMode, priority, timeToLive, null);
 }

     /**
      * Validates the destination, optionally transforms the message, waits
      * for producer-window space and hands the message to the session.
      *
      * @param destination  target destination; must equal the producer's
      *                     fixed destination when one is set
      * @param message      message to send
      * @param deliveryMode delivery mode to apply
      * @param priority     priority to apply
      * @param timeToLive   time-to-live to apply
      * @param onComplete   completion callback, may be {@code null}
      * @throws UnsupportedOperationException if no destination is available,
      *         or the given one differs from the producer's fixed destination
      * @throws InvalidDestinationException if {@code destination} is null
      *         although the producer has a fixed destination
      * @throws JMSException if the destination resolves to null, the wait for
      *         window space is interrupted, or the session send fails
      */
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete)
         throws JMSException {
         checkClosed();
         if (destination == null) {
             if (info.getDestination() == null) {
                 throw new UnsupportedOperationException("A destination must be specified.");
             }
             throw new InvalidDestinationException("Don't understand null destinations");
         }

         // Resolve the destination the message is actually sent to.
         ActiveMQDestination dest;
         if (destination.equals(info.getDestination())) {
             dest = (ActiveMQDestination) destination;
         } else if (info.getDestination() == null) {
             dest = ActiveMQDestination.transform(destination);
         } else {
             throw new UnsupportedOperationException("This producer can only send messages to: " + info.getDestination().getPhysicalName());
         }
         if (dest == null) {
             throw new JMSException("No destination specified");
         }

         // Let the configured transformer replace the message if it returns one.
         if (transformer != null) {
             Message transformedMessage = transformer.producerTransform(session, this, message);
             if (transformedMessage != null) {
                 message = transformedMessage;
             }
         }

         // With a producer window, block until it reports free space.
         if (producerWindow != null) {
             try {
                 producerWindow.waitForSpace();
             } catch (InterruptedException interrupted) {
                 throw new JMSException("Send aborted due to thread interrupt.");
             }
         }

         // The session performs the actual send; afterwards record it in the stats.
         session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
         stats.onMessage();
     }


通过会话发送消息
session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);


//ActiveMQSession
/**
 * Sends a message on behalf of a producer: stamps the JMS headers, converts
 * the message to an ActiveMQMessage, assigns message id, transaction id and
 * producer id, and passes it to the connection either asynchronously or
 * synchronously. All of this runs while holding sendMutex.
 *
 * @param producer       producer the message is sent for
 * @param destination    resolved target destination
 * @param message        message to send
 * @param deliveryMode   JMS delivery mode to stamp on the message
 * @param priority       JMS priority to stamp on the message
 * @param timeToLive     time-to-live; a positive value yields an expiration
 *                       only when producer timestamps are enabled
 * @param producerWindow producer window to charge for async sends, may be null
 * @param sendTimeout    timeout for a synchronous send when positive
 * @param onComplete     completion callback, may be null
 * @throws JMSException if the destination is a deleted temporary destination
 *         (InvalidDestinationException) or sending fails
 */
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 
            MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete)
        throws JMSException
    {
        checkClosed();
        if(destination.isTemporary() && connection.isDeleted(destination))
            throw new InvalidDestinationException((new StringBuilder()).append("Cannot publish to a deleted Destination: ").append(destination).toString());
        // Acquire the send mutex (mutual-exclusion lock) for the whole send.
	synchronized(sendMutex)
        {
            // Start a transaction if needed, then read the current transaction id.
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            // Sequence number from the producer; it becomes part of the message id.
            long sequenceNumber = producer.getMessageSequence();
            message.setJMSDeliveryMode(deliveryMode);// stamp the delivery mode
            long expiration = 0L;
            if(!producer.getDisableMessageTimestamp())
            {
                // Stamp the send time; derive the expiration when a TTL is given.
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if(timeToLive > 0L)
                    expiration = timeToLive + timeStamp;
            }
            // Expiration stays 0 when timestamps are disabled or no TTL was given.
            message.setJMSExpiration(expiration);
            message.setJMSPriority(priority);
            message.setJMSRedelivered(false);// mark the message as not redelivered
            // Convert the JMS message into an ActiveMQMessage.
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            // Set the destination on the converted message.
            msg.setDestination(destination);
            // Message id = producer id + sequence number.
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
            // If the conversion produced a different object, mirror id and
            // destination back onto the caller's message.
            if(msg != message)
            {
                message.setJMSMessageID(msg.getMessageId().toString());
                message.setJMSDestination(destination);
            }
            msg.setBrokerPath(null);
            msg.setTransactionId(txid);// attach the transaction id (may be null)
            if(connection.isCopyMessageOnSend())
                msg = (ActiveMQMessage)msg.copy();
            // Attach the connection to the message that is actually sent.
            msg.setConnection(connection);
            msg.onSend();
            // Producer id is taken from the message id assigned above.
            msg.setProducerId(msg.getMessageId().getProducerId());
            if(LOG.isTraceEnabled())
                LOG.trace((new StringBuilder()).append(getSessionId()).append(" sending message: ").append(msg).toString());
            // Async path requires all of: no callback, no send timeout, no response
            // required, connection not forced to sync, and the message is
            // non-persistent OR async send is enabled OR a transaction is active.
            if(onComplete == null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null))
            {
                // Hand the message to the connection asynchronously.
                connection.asyncSendPacket(msg);
                if(producerWindow != null)
                {
                    int size = msg.getSize();
                    // Charge the message size against the producer window.
                    producerWindow.increaseUsage(size);
                }
            } else
            if(sendTimeout > 0 && onComplete == null)
                // Synchronous send with the given timeout.
                connection.syncSendPacket(msg, sendTimeout);
            else
                // Synchronous send with the completion callback (which may be null).
                connection.syncSendPacket(msg, onComplete);
        }
    }

先看一下会话发送消息新建事务所做的事情
开始事务,获取事务id
doStartTransaction();


/**
 * Begins a local transaction on the transaction context when the session is
 * transacted and no XA transaction is in progress; otherwise does nothing.
 *
 * @throws JMSException propagated from the transaction context
 */
protected void doStartTransaction() throws JMSException {
    boolean transacted = getTransacted();
    if (transacted && !transactionContext.isInXATransaction()) {
        transactionContext.begin();
    }
}

/**
 * Transaction state attached to a connection. This excerpt shows the fields
 * and begin(), which starts a local transaction.
 */
public class TransactionContext
    implements XAResource
{
    private static final HashMap ENDED_XA_TRANSACTION_CONTEXTS = new HashMap();
    /** Connection this transaction context belongs to. */
    private ActiveMQConnection connection;
    private final LongSequenceGenerator localTransactionIdGenerator;
    private List synchronizations;
    private Xid associatedXid;
    /** Id of the current transaction; null when none is active. */
    private TransactionId transactionId;
    private LocalTransactionEventListener localTransactionEventListener;
    private int beforeEndIndex;

    /**
     * Starts a local transaction unless one is already active: creates a new
     * LocalTransactionId, sends the matching TransactionInfo over the
     * connection asynchronously and notifies the local listener, if any.
     *
     * @throws TransactionInProgressException if an XA transaction is in progress
     * @throws JMSException if sending the transaction info fails
     */
    public void begin() throws JMSException {
        if (isInXATransaction()) {
            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
        }
        if (transactionId != null) {
            // A local transaction is already running; nothing to do.
            return;
        }

        synchronizations = null;
        beforeEndIndex = 0;
        // New local transaction id: connection id plus the next local sequence number.
        transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
        // NOTE(review): type byte 0 presumably means "begin" - confirm against TransactionInfo.
        TransactionInfo beginInfo = new TransactionInfo(getConnectionId(), transactionId, (byte)0);
        connection.ensureConnectionInfoSent();
        // The transaction info is sent without waiting for a response.
        connection.asyncSendPacket(beginInfo);
        if (localTransactionEventListener != null) {
            localTransactionEventListener.beginEvent();
        }
        LOG.debug("Begin:{}", transactionId);
    }
}

回到会话异步发送消息

如果消息发送回调为空且无发送延时,则异步发送消息
connection.asyncSendPacket(msg);

//ActiveMQConnection,异步发送消息
 
/**
 * Sends a command without waiting for a response.
 *
 * @param command command to send
 * @throws ConnectionClosedException if this connection is closed
 * @throws JMSException if the transport fails to send the command
 */
public void asyncSendPacket(Command command) throws JMSException {
    if (isClosed()) {
        throw new ConnectionClosedException();
    }
    doAsyncSendPacket(command);
}
//异步发送消息
/**
 * Passes the command to the transport as a one-way send and converts any
 * IOException into a JMSException.
 *
 * @param command command to send
 * @throws JMSException wrapping an IOException raised by the transport
 */
private void doAsyncSendPacket(Command command) throws JMSException {
    try {
        // As wired in TransportFactory.configure(...), the chain is
        // ResponseCorrelator -> MutexTransport -> the underlying transport
        // (TcpTransport for tcp:// URIs).
        transport.oneway(command);
    } catch (IOException ioFailure) {
        throw JMSExceptionSupport.create(ioFailure);
    }
}

//ResponseCorrelator
public class ResponseCorrelator extends TransportFilter
{
    private final Map requestMap;
    private IntSequenceGenerator sequenceGenerator;
    private final boolean debug;
    private IOException error;
    public ResponseCorrelator(Transport next)
    {
        //next为MutexTransport
        this(next, new IntSequenceGenerator());
    }
 public void oneway(Object o)
        throws IOException
    {
        Command command = (Command)o;
	//设置命令序列化id
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        next.oneway(command);
    }

//MutexTransport
public class MutexTransport extends TransportFilter
{
    private final ReentrantLock writeLock;
    private boolean syncOnCommand;
    public MutexTransport(Transport next)
    {
         //next为TcpTransport
        super(next);
        writeLock = new ReentrantLock();
        syncOnCommand = false;
    }
    //正在发送命令
     public void oneway(Object command)
        throws IOException
    {
        //获取写锁
        writeLock.lock();
        next.oneway(command);
        writeLock.unlock();
        break MISSING_BLOCK_LABEL_37;
        Exception exception;
        exception;
        writeLock.unlock();
        throw exception;
    }    
}

//TcpTransport

/**
 * Socket-based transport. This excerpt shows its configuration fields and
 * oneway(...), which marshals a command onto the socket output stream.
 */
public class TcpTransport extends TransportThreadSupport
    implements Transport, Service, Runnable
{
   protected final URI remoteLocation;// remote URI
    protected final URI localLocation;// local URI
    protected final WireFormat wireFormat;// marshals commands onto the stream
    protected int connectionTimeout;// connection timeout
    protected int soTimeout;
    protected int socketBufferSize;// socket buffer size
    protected int ioBufferSize;// I/O buffer size
    protected boolean closeAsync;
    protected Socket socket;
    protected DataOutputStream dataOut;// output stream of the socket
    protected DataInputStream dataIn;// input stream of the socket
    protected TimeStampStream buffOut;
    protected int trafficClass;
    private boolean trafficClassSet;
    protected boolean diffServChosen;
    protected boolean typeOfServiceChosen;
    protected boolean trace;
    protected String logWriterName;
    protected boolean dynamicManagement;
    protected boolean startLogging;
    protected int jmxPort;
    protected boolean useLocalHost;
    protected int minmumWireFormatVersion;
    protected SocketFactory socketFactory;
    protected final AtomicReference stoppedLatch;
    protected volatile int receiveCounter;
    private Map socketOptions;
    private int soLinger;
    private Boolean keepAlive;// keep-alive setting
    private Boolean tcpNoDelay;// TCP no-delay setting
    private Thread runnerThread;
  /**
   * Writes a command to the broker without waiting for a response.
   *
   * @param command command to marshal
   * @throws IOException propagated from checkStarted(), marshalling or the flush
   */
  public void oneway(Object command)
        throws IOException
    {

        checkStarted();
        // Marshal the command into dataOut. Per the field comment above,
        // dataOut is the output stream of the socket to the broker.
        wireFormat.marshal(command, dataOut);
        // Flush so buffered bytes are pushed to the socket.
        dataOut.flush();
    }
}

我们来看一下wireFormat是什么?
在从连接工厂获取连接的过程中,TcpTransport是由TransportFactory构建的,WireFormat也是在这一步创建的:

public abstract class TransportFactory
{
   //TRANSPORT_FACTORY_FINDER,transport加载路径
   private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
    //WIREFORMAT_FACTORY加载路径
    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
    private static final ConcurrentMap TRANSPORT_FACTORYS = new ConcurrentHashMap();
    private static final String WRITE_TIMEOUT_FILTER = "soWriteTimeout";
    private static final String THREAD_NAME_FILTER = "threadName";
   //TransportFactory创建TcpTransport
  public Transport doConnect(URI location, Executor ex)
        throws Exception
    {
        return doConnect(location);
    }
    public Transport doConnect(URI location)
        throws Exception
    {
        Transport rc;
        Map options = new HashMap(URISupport.parseParameters(location));
        if(!options.containsKey("wireFormat.host"))
            options.put("wireFormat.host", location.getHost());
	//创建WireFormat
        WireFormat wf = createWireFormat(options);
        Transport transport = createTransport(location, wf);
        rc = configure(transport, wf, options);
        if(!options.isEmpty())
            throw new IllegalArgumentException((new StringBuilder()).append("Invalid connect parameters: ").append(options).toString());
        return rc;
        URISyntaxException e;
        e;
        throw IOExceptionSupport.create(e);
    }
    //配置Transport
     public Transport configure(Transport transport, WireFormat wf, Map options)
        throws Exception
    {
        transport = compositeConfigure(transport, wf, options);
        transport = new MutexTransport(transport);
        transport = new ResponseCorrelator(transport);
        return transport;
    }
}

创建WireFormat
WireFormat wf = createWireFormat(options);


  /**
   * Creates the wire format: resolves a WireFormatFactory from the options
   * and asks it for a new WireFormat.
   *
   * @param options connect options; "wireFormat" entries are consumed by
   *                createWireFormatFactory(...)
   * @return the new wire format
   * @throws IOException if the factory cannot be created
   */
  protected WireFormat createWireFormat(Map options) throws IOException {
      WireFormatFactory wireFormatFactory = createWireFormatFactory(options);
      return wireFormatFactory.createWireFormat();
  }

    protected WireFormatFactory createWireFormatFactory(Map options)
        throws IOException
    {
        String wireFormat;
        wireFormat = (String)options.remove("wireFormat");
        if(wireFormat == null)
	   //获取默认wireFormat类型
            wireFormat = getDefaultWireFormatType();
        WireFormatFactory wff;
	//这个是不是有点与获取TransportFactory工厂有点像
	//WIREFORMAT_FACTORY_FINDER,从META-INF/services/org/apache/activemq/wireformat/
	//路径下加载相应的default配置文件,然后加载配置文件class属性对应的class
	//实际org.apache.activemq.openwire.OpenWireFormatFactory
        wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(wireFormat);
        IntrospectionSupport.setProperties(wff, options, "wireFormat.");
        return wff;
        Throwable e;
        e;
        throw IOExceptionSupport.create((new StringBuilder()).append("Could not create wire format factory for: ").append(wireFormat).append(", reason: ").append(e).toString(), e);
    }
    
    /**
     * Name of the wire format used when the connect options do not specify
     * one.
     *
     * @return the literal "default"
     */
    protected String getDefaultWireFormatType() {
        return "default";
    }


//default配置文件内容
class=org.apache.activemq.openwire.OpenWireFormatFactory


public class OpenWireFormatFactory
    implements WireFormatFactory
{
    private int version;//版本
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled;//tcp是否有延时
    private boolean cacheEnabled;//是否开启缓存
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxInactivityDuration;
    private long maxInactivityDurationInitalDelay;
    private int cacheSize;缓存大小//
    private long maxFrameSize;
    private String host;//broker ip
    
    public OpenWireFormatFactory()
    {
        version = 11;
        stackTraceEnabled = true;
        tcpNoDelayEnabled = true;
        cacheEnabled = true;
        tightEncodingEnabled = true;
        maxInactivityDuration = 30000L;
        maxInactivityDurationInitalDelay = 10000L;
        cacheSize = 1024;
        maxFrameSize = 9223372036854775807L;
        host = null;
    }

    public WireFormat createWireFormat()
    {
        //创建WireFormat协议信息
        WireFormatInfo info = new WireFormatInfo();
        info.setVersion(version);
        try
        {
	    //设置wireFormat协议下,socket发送消息的缓存,ip,延时等信息
            info.setStackTraceEnabled(stackTraceEnabled);
            info.setCacheEnabled(cacheEnabled);
            info.setTcpNoDelayEnabled(tcpNoDelayEnabled);
            info.setTightEncodingEnabled(tightEncodingEnabled);
            info.setSizePrefixDisabled(sizePrefixDisabled);
            info.setMaxInactivityDuration(maxInactivityDuration);
            info.setMaxInactivityDurationInitalDelay(maxInactivityDurationInitalDelay);
            info.setCacheSize(cacheSize);
            info.setMaxFrameSize(maxFrameSize);
            if(host != null)
                info.setHost(host);
        }
        catch(Exception e)
        {
            IllegalStateException ise = new IllegalStateException("Could not configure WireFormatInfo");
            ise.initCause(e);
            throw ise;
        }
	//创建OpenWireFormat
        OpenWireFormat f = new OpenWireFormat(version);
        f.setMaxFrameSize(maxFrameSize);
        f.setPreferedWireFormatInfo(info);
        return f;
    }
}

//OpenWireFormat
/**
 * OpenWire implementation of {@code WireFormat}.
 * (Abridged excerpt: only the state and the constructor are shown.)
 */
public final class OpenWireFormat
    implements WireFormat
{
    public static final int DEFAULT_STORE_VERSION = 11;
    public static final int DEFAULT_WIRE_VERSION = 11;
    public static final int DEFAULT_LEGACY_VERSION = 6;
    public static final long DEFAULT_MAX_FRAME_SIZE = 9223372036854775807L;
    static final byte NULL_TYPE = 0;
    private static final int MARSHAL_CACHE_SIZE = 16383;
    private static final int MARSHAL_CACHE_FREE_SPACE = 100;

    // Marshallers indexed by data structure type code (see marshal()).
    private DataStreamMarshaller dataMarshallers[];
    private int version;
    private boolean stackTraceEnabled;
    private boolean tcpNoDelayEnabled; // whether TCP no-delay is requested
    private boolean cacheEnabled; // whether caching is enabled
    private boolean tightEncodingEnabled;
    private boolean sizePrefixDisabled;
    private long maxFrameSize = 9223372036854775807L;
    private short nextMarshallCacheIndex;
    private short nextMarshallCacheEvictionIndex;
    private Map marshallCacheMap = new HashMap();
    private DataStructure marshallCache[];
    private DataStructure unmarshallCache[];
    private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream(); // output buffer
    private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
    private WireFormatInfo preferedWireFormatInfo; // preferred protocol settings

    /**
     * Creates a wire format for the given protocol version.
     *
     * @param i the protocol version, applied through {@code setVersion}
     */
    public OpenWireFormat(int i)
    {
        setVersion(i);
    }
}

从上面分析可以看出TcpTransport的协议为OpenWireFormat

再回到TcpTransport发送命令
//将命令写到dataOut中;前文分析TcpTransport的启动过程时我们看到,
//TcpTransport与Broker建立套接字后,dataOut即为socket的输出流,其类型为DataOutputStream

 wireFormat.marshal(command, dataOut);


//OpenWireFormat


 /**
  * Writes one command to the given output.
  *
  * Unless the size prefix is disabled, each frame starts with an int size,
  * followed by a one-byte type code and the marshalled body. A {@code null}
  * command is written as a frame of size 1 whose type byte is 0.
  *
  * @param o       the command to write; must be a {@code DataStructure} or {@code null}
  * @param dataOut the destination output
  * @throws IOException if no marshaller is registered for the command's type, or on write failure
  */
 public synchronized void marshal(Object o, DataOutput dataOut)
        throws IOException
    {
        if(cacheEnabled)
            runMarshallCacheEvictionSweep();

        if(o == null)
        {
            // Null command: size 1 (just the type byte), type code 0.
            if(!sizePrefixDisabled)
                dataOut.writeInt(1);
            dataOut.writeByte(0);
            return;
        }

        // Look up the marshaller registered for this command's type code.
        DataStructure command = (DataStructure)o;
        byte typeCode = command.getDataStructureType();
        DataStreamMarshaller marshaller = dataMarshallers[typeCode & 255];
        if(marshaller == null)
            throw new IOException("Unknown data type: " + typeCode);

        if(tightEncodingEnabled)
        {
            // Tight encoding: a first pass computes the size and fills the boolean
            // stream, a second pass writes the body.
            BooleanStream flags = new BooleanStream();
            int frameSize = 1;
            frameSize += marshaller.tightMarshal1(this, command, flags);
            frameSize += flags.marshalledSize();
            if(!sizePrefixDisabled)
                dataOut.writeInt(frameSize);
            dataOut.writeByte(typeCode);
            flags.marshal(dataOut);
            marshaller.tightMarshal2(this, command, dataOut, flags);
            return;
        }

        // Loose encoding: when a size prefix is required, marshal into the internal
        // buffer first so the length is known before anything reaches dataOut.
        DataOutput target = dataOut;
        if(!sizePrefixDisabled)
        {
            bytesOut.restart();
            target = bytesOut;
        }
        // Type code first, then the command body.
        target.writeByte(typeCode);
        marshaller.looseMarshal(this, command, target);
        if(!sizePrefixDisabled)
        {
            ByteSequence buffered = bytesOut.toByteSequence();
            dataOut.writeInt(buffered.getLength());
            dataOut.write(buffered.getData(), buffered.getOffset(), buffered.getLength());
        }
    }

将消息信息写到输出流
dsm.looseMarshal(this, c, looseOut);

//WireFormatInfoMarshaller
/**
 * Loose-encodes a {@code WireFormatInfo}: after the superclass part, the magic
 * bytes (fixed length 8), the version and the marshalled properties are written,
 * in that order.
 *
 * @param wireFormat the wire format driving the marshalling
 * @param o          the object to write; must be a {@code WireFormatInfo}
 * @param dataOut    the destination output
 * @throws IOException on write failure
 */
public void looseMarshal(OpenWireFormat wireFormat, Object o, DataOutput dataOut)
        throws IOException
    {
        WireFormatInfo wireInfo = (WireFormatInfo)o;
        // Give the info a chance to prepare itself before anything is written.
        wireInfo.beforeMarshall(wireFormat);
        super.looseMarshal(wireFormat, o, dataOut);

        looseMarshalConstByteArray(wireFormat, wireInfo.getMagic(), dataOut, 8);
        int protocolVersion = wireInfo.getVersion();
        dataOut.writeInt(protocolVersion);
        looseMarshalByteSequence(wireFormat, wireInfo.getMarshalledProperties(), dataOut);
    }


做个小结:
生产者发送消息时,首先转换消息,然后通过会话发送消息;会话创建事务,并通过连接发送事务信息,最后会话通过连接发送消息。连接发送消息,实际上是通过ResponseCorrelator:ResponseCorrelator设置发送消息的命令id及是否需要回复信息,然后交给MutexTransport;MutexTransport首先获取发送消息命令锁,再通过TcpTransport发送;TcpTransport最后通过OpenWireFormat将消息写到socket输出流。

记录消息发送事件
stats.onMessage();

//JMSEndpointStatsImpl
/**
 * Statistics kept for a JMS endpoint.
 * (Abridged excerpt: only the counters and {@code onMessage} are shown.)
 */
public class JMSEndpointStatsImpl extends StatsImpl
{
    protected CountStatisticImpl messageCount; // number of messages
    protected CountStatisticImpl pendingMessageCount;
    protected CountStatisticImpl expiredMessageCount; // number of expired messages
    protected TimeStatisticImpl messageWaitTime; // message wait time
    protected TimeStatisticImpl messageRateTime;

    /**
     * Records one message: increments the message count and feeds the change in
     * the counter's last sample time into {@code messageRateTime}.
     * Does nothing while statistics are disabled.
     */
    public void onMessage()
    {
        if(!enabled)
            return;
        long previousSample = messageCount.getLastSampleTime();
        messageCount.increment();
        long currentSample = messageCount.getLastSampleTime();
        messageRateTime.addTime(currentSample - previousSample);
    }
}

//CountStatisticImpl
/**
 * Counter statistic that can forward increments to a parent counter.
 * (Abridged excerpt: constructors and other members are not shown.)
 */
public class CountStatisticImpl extends StatisticImpl
    implements CountStatistic
{
    private final AtomicLong counter; // the count itself
    private CountStatisticImpl parent;

    /**
     * Adds one to this counter, refreshes the sample time and then increments
     * the parent counter, if any. Does nothing while the statistic is disabled.
     */
    public void increment()
    {
        if(!isEnabled())
            return;
        counter.incrementAndGet();
        updateSampleTime();
        if(parent != null)
            parent.increment();
    }
}

4.会话提交
session.commit(); 

//ActiveMQSession
/**
 * Commits the session's current transaction by delegating to the transaction context.
 *
 * @throws JMSException if the session is closed (via {@code checkClosed}), is not
 *         transacted, or the underlying commit fails
 */
public void commit()
        throws JMSException
    {
        checkClosed();
        boolean transacted = getTransacted();
        if(!transacted)
            throw new IllegalStateException("Not a transacted session");
        if(LOG.isDebugEnabled())
        {
            String description = getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId();
            LOG.debug(description);
        }
        // The transaction context performs the actual commit.
        transactionContext.commit();
    }

//TransactionContext
 
/**
 * Commits the current local transaction, if one is open.
 *
 * {@code beforeEnd()} runs first; if it fails the transaction is rolled back and
 * the failure rethrown. When a transaction id is present, a commit
 * {@code TransactionInfo} is sent synchronously, after which the commit listener
 * and {@code afterCommit()} run. If sending fails, the rollback listener and
 * {@code afterRollback()} run and the failure is rethrown.
 *
 * @throws JMSException if an XA transaction is in progress, or if any step above fails
 */
public void commit()
        throws JMSException
    {
        if(isInXATransaction())
            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
        try
        {
            beforeEnd();
        }
        catch(JMSException beforeEndFailure)
        {
            rollback();
            throw beforeEndFailure;
        }

        if(transactionId == null)
            return;

        int syncCount = synchronizations == null ? 0 : synchronizations.size();
        LOG.debug("Commit: {} syncCount: {}", transactionId, Integer.valueOf(syncCount));
        // NOTE(review): type code 2 is presumably the one-phase commit constant of
        // TransactionInfo -- confirm against that class.
        TransactionInfo commitInfo = new TransactionInfo(getConnectionId(), transactionId, (byte)2);
        // The id is cleared before sending, so the context is no longer in a
        // transaction whichever way the send ends.
        transactionId = null;
        try
        {
            // Send the commit command.
            syncSendPacketWithInterruptionHandling(commitInfo);
            if(localTransactionEventListener != null)
                localTransactionEventListener.commitEvent();
            afterCommit();
        }
        catch(JMSException cause)
        {
            LOG.info("commit failed for transaction {}", commitInfo.getTransactionId(), cause);
            if(localTransactionEventListener != null)
                localTransactionEventListener.rollbackEvent();
            afterRollback();
            throw cause;
        }
    }



5.关闭连接
connection.close(); 


public void close()
        throws JMSException
    {
        boolean interrupted = Thread.interrupted();
        if(!closed.get() && !transportFailed.get())
	    //关闭会话
            doStop(false);
        synchronized(this)
        {
            if(!closed.get())
            {
                closing.set(true);//设置关闭状态
		//关闭目的地
                if(destinationSource != null)
                {
                    destinationSource.stop();
                    destinationSource = null;
                }
                if(advisoryConsumer != null)
                {
                    advisoryConsumer.dispose();
                    advisoryConsumer = null;
                }
		//关闭调度器
                Scheduler scheduler = this.scheduler;
                if(scheduler != null)
                    try
                    {
                        scheduler.stop();
                    }
                    catch(Exception e)
                    {
                        JMSException ex = JMSExceptionSupport.create(e);
                        throw ex;
                    }
                long lastDeliveredSequenceId = -1L;
                for(Iterator i = sessions.iterator(); i.hasNext();)
                {
                    ActiveMQSession s = (ActiveMQSession)i.next();
                    s.dispose();
                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
                }

                ActiveMQConnectionConsumer c;
		//移除消费者信息
                for(Iterator i = connectionConsumers.iterator(); i.hasNext(); c.dispose())
                    c = (ActiveMQConnectionConsumer)i.next();

                activeTempDestinations.clear();
                if(isConnectionInfoSentToBroker)
                {
                    RemoveInfo removeCommand = info.createRemoveCommand();
                    removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                    try
                    {
		        //发送移除命令
                        doSyncSendPacket(removeCommand, closeTimeout);
                    }
                    catch(JMSException e)
                    {
                        if(!(e.getCause() instanceof RequestTimedOutIOException))
                            throw e;
                    }
		    //发送关闭命令
                    doAsyncSendPacket(new ShutdownInfo());
                }
                started.set(false);
                if(sessionTaskRunner != null)
                    sessionTaskRunner.shutdown();
                closed.set(true);
                closing.set(false);
            }
        }
        try
        {
            if(executor != null)
	        //关闭线程执行器
                ThreadPoolUtils.shutdown(executor);
        }
        catch(Throwable e)
        {
            LOG.warn((new StringBuilder()).append("Error shutting down thread pool: ").append(executor).append(". This exception will be ignored.").toString(), e);
        }
	//关闭transport
        ServiceSupport.dispose(transport);
	//从连接状态管理器移除连接
        factoryStats.removeConnection(this);
        if(interrupted)
            Thread.currentThread().interrupt();
        break MISSING_BLOCK_LABEL_506;
        Exception exception1;
        exception1;
        try
        {
            if(executor != null)
                ThreadPoolUtils.shutdown(executor);
        }
        catch(Throwable e)
        {
            LOG.warn((new StringBuilder()).append("Error shutting down thread pool: ").append(executor).append(". This exception will be ignored.").toString(), e);
        }
        ServiceSupport.dispose(transport);
        factoryStats.removeConnection(this);
        if(interrupted)
            Thread.currentThread().interrupt();
        throw exception1;
    }

//关闭会话
 /**
  * Stops every session of this connection if the connection was started.
  *
  * @param checkClosed when true, {@code checkClosedOrFailed()} is called first
  * @throws JMSException if the closed/failed check or stopping a session fails
  */
 void doStop(boolean checkClosed)
        throws JMSException
    {
        if(checkClosed)
            checkClosedOrFailed();
        // Only the caller that flips started from true to false stops the sessions.
        if(!started.compareAndSet(true, false))
            return;
        synchronized(sessions)
        {
            Iterator remaining = sessions.iterator();
            while(remaining.hasNext())
            {
                ActiveMQSession session = (ActiveMQSession)remaining.next();
                session.stop();
            }
        }
    }


总结:

从ActiveMQTopic和ActiveMQQueue可以看出,二者本质上都是ActiveMQDestination,
只是JNDI和数据结构类型不同;会话创建消息生产者,就是初始化ActiveMQMessageProducer的生产者信息,生产者状态管理器,消息序列号,传输模式(默认持久化),消息转化器,生产者窗口,并将生产者信息通过会话发送给Server;ActiveMQTopicPublisher本质上是ActiveMQMessageProducer,TopicPublisher的消息发送都是委托给ActiveMQMessageProducer;生产者发送消息时,首先转换消息,然后通过会话发送消息,
会话创建事务,并通过连接发送事务信息,最后会话通过连接发送消息;连接发送消息,实际上是通过ResponseCorrelator:ResponseCorrelator设置发送消息的命令id及是否需要回复信息,然后交给MutexTransport,MutexTransport首先获取发送消息命令锁,再通过TcpTransport发送,TcpTransport最后通过OpenWireFormat将消息写到socket输出流;关闭连接就是关闭执行器,TcpTransport,调度器,移除消费者信息,并向broker发送关闭命令。



//内存使用
public class MemoryUsage extends Usage
{
 public MemoryUsage(String name)
    {
        this(null, name);
    }
    //等待空间可利用
     public boolean waitForSpace(long timeout)
        throws InterruptedException
    {
        if(parent != null && !((MemoryUsage)parent).waitForSpace(timeout))
            return false;
        usageLock.readLock().lock();
        if(percentUsage < 100)
            break MISSING_BLOCK_LABEL_124;
        usageLock.readLock().unlock();
        usageLock.writeLock().lock();
        while(percentUsage >= 100) 
            waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
        usageLock.readLock().lock();
        usageLock.writeLock().unlock();
        break MISSING_BLOCK_LABEL_124;
        Exception exception;
        exception;
        usageLock.writeLock().unlock();
        throw exception;
        boolean flag = percentUsage < 100;
        usageLock.readLock().unlock();
        return flag;
        Exception exception1;
        exception1;
        usageLock.readLock().unlock();
        throw exception1;
    }
   //内存是否满
    public boolean isFull()
    {
        if(parent != null && ((MemoryUsage)parent).isFull())
            return true;
        usageLock.readLock().lock();
        boolean flag = percentUsage >= 100;
        usageLock.readLock().unlock();
        return flag;
        Exception exception;
        exception;
        usageLock.readLock().unlock();
        throw exception;
    }
    //增加内存使用量
    public void increaseUsage(long value)
    {
        if(value == 0L)
            return;
        usageLock.writeLock().lock();
        usage += value;
        setPercentUsage(caclPercentUsage());
        usageLock.writeLock().unlock();
        break MISSING_BLOCK_LABEL_61;
        Exception exception;
        exception;
        usageLock.writeLock().unlock();
        throw exception;
        if(parent != null)
            ((MemoryUsage)parent).increaseUsage(value);
        return;
    }
    //减少内存使用量
    public void decreaseUsage(long value)
    {
        if(value == 0L)
            return;
        usageLock.writeLock().lock();
        usage -= value;
        setPercentUsage(caclPercentUsage());
        usageLock.writeLock().unlock();
        break MISSING_BLOCK_LABEL_61;
        Exception exception;
        exception;
        usageLock.writeLock().unlock();
        throw exception;
        if(parent != null)
            ((MemoryUsage)parent).decreaseUsage(value);
        return;
    }
}

/**
 * Base class for usage trackers such as MemoryUsage.
 * (Abridged excerpt: only the fields are shown.)
 */
public abstract class Usage
    implements Service
{
    // Read/write lock guarding the usage state
    protected final ReentrantReadWriteLock usageLock = new ReentrantReadWriteLock();
    protected final Condition waitForSpaceCondition;// condition awaited while waiting for space
    protected int percentUsage;// usage as a percentage of the limit
    protected Usage parent;
    protected String name;
    private UsageCapacity limiter;
    private int percentUsageMinDelta;
    private final List listeners = new CopyOnWriteArrayList();
    private final boolean debug;
    private float usagePortion;
    private final List children = new CopyOnWriteArrayList();
    private final List callbacks = new LinkedList();
    private int pollingTime;
    private final AtomicBoolean started = new AtomicBoolean();
    private ThreadPoolExecutor executor;
}


//消息,消息目的转化器
/**
 * Converts foreign JMS destinations and messages into their ActiveMQ equivalents.
 */
public final class ActiveMQMessageTransformation
{
    /**
     * Converts a JMS destination into an {@code ActiveMQDestination}.
     *
     * @param destination the destination to convert; may be null
     * @return the destination itself if it already is an {@code ActiveMQDestination};
     *         otherwise a new ActiveMQ temp queue, temp topic, queue or topic with the
     *         same name; null if the input is null or of none of these kinds
     * @throws JMSException if the destination name cannot be read
     */
    public static ActiveMQDestination transformDestination(Destination destination)
        throws JMSException
    {
        if(destination == null)
            return null;
        if(destination instanceof ActiveMQDestination)
            return (ActiveMQDestination)destination;
        // The temporary kinds are tested before the plain Queue/Topic checks,
        // as in the original ordering.
        if(destination instanceof TemporaryQueue)
            return new ActiveMQTempQueue(((Queue)destination).getQueueName());
        if(destination instanceof TemporaryTopic)
            return new ActiveMQTempTopic(((Topic)destination).getTopicName());
        if(destination instanceof Queue)
            return new ActiveMQQueue(((Queue)destination).getQueueName());
        if(destination instanceof Topic)
            return new ActiveMQTopic(((Topic)destination).getTopicName());
        return null;
    }

    /**
     * Converts a JMS message into an {@code ActiveMQMessage}.
     *
     * An {@code ActiveMQMessage} is returned unchanged. Any other message is copied
     * into the ActiveMQ message type matching its JMS interface (bytes, map, object,
     * stream, text, blob, or plain), bound to the given connection, and its headers
     * and properties are copied with {@link #copyProperties(Message, Message)}.
     *
     * @param message    the message to convert
     * @param connection the connection set on the new message
     * @return the original message or its ActiveMQ copy
     * @throws JMSException if reading from the source or writing to the copy fails
     *         outside the body-copy loops, which ignore JMS errors
     */
    public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
        throws JMSException
    {
        if(message instanceof ActiveMQMessage)
            return (ActiveMQMessage)message;

        ActiveMQMessage activeMessage;
        if(message instanceof BytesMessage)
        {
            BytesMessage source = (BytesMessage)message;
            source.reset();
            ActiveMQBytesMessage copy = new ActiveMQBytesMessage();
            copy.setConnection(connection);
            try
            {
                // Copy byte by byte; the loop only ends through an exception.
                while(true)
                    copy.writeByte(source.readByte());
            }
            catch(MessageEOFException endOfBody) { }
            catch(JMSException ignored) { }
            activeMessage = copy;
        } else
        if(message instanceof MapMessage)
        {
            MapMessage source = (MapMessage)message;
            ActiveMQMapMessage copy = new ActiveMQMapMessage();
            copy.setConnection(connection);
            Enumeration names = source.getMapNames();
            while(names.hasMoreElements())
            {
                String key = names.nextElement().toString();
                copy.setObject(key, source.getObject(key));
            }
            activeMessage = copy;
        } else
        if(message instanceof ObjectMessage)
        {
            ObjectMessage source = (ObjectMessage)message;
            ActiveMQObjectMessage copy = new ActiveMQObjectMessage();
            copy.setConnection(connection);
            copy.setObject(source.getObject());
            copy.storeContent();
            activeMessage = copy;
        } else
        if(message instanceof StreamMessage)
        {
            StreamMessage source = (StreamMessage)message;
            source.reset();
            ActiveMQStreamMessage copy = new ActiveMQStreamMessage();
            copy.setConnection(connection);
            try
            {
                // Copy until a null element is read or the stream signals its end.
                for(Object element = source.readObject(); element != null; element = source.readObject())
                    copy.writeObject(element);
            }
            catch(MessageEOFException endOfBody) { }
            catch(JMSException ignored) { }
            activeMessage = copy;
        } else
        if(message instanceof TextMessage)
        {
            TextMessage source = (TextMessage)message;
            ActiveMQTextMessage copy = new ActiveMQTextMessage();
            copy.setConnection(connection);
            copy.setText(source.getText());
            activeMessage = copy;
        } else
        if(message instanceof BlobMessage)
        {
            BlobMessage source = (BlobMessage)message;
            ActiveMQBlobMessage copy = new ActiveMQBlobMessage();
            copy.setConnection(connection);
            if(connection != null)
                copy.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
            try
            {
                copy.setURL(source.getURL());
            }
            catch(MalformedURLException ignored) { }
            activeMessage = copy;
        } else
        {
            activeMessage = new ActiveMQMessage();
            activeMessage.setConnection(connection);
        }

        copyProperties(message, activeMessage);
        return activeMessage;
    }

    /**
     * Copies the JMS headers and all object properties from one message to another.
     * Reply-to and destination are converted with
     * {@link #transformDestination(Destination)} on the way.
     *
     * @param fromMessage the message to read from
     * @param toMessage   the message to write to
     * @throws JMSException if a header or property cannot be read or written
     */
    public static void copyProperties(Message fromMessage, Message toMessage)
        throws JMSException
    {
        toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
        toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
        toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
        toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
        toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
        toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
        toMessage.setJMSType(fromMessage.getJMSType());
        toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
        toMessage.setJMSPriority(fromMessage.getJMSPriority());
        toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());

        Enumeration propertyNames = fromMessage.getPropertyNames();
        while(propertyNames.hasMoreElements())
        {
            String key = propertyNames.nextElement().toString();
            Object value = fromMessage.getObjectProperty(key);
            toMessage.setObjectProperty(key, value);
        }
    }
}




/**
 * Delivery mode constants for JMS messages.
 */
public interface DeliveryMode
{
    /** Non-persistent delivery. */
    int NON_PERSISTENT = 1;

    /** Persistent delivery. */
    int PERSISTENT = 2;
}







package java.io;

import java.io.ObjectOutput;
import java.io.ObjectInput;

/**
 * Marks a class that takes full control of its own serialized form.
 *
 * For an Externalizable instance only the identity of its class is written to
 * the serialization stream; saving and restoring the contents is the class's
 * own responsibility. It does so by implementing {@code writeExternal} and
 * {@code readExternal}, which decide the format and contents of the stream for
 * the object and its supertypes and must coordinate explicitly with the
 * supertype to save its state. These methods supersede customized
 * {@code writeObject} and {@code readObject} implementations.
 *
 * <p>Object Serialization uses the Serializable and Externalizable interfaces,
 * and object persistence mechanisms can use them as well. Each object to be
 * stored is tested for Externalizable: if supported, {@code writeExternal} is
 * called; if the object is only Serializable, it is saved using
 * ObjectOutputStream.
 *
 * <p>On reconstruction, an Externalizable instance is created with its public
 * no-arg constructor and then {@code readExternal} is called. Serializable
 * objects are restored by reading them from an ObjectInputStream.
 *
 * <p>An Externalizable instance can designate a substitution object through
 * the {@code writeReplace} and {@code readResolve} methods documented in the
 * Serializable interface.
 *
 * @author  unascribed
 * @see java.io.ObjectOutputStream
 * @see java.io.ObjectInputStream
 * @see java.io.ObjectOutput
 * @see java.io.ObjectInput
 * @see java.io.Serializable
 * @since   JDK1.1
 */
public interface Externalizable extends Serializable {

    /**
     * Saves the contents of this object. Primitive values are written with the
     * methods of DataOutput; objects, strings and arrays with the
     * {@code writeObject} method of ObjectOutput.
     *
     * @serialData Overriding methods should use this tag to describe the data
     *             layout of this Externalizable object: list the sequence of
     *             element types and, if possible, relate each element to a
     *             public/protected field and/or method of this class.
     *
     * @param out the stream to write the object to
     * @throws IOException on any I/O failure
     */
    void writeExternal(ObjectOutput out) throws IOException;

    /**
     * Restores the contents of this object. Primitive types are read with the
     * methods of DataInput; objects, strings and arrays with {@code readObject}.
     * Values must be read in the same sequence and with the same types as
     * {@code writeExternal} wrote them.
     *
     * @param in the stream to read data from in order to restore the object
     * @throws IOException if I/O errors occur
     * @throws ClassNotFoundException if the class for an object being restored
     *         cannot be found
     */
    void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
}


package java.io;

/**
 * Extends DataOutput with the ability to write objects. DataOutput covers the
 * primitive types; this interface adds objects, arrays and Strings.
 *
 * @author  unascribed
 * @see java.io.InputStream
 * @see java.io.ObjectOutputStream
 * @see java.io.ObjectInputStream
 * @since   JDK1.1
 */
public interface ObjectOutput extends DataOutput, AutoCloseable {

    /**
     * Writes an object to the underlying storage or stream. How the object is
     * written is defined by the implementing class.
     *
     * @param obj the object to be written
     * @throws IOException on any of the usual input/output related failures
     */
    void writeObject(Object obj) throws IOException;

    /**
     * Writes a single byte, blocking until it has actually been written.
     *
     * @param b the byte
     * @throws IOException if an I/O error has occurred
     */
    void write(int b) throws IOException;

    /**
     * Writes an array of bytes, blocking until they have actually been written.
     *
     * @param b the data to be written
     * @throws IOException if an I/O error has occurred
     */
    void write(byte b[]) throws IOException;

    /**
     * Writes part of an array of bytes.
     *
     * @param b   the data to be written
     * @param off the start offset in the data
     * @param len the number of bytes that are written
     * @throws IOException if an I/O error has occurred
     */
    void write(byte b[], int off, int len) throws IOException;

    /**
     * Flushes the stream, writing out any buffered output bytes.
     *
     * @throws IOException if an I/O error has occurred
     */
    void flush() throws IOException;

    /**
     * Closes the stream. Must be called to release any resources associated
     * with the stream.
     *
     * @throws IOException if an I/O error has occurred
     */
    void close() throws IOException;
}

0
2
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics