Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.net.MalformedURLException;
   20   import java.util.Enumeration;
   21   
   22   import javax.jms.BytesMessage;
   23   import javax.jms.Destination;
   24   import javax.jms.JMSException;
   25   import javax.jms.MapMessage;
   26   import javax.jms.Message;
   27   import javax.jms.MessageEOFException;
   28   import javax.jms.ObjectMessage;
   29   import javax.jms.Queue;
   30   import javax.jms.StreamMessage;
   31   import javax.jms.TemporaryQueue;
   32   import javax.jms.TemporaryTopic;
   33   import javax.jms.TextMessage;
   34   import javax.jms.Topic;
   35   
   36   import org.apache.activemq.blob.BlobDownloader;
   37   import org.apache.activemq.blob.BlobUploader;
   38   import org.apache.activemq.command.ActiveMQBlobMessage;
   39   import org.apache.activemq.command.ActiveMQBytesMessage;
   40   import org.apache.activemq.command.ActiveMQDestination;
   41   import org.apache.activemq.command.ActiveMQMapMessage;
   42   import org.apache.activemq.command.ActiveMQMessage;
   43   import org.apache.activemq.command.ActiveMQObjectMessage;
   44   import org.apache.activemq.command.ActiveMQQueue;
   45   import org.apache.activemq.command.ActiveMQStreamMessage;
   46   import org.apache.activemq.command.ActiveMQTempQueue;
   47   import org.apache.activemq.command.ActiveMQTempTopic;
   48   import org.apache.activemq.command.ActiveMQTextMessage;
   49   import org.apache.activemq.command.ActiveMQTopic;
   50   
   51   /**
   52    * A helper class for converting normal JMS interfaces into ActiveMQ specific
   53    * ones.
   54    * 
   55    * @version $Revision: 1.1 $
   56    */
   57   public final class ActiveMQMessageTransformation {
   58   
   59       private ActiveMQMessageTransformation() {    
   60       }
   61       
   62       /**
   63        * Creates a an available JMS message from another provider.
   64        * 
   65        * @param destination - Destination to be converted into ActiveMQ's
   66        *                implementation.
   67        * @return ActiveMQDestination - ActiveMQ's implementation of the
   68        *         destination.
   69        * @throws JMSException if an error occurs
   70        */
   71       public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
   72           ActiveMQDestination activeMQDestination = null;
   73   
   74           if (destination != null) {
   75               if (destination instanceof ActiveMQDestination) {
   76                   return (ActiveMQDestination)destination;
   77   
   78               } else {
   79                   if (destination instanceof TemporaryQueue) {
   80                       activeMQDestination = new ActiveMQTempQueue(((Queue)destination).getQueueName());
   81                   } else if (destination instanceof TemporaryTopic) {
   82                       activeMQDestination = new ActiveMQTempTopic(((Topic)destination).getTopicName());
   83                   } else if (destination instanceof Queue) {
   84                       activeMQDestination = new ActiveMQQueue(((Queue)destination).getQueueName());
   85                   } else if (destination instanceof Topic) {
   86                       activeMQDestination = new ActiveMQTopic(((Topic)destination).getTopicName());
   87                   }
   88               }
   89           }
   90   
   91           return activeMQDestination;
   92       }
   93   
   94       /**
   95        * Creates a fast shallow copy of the current ActiveMQMessage or creates a
   96        * whole new message instance from an available JMS message from another
   97        * provider.
   98        * 
   99        * @param message - Message to be converted into ActiveMQ's implementation.
  100        * @param connection
  101        * @return ActiveMQMessage - ActiveMQ's implementation object of the
  102        *         message.
  103        * @throws JMSException if an error occurs
  104        */
  105       public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
  106           throws JMSException {
  107           if (message instanceof ActiveMQMessage) {
  108               return (ActiveMQMessage)message;
  109   
  110           } else {
  111               ActiveMQMessage activeMessage = null;
  112   
  113               if (message instanceof BytesMessage) {
  114                   BytesMessage bytesMsg = (BytesMessage)message;
  115                   bytesMsg.reset();
  116                   ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
  117                   msg.setConnection(connection);
  118                   try {
  119                       for (;;) {
  120                           // Reads a byte from the message stream until the stream
  121                           // is empty
  122                           msg.writeByte(bytesMsg.readByte());
  123                       }
  124                   } catch (MessageEOFException e) {
  125                       // if an end of message stream as expected
  126                   } catch (JMSException e) {
  127                   }
  128   
  129                   activeMessage = msg;
  130               } else if (message instanceof MapMessage) {
  131                   MapMessage mapMsg = (MapMessage)message;
  132                   ActiveMQMapMessage msg = new ActiveMQMapMessage();
  133                   msg.setConnection(connection);
  134                   Enumeration iter = mapMsg.getMapNames();
  135   
  136                   while (iter.hasMoreElements()) {
  137                       String name = iter.nextElement().toString();
  138                       msg.setObject(name, mapMsg.getObject(name));
  139                   }
  140   
  141                   activeMessage = msg;
  142               } else if (message instanceof ObjectMessage) {
  143                   ObjectMessage objMsg = (ObjectMessage)message;
  144                   ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
  145                   msg.setConnection(connection);
  146                   msg.setObject(objMsg.getObject());
  147                   msg.storeContent();
  148                   activeMessage = msg;
  149               } else if (message instanceof StreamMessage) {
  150                   StreamMessage streamMessage = (StreamMessage)message;
  151                   streamMessage.reset();
  152                   ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
  153                   msg.setConnection(connection);
  154                   Object obj = null;
  155   
  156                   try {
  157                       while ((obj = streamMessage.readObject()) != null) {
  158                           msg.writeObject(obj);
  159                       }
  160                   } catch (MessageEOFException e) {
  161                       // if an end of message stream as expected
  162                   } catch (JMSException e) {
  163                   }
  164   
  165                   activeMessage = msg;
  166               } else if (message instanceof TextMessage) {
  167                   TextMessage textMsg = (TextMessage)message;
  168                   ActiveMQTextMessage msg = new ActiveMQTextMessage();
  169                   msg.setConnection(connection);
  170                   msg.setText(textMsg.getText());
  171                   activeMessage = msg;
  172               } else if (message instanceof BlobMessage) {
  173               	BlobMessage blobMessage = (BlobMessage)message;
  174               	ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
  175               	msg.setConnection(connection);
  176               	msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
  177               	try {
  178   					msg.setURL(blobMessage.getURL());
  179   				} catch (MalformedURLException e) {
  180   					
  181   				}
  182               	activeMessage = msg;
  183               } else {
  184                   activeMessage = new ActiveMQMessage();
  185                   activeMessage.setConnection(connection);
  186               }
  187   
  188               copyProperties(message, activeMessage);
  189   
  190               return activeMessage;
  191           }
  192       }
  193   
  194       /**
  195        * Copies the standard JMS and user defined properties from the givem
  196        * message to the specified message
  197        * 
  198        * @param fromMessage the message to take the properties from
  199        * @param toMessage the message to add the properties to
  200        * @throws JMSException
  201        */
  202       public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
  203           toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
  204           toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
  205           toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
  206           toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
  207           toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
  208           toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
  209           toMessage.setJMSType(fromMessage.getJMSType());
  210           toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
  211           toMessage.setJMSPriority(fromMessage.getJMSPriority());
  212           toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
  213   
  214           Enumeration propertyNames = fromMessage.getPropertyNames();
  215   
  216           while (propertyNames.hasMoreElements()) {
  217               String name = propertyNames.nextElement().toString();
  218               Object obj = fromMessage.getObjectProperty(name);
  219               toMessage.setObjectProperty(name, obj);
  220           }
  221       }
  222   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]