Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.Collections;
   21   import java.util.List;
   22   import java.util.concurrent.CopyOnWriteArrayList;
   23   import javax.jms.InvalidSelectorException;
   24   import javax.jms.JMSException;
   25   import javax.management.ObjectName;
   26   import org.apache.activemq.broker.Broker;
   27   import org.apache.activemq.broker.ConnectionContext;
   28   import org.apache.activemq.command.ActiveMQDestination;
   29   import org.apache.activemq.command.ConsumerId;
   30   import org.apache.activemq.command.ConsumerInfo;
   31   import org.apache.activemq.filter.BooleanExpression;
   32   import org.apache.activemq.filter.DestinationFilter;
   33   import org.apache.activemq.filter.LogicExpression;
   34   import org.apache.activemq.filter.MessageEvaluationContext;
   35   import org.apache.activemq.filter.NoLocalExpression;
   36   import org.apache.activemq.selector.SelectorParser;
   37   import org.apache.commons.logging.Log;
   38   import org.apache.commons.logging.LogFactory;
   39   
   40   public abstract class AbstractSubscription implements Subscription {
   41   
   42       private static final Log LOG = LogFactory.getLog(AbstractSubscription.class);
   43       protected Broker broker;
   44       protected ConnectionContext context;
   45       protected ConsumerInfo info;
   46       protected final DestinationFilter destinationFilter;
   47       protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
   48       private BooleanExpression selectorExpression;
   49       private ObjectName objectName;
   50       private int cursorMemoryHighWaterMark = 70;
   51   
   52   
   53       public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
   54           this.broker = broker;
   55           this.context = context;
   56           this.info = info;
   57           this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());
   58           this.selectorExpression = parseSelector(info);
   59       }
   60   
   61       private static BooleanExpression parseSelector(ConsumerInfo info) throws InvalidSelectorException {
   62           BooleanExpression rc = null;
   63           if (info.getSelector() != null) {
   64               rc = SelectorParser.parse(info.getSelector());
   65           }
   66           if (info.isNoLocal()) {
   67               if (rc == null) {
   68                   rc = new NoLocalExpression(info.getConsumerId().getConnectionId());
   69               } else {
   70                   rc = LogicExpression.createAND(new NoLocalExpression(info.getConsumerId().getConnectionId()), rc);
   71               }
   72           }
   73           if (info.getAdditionalPredicate() != null) {
   74               if (rc == null) {
   75                   rc = info.getAdditionalPredicate();
   76               } else {
   77                   rc = LogicExpression.createAND(info.getAdditionalPredicate(), rc);
   78               }
   79           }
   80           return rc;
   81       }
   82   
   83       public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
   84           ConsumerId targetConsumerId = node.getTargetConsumerId();
   85           if (targetConsumerId != null) {
   86               if (!targetConsumerId.equals(info.getConsumerId())) {
   87                   return false;
   88               }
   89           }
   90           try {
   91               return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
   92           } catch (JMSException e) {
   93               LOG.info("Selector failed to evaluate: " + e.getMessage(), e);
   94               return false;
   95           }
   96       }
   97   
   98       public boolean matches(ActiveMQDestination destination) {
   99           return destinationFilter.matches(destination);
  100       }
  101   
  102       public void add(ConnectionContext context, Destination destination) throws Exception {
  103           destinations.add(destination);
  104       }
  105   
  106       public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
  107           destinations.remove(destination);
  108           return Collections.EMPTY_LIST;
  109       }
  110   
  111       public ConsumerInfo getConsumerInfo() {
  112           return info;
  113       }
  114   
  115       public void gc() {
  116       }
  117   
  118       public boolean isSlave() {
  119           return broker.getBrokerService().isSlave();
  120       }
  121   
  122       public ConnectionContext getContext() {
  123           return context;
  124       }
  125   
  126       public ConsumerInfo getInfo() {
  127           return info;
  128       }
  129   
  130       public BooleanExpression getSelectorExpression() {
  131           return selectorExpression;
  132       }
  133   
  134       public String getSelector() {
  135           return info.getSelector();
  136       }
  137   
  138       public void setSelector(String selector) throws InvalidSelectorException {
  139           ConsumerInfo copy = info.copy();
  140           copy.setSelector(selector);
  141           BooleanExpression newSelector = parseSelector(copy);
  142           // its valid so lets actually update it now
  143           info.setSelector(selector);
  144           this.selectorExpression = newSelector;
  145       }
  146   
  147       public ObjectName getObjectName() {
  148           return objectName;
  149       }
  150   
  151       public void setObjectName(ObjectName objectName) {
  152           this.objectName = objectName;
  153       }
  154   
  155       public int getPrefetchSize() {
  156           return info.getPrefetchSize();
  157       }
  158       public void setPrefetchSize(int newSize) {
  159           info.setPrefetchSize(newSize);
  160       }
  161   
  162       public boolean isRecoveryRequired() {
  163           return true;
  164       }
  165   
  166       public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception {
  167           boolean result = false;
  168           MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
  169           try {
  170               msgContext.setDestination(message.getRegionDestination().getActiveMQDestination());
  171               msgContext.setMessageReference(message);
  172               result = matches(message, msgContext);
  173               if (result) {
  174                   doAddRecoveredMessage(message);
  175               }
  176   
  177           } finally {
  178               msgContext.clear();
  179           }
  180           return result;
  181       }
  182   
  183       public ActiveMQDestination getActiveMQDestination() {
  184           return info != null ? info.getDestination() : null;
  185       }
  186       
  187       public boolean isBrowser() {
  188           return info != null && info.isBrowser();
  189       }
  190       
  191       public int getInFlightUsage() {
  192           if (info.getPrefetchSize() > 0) {
  193           return (getInFlightSize() * 100)/info.getPrefetchSize();
  194           }
  195           return Integer.MAX_VALUE;
  196       }
  197       
  198       /**
  199        * Add a destination
  200        * @param destination
  201        */
  202       public void addDestination(Destination destination) {
  203           
  204       }
  205          
  206       
  207       /**
  208        * Remove a destination
  209        * @param destination
  210        */
  211       public void removeDestination(Destination destination) {
  212           
  213       }
  214       
  215       public int getCursorMemoryHighWaterMark(){
  216       	return this.cursorMemoryHighWaterMark;
  217       }
  218   
  219   	public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark){
  220   		this.cursorMemoryHighWaterMark=cursorMemoryHighWaterMark;
  221   	}
  222       
  223       public int countBeforeFull() {
  224           return getDispatchedQueueSize() - info.getPrefetchSize();
  225       }
  226   
  227       protected void doAddRecoveredMessage(MessageReference message) throws Exception {
  228           add(message);
  229       }
  230   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]