Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.util.Enumeration;
   20   import java.util.concurrent.atomic.AtomicBoolean;
   21   
   22   import javax.jms.IllegalStateException;
   23   import javax.jms.JMSException;
   24   import javax.jms.Message;
   25   import javax.jms.Queue;
   26   import javax.jms.QueueBrowser;
   27   
   28   import org.apache.activemq.command.ActiveMQDestination;
   29   import org.apache.activemq.command.ConsumerId;
   30   import org.apache.activemq.command.MessageDispatch;
   31   
   32   /**
   33    * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a
   34    * queue without removing them. <p/>
   35    * <P>
   36    * The <CODE>getEnumeration</CODE> method returns a <CODE>
   37    * java.util.Enumeration</CODE>
   38    * that is used to scan the queue's messages. It may be an enumeration of the
   39    * entire content of a queue, or it may contain only the messages matching a
   40    * message selector. <p/>
   41    * <P>
   42    * Messages may be arriving and expiring while the scan is done. The JMS API
   43    * does not require the content of an enumeration to be a static snapshot of
   44    * queue content. Whether these changes are visible or not depends on the JMS
   45    * provider. <p/>
   46    * <P>
   47    * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session
   48    * </CODE>
   49    * or a <CODE>QueueSession</CODE>.
   50    * 
   51    * @see javax.jms.Session#createBrowser
   52    * @see javax.jms.QueueSession#createBrowser
   53    * @see javax.jms.QueueBrowser
   54    * @see javax.jms.QueueReceiver
   55    */
   56   
   57   public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
   58   
   59       private final ActiveMQSession session;
   60       private final ActiveMQDestination destination;
   61       private final String selector;
   62   
   63       private ActiveMQMessageConsumer consumer;
   64       private boolean closed;
   65       private final ConsumerId consumerId;
   66       private final AtomicBoolean browseDone = new AtomicBoolean(true);
   67       private final boolean dispatchAsync;
   68       private Object semaphore = new Object();
   69   
   70       /**
   71        * Constructor for an ActiveMQQueueBrowser - used internally
   72        * 
   73        * @param theSession
   74        * @param dest
   75        * @param selector
   76        * @throws JMSException
   77        */
   78       protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException {
   79           this.session = session;
   80           this.consumerId = consumerId;
   81           this.destination = destination;
   82           this.selector = selector;
   83           this.dispatchAsync = dispatchAsync;
   84           this.consumer = createConsumer();
   85       }
   86   
   87       /**
   88        * @param session
   89        * @param originalDestination
   90        * @param selectorExpression
   91        * @param cnum
   92        * @return
   93        * @throws JMSException
   94        */
   95       private ActiveMQMessageConsumer createConsumer() throws JMSException {
   96           browseDone.set(false);
   97           ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy();
   98           return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy
   99               .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) {
  100               public void dispatch(MessageDispatch md) {
  101                   if (md.getMessage() == null) {
  102                       browseDone.set(true);
  103                   } else {
  104                       super.dispatch(md);
  105                   }
  106                   notifyMessageAvailable();
  107               }
  108           };
  109       }
  110   
  111       private void destroyConsumer() {
  112           if (consumer == null) {
  113               return;
  114           }
  115           try {
  116               if (session.getTransacted()) {
  117                   session.commit();
  118               }
  119               consumer.close();
  120               consumer = null;
  121           } catch (JMSException e) {
  122               e.printStackTrace();
  123           }
  124       }
  125   
  126       /**
  127        * Gets an enumeration for browsing the current queue messages in the order
  128        * they would be received.
  129        * 
  130        * @return an enumeration for browsing the messages
  131        * @throws JMSException if the JMS provider fails to get the enumeration for
  132        *                 this browser due to some internal error.
  133        */
  134   
  135       public Enumeration getEnumeration() throws JMSException {
  136           checkClosed();
  137           if (consumer == null) {
  138               consumer = createConsumer();
  139           }
  140           return this;
  141       }
  142   
  143       private void checkClosed() throws IllegalStateException {
  144           if (closed) {
  145               throw new IllegalStateException("The Consumer is closed");
  146           }
  147       }
  148   
  149       /**
  150        * @return true if more messages to process
  151        */
  152       public boolean hasMoreElements() {
  153           while (true) {
  154   
  155               synchronized (this) {
  156                   if (consumer == null) {
  157                       return false;
  158                   }
  159               }
  160   
  161               if (consumer.getMessageSize() > 0) {
  162                   return true;
  163               }
  164   
  165               if (browseDone.get() || !session.isRunning()) {
  166                   destroyConsumer();
  167                   return false;
  168               }
  169   
  170               waitForMessage();
  171           }
  172       }
  173   
  174       /**
  175        * @return the next message
  176        */
  177       public Object nextElement() {
  178           while (true) {
  179   
  180               synchronized (this) {
  181                   if (consumer == null) {
  182                       return null;
  183                   }
  184               }
  185   
  186               try {
  187                   Message answer = consumer.receiveNoWait();
  188                   if (answer != null) {
  189                       return answer;
  190                   }
  191               } catch (JMSException e) {
  192                   this.session.connection.onClientInternalException(e);
  193                   return null;
  194               }
  195   
  196               if (browseDone.get() || !session.isRunning()) {
  197                   destroyConsumer();
  198                   return null;
  199               }
  200   
  201               waitForMessage();
  202           }
  203       }
  204   
  205       public synchronized void close() throws JMSException {
  206           destroyConsumer();
  207           closed = true;
  208       }
  209   
  210       /**
  211        * Gets the queue associated with this queue browser.
  212        * 
  213        * @return the queue
  214        * @throws JMSException if the JMS provider fails to get the queue
  215        *                 associated with this browser due to some internal error.
  216        */
  217   
  218       public Queue getQueue() throws JMSException {
  219           return (Queue)destination;
  220       }
  221   
  222       public String getMessageSelector() throws JMSException {
  223           return selector;
  224       }
  225   
  226       // Implementation methods
  227       // -------------------------------------------------------------------------
  228   
  229       /**
  230        * Wait on a semaphore for a fixed amount of time for a message to come in.
  231        */
  232       protected void waitForMessage() {
  233           try {
  234               synchronized (semaphore) {
  235                   semaphore.wait(2000);
  236               }
  237           } catch (InterruptedException e) {
  238               Thread.currentThread().interrupt();
  239           }
  240       }
  241   
  242       protected void notifyMessageAvailable() {
  243           synchronized (semaphore) {
  244               semaphore.notifyAll();
  245           }
  246       }
  247   
  248       public String toString() {
  249           return "ActiveMQQueueBrowser { value=" + consumerId + " }";
  250       }
  251   
  252   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]