Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » util » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.util;
   18   
   19   import java.io.IOException;
   20   
   21   import javax.jms.Destination;
   22   import javax.jms.JMSException;
   23   import javax.jms.Message;
   24   import javax.jms.MessageListener;
   25   import javax.jms.MessageProducer;
   26   import javax.jms.Session;
   27   import javax.jms.TextMessage;
   28   
   29   import org.apache.activemq.command.ActiveMQTextMessage;
   30   import org.apache.activemq.util.FactoryFinder;
   31   import org.apache.commons.logging.Log;
   32   import org.apache.commons.logging.LogFactory;
   33   
   34   /**
   35    * @version $Revision: $
   36    */
   37   public class CommandMessageListener implements MessageListener {
   38       private static final Log LOG = LogFactory.getLog(CommandMessageListener.class);
   39   
   40       private Session session;
   41       private MessageProducer producer;
   42       private CommandHandler handler;
   43   
   44       public CommandMessageListener(Session session) {
   45           this.session = session;
   46       }
   47   
   48       public void onMessage(Message message) {
   49           if (LOG.isDebugEnabled()) {
   50               LOG.debug("Received command: " + message);
   51           }
   52           if (message instanceof TextMessage) {
   53               TextMessage request = (TextMessage)message;
   54               try {
   55                   Destination replyTo = message.getJMSReplyTo();
   56                   if (replyTo == null) {
   57                       LOG.warn("Ignored message as no JMSReplyTo set: " + message);
   58                       return;
   59                   }
   60                   Message response = processCommand(request);
   61                   addReplyHeaders(request, response);
   62                   getProducer().send(replyTo, response);
   63               } catch (Exception e) {
   64                   LOG.error("Failed to process message due to: " + e + ". Message: " + message, e);
   65               }
   66           } else {
   67               LOG.warn("Ignoring invalid message: " + message);
   68           }
   69       }
   70   
   71       protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
   72           String correlationID = request.getJMSCorrelationID();
   73           if (correlationID != null) {
   74               response.setJMSCorrelationID(correlationID);
   75           }
   76       }
   77   
   78       /**
   79        * Processes an incoming JMS message returning the response message
   80        */
   81       public Message processCommand(TextMessage request) throws Exception {
   82           TextMessage response = session.createTextMessage();
   83           getHandler().processCommand(request, response);
   84           return response;
   85       }
   86   
   87       /**
   88        * Processes an incoming command from a console and returning the text to
   89        * output
   90        */
   91       public String processCommandText(String line) throws Exception {
   92           TextMessage request = new ActiveMQTextMessage();
   93           request.setText(line);
   94           TextMessage response = new ActiveMQTextMessage();
   95           getHandler().processCommand(request, response);
   96           return response.getText();
   97       }
   98   
   99       public Session getSession() {
  100           return session;
  101       }
  102   
  103       public MessageProducer getProducer() throws JMSException {
  104           if (producer == null) {
  105               producer = getSession().createProducer(null);
  106           }
  107           return producer;
  108       }
  109   
  110       public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
  111           if (handler == null) {
  112               handler = createHandler();
  113           }
  114           return handler;
  115       }
  116   
  117       private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
  118           FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
  119           return (CommandHandler)factoryFinder.newInstance("agent");
  120       }
  121   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » util » [javadoc | source]