Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » systest » impl » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.systest.impl;
   18   
   19   import java.util.concurrent.CopyOnWriteArrayList;
   20   
   21   import javax.jms.Message;
   22   import javax.jms.MessageListener;
   23   
   24   import java.util.ArrayList;
   25   import java.util.List;
   26   
   27   /**
   28    * A simple consumer which is useful for testing which can be used to wait until
   29    * the consumer has received a specific number of messages.
   30    * 
   31    * @author Mike Perham
   32    * @version $Revision$
   33    */
   34   public class AgentMessageListener implements MessageListener {
   35       private List messages = new CopyOnWriteArrayList();
   36       private Object semaphore = new Object();;
   37   
   38       public  void stop() {
   39           messages.clear();
   40       }
   41   
   42       /**
   43        * @return all the messages on the list so far, clearing the buffer
   44        */
   45       public List flushMessages() {
   46           List answer = new ArrayList(messages);
   47           messages.clear();
   48           return answer;
   49       }
   50   
   51       public void onMessage(Message message) {
   52           System.out.println("Received message: " + message);
   53   
   54               messages.add(message);
   55   
   56           synchronized (semaphore) {
   57               semaphore.notifyAll();
   58           }
   59       }
   60   
   61       public void waitForMessageToArrive() {
   62           System.out.println("Waiting for message to arrive");
   63   
   64           long start = System.currentTimeMillis();
   65   
   66           try {
   67               if (hasReceivedMessage()) {
   68                   synchronized (semaphore) {
   69                       semaphore.wait(4000);
   70                   }
   71               }
   72           }
   73           catch (InterruptedException e) {
   74               System.out.println("Caught: " + e);
   75           }
   76           long end = System.currentTimeMillis() - start;
   77   
   78           System.out.println("End of wait for " + end + " millis");
   79       }
   80   
   81       public void waitForMessagesToArrive(int messageCount) {
   82           System.out.println("Waiting for message to arrive");
   83   
   84           long start = System.currentTimeMillis();
   85   
   86           for (int i = 0; i < 10; i++) {
   87               try {
   88                   if (hasReceivedMessages(messageCount)) {
   89                       break;
   90                   }
   91                   synchronized (semaphore) {
   92                       semaphore.wait(1000);
   93                   }
   94               }
   95               catch (InterruptedException e) {
   96                   System.out.println("Caught: " + e);
   97               }
   98           }
   99           long end = System.currentTimeMillis() - start;
  100   
  101           System.out.println("End of wait for " + end + " millis");
  102       }
  103   
  104       protected boolean hasReceivedMessage() {
  105           return messages.isEmpty();
  106       }
  107   
  108       protected boolean hasReceivedMessages(int messageCount) {
  109           return messages.size() >= messageCount;
  110       }
  111   
  112   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » systest » impl » [javadoc | source]