Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import javax.jms.InvalidSelectorException;
   21   import org.apache.activemq.broker.Broker;
   22   import org.apache.activemq.broker.ConnectionContext;
   23   import org.apache.activemq.command.ConsumerInfo;
   24   import org.apache.activemq.command.MessageAck;
   25   import org.apache.activemq.filter.MessageEvaluationContext;
   26   import org.apache.activemq.usage.SystemUsage;
   27   
   28   public class QueueBrowserSubscription extends QueueSubscription {
   29   
   30       int queueRefs;
   31       boolean browseDone;
   32       boolean destinationsAdded;
   33   
   34       public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
   35           throws InvalidSelectorException {
   36           super(broker,usageManager, context, info);
   37       }
   38   
   39       protected boolean canDispatch(MessageReference node) {
   40           return !((QueueMessageReference)node).isAcked();
   41       }
   42   
   43       public synchronized String toString() {
   44           return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations="
   45                  + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
   46                  + this.prefetchExtension + ", pending=" + getPendingQueueSize();
   47       }
   48   
   49       synchronized public void destinationsAdded() throws Exception {
   50           destinationsAdded = true;
   51           checkDone();
   52       }
   53   
   54       private void checkDone() throws Exception {
   55           if( !browseDone && queueRefs == 0 && destinationsAdded) {
   56               browseDone=true;
   57               add(QueueMessageReference.NULL_MESSAGE);
   58           }
   59       }
   60   
   61       public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
   62           return !browseDone && super.matches(node, context);
   63       }
   64   
   65       /**
   66        * Since we are a browser we don't really remove the message from the queue.
   67        */
   68       protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n)
   69           throws IOException {
   70       	if (info.isNetworkSubscription()) {
   71       		super.acknowledge(context, ack, n);
   72       	}
   73       }
   74   
   75       synchronized public void incrementQueueRef() {
   76           queueRefs++;        
   77       }
   78   
   79       synchronized public void decrementQueueRef() throws Exception {
   80           queueRefs--;
   81           checkDone();
   82       }
   83   
   84   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]