Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » systest » impl » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.systest.impl;
   18   
   19   import org.apache.activemq.systest.AgentStopper;
   20   import org.apache.activemq.systest.ConsumerAgent;
   21   import org.apache.activemq.systest.MessageList;
   22   
   23   import javax.jms.JMSException;
   24   import javax.jms.MessageConsumer;
   25   import javax.jms.Topic;
   26   
   27   /**
   28    * A simple in JVM implementation of a {@link ConsumerAgent}
   29    * 
   30    * @version $Revision: 1.1 $
   31    */
   32   public class ConsumerAgentImpl extends JmsClientSupport implements ConsumerAgent {
   33   
   34       private String selector;
   35       private String durableSubscriber;
   36       private boolean noLocal;
   37       private MessageConsumer consumer;
   38       private AgentMessageListener listener;
   39   
   40       public void start() throws Exception {
   41           listener = new AgentMessageListener();
   42           getConsumer().setMessageListener(listener);
   43           super.start();
   44       }
   45   
   46       public void assertConsumed(MessageList messageList) throws JMSException {
   47           int size = messageList.getSize();
   48           listener.waitForMessagesToArrive(size);
   49   
   50           // now we've received them, lets check that they are identical
   51           messageList.assertMessagesCorrect(listener.flushMessages());
   52   
   53           System.out.println("Consumer received all: " + size + " message(s)");
   54       }
   55   
   56       public void waitUntilConsumed(MessageList messageList, int percentOfList) {
   57           int size = messageList.getSize();
   58           int limit = (size * percentOfList) / 100;
   59           listener.waitForMessagesToArrive(limit);
   60       }
   61   
   62       // Properties
   63       // -------------------------------------------------------------------------
   64       public MessageConsumer getConsumer() throws JMSException {
   65           if (consumer == null) {
   66               consumer = createConsumer();
   67           }
   68           return consumer;
   69       }
   70   
   71       public void setConsumer(MessageConsumer consumer) {
   72           this.consumer = consumer;
   73       }
   74   
   75       public String getDurableSubscriber() {
   76           return durableSubscriber;
   77       }
   78   
   79       public void setDurableSubscriber(String durableSubscriber) {
   80           this.durableSubscriber = durableSubscriber;
   81       }
   82   
   83       public boolean isNoLocal() {
   84           return noLocal;
   85       }
   86   
   87       public void setNoLocal(boolean noLocal) {
   88           this.noLocal = noLocal;
   89       }
   90   
   91       public String getSelector() {
   92           return selector;
   93       }
   94   
   95       public void setSelector(String selector) {
   96           this.selector = selector;
   97       }
   98   
   99       public void stop(AgentStopper stopper) {
  100           if (listener != null) {
  101               listener.stop();
  102               listener = null;
  103           }
  104   
  105           if (consumer != null) {
  106               try {
  107                   consumer.close();
  108               }
  109               catch (JMSException e) {
  110                   stopper.onException(this, e);
  111               }
  112               finally {
  113                   consumer = null;
  114               }
  115           }
  116           super.stop(stopper);
  117       }
  118   
  119       // Implementation methods
  120       // -------------------------------------------------------------------------
  121       protected MessageConsumer createConsumer() throws JMSException {
  122           if (durableSubscriber != null) {
  123               if (selector != null) {
  124                   return getSession().createDurableSubscriber((Topic) getDestination(), durableSubscriber, selector, noLocal);
  125               }
  126               else {
  127                   return getSession().createDurableSubscriber((Topic) getDestination(), durableSubscriber);
  128               }
  129           }
  130           else {
  131               if (selector != null) {
  132                   if (noLocal) {
  133                       return getSession().createConsumer(getDestination(), selector, noLocal);
  134                   }
  135                   else {
  136                       return getSession().createConsumer(getDestination(), selector);
  137                   }
  138               }
  139               else {
  140                   return getSession().createConsumer(getDestination());
  141               }
  142           }
  143   
  144       }
  145   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » systest » impl » [javadoc | source]