Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.io.Serializable;
   20   import org.apache.commons.logging.Log;
   21   import org.apache.commons.logging.LogFactory;
   22   
   23   /**
   24    * Defines the prefetch message policies for different types of consumers
   25    * 
   26    * @org.apache.xbean.XBean element="prefetchPolicy"
   27    * @version $Revision: 1.3 $
   28    */
   29   public class ActiveMQPrefetchPolicy extends Object implements Serializable {
   30       public static final int MAX_PREFETCH_SIZE = Short.MAX_VALUE - 1;
   31       public static final int DEFAULT_QUEUE_PREFETCH = 1000;
   32       public static final int DEFAULT_QUEUE_BROWSER_PREFETCH = 500;
   33       public static final int DEFAULT_DURABLE_TOPIC_PREFETCH = 100;
   34       public static final int DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH=1000;
   35       public static final int DEFAULT_INPUT_STREAM_PREFETCH=100;
   36       public static final int DEFAULT_TOPIC_PREFETCH = MAX_PREFETCH_SIZE;
   37       
   38       private static final Log LOG = LogFactory.getLog(ActiveMQPrefetchPolicy.class);
   39       
   40       private int queuePrefetch;
   41       private int queueBrowserPrefetch;
   42       private int topicPrefetch;
   43       private int durableTopicPrefetch;
   44       private int optimizeDurableTopicPrefetch;
   45       private int inputStreamPrefetch;
   46       private int maximumPendingMessageLimit;
   47   
   48       /**
   49        * Initialize default prefetch policies
   50        */
   51       public ActiveMQPrefetchPolicy() {
   52           this.queuePrefetch = DEFAULT_QUEUE_PREFETCH;
   53           this.queueBrowserPrefetch = DEFAULT_QUEUE_BROWSER_PREFETCH;
   54           this.topicPrefetch = DEFAULT_TOPIC_PREFETCH;
   55           this.durableTopicPrefetch = DEFAULT_DURABLE_TOPIC_PREFETCH;
   56           this.optimizeDurableTopicPrefetch = DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH;
   57           this.inputStreamPrefetch = DEFAULT_INPUT_STREAM_PREFETCH;
   58       }
   59   
   60       /**
   61        * @return Returns the durableTopicPrefetch.
   62        */
   63       public int getDurableTopicPrefetch() {
   64           return durableTopicPrefetch;
   65       }
   66   
   67       /**
   68        * @param durableTopicPrefetch The durableTopicPrefetch to set.
   69        */
   70       public void setDurableTopicPrefetch(int durableTopicPrefetch) {
   71           this.durableTopicPrefetch = getMaxPrefetchLimit(durableTopicPrefetch);
   72       }
   73   
   74       /**
   75        * @return Returns the queuePrefetch.
   76        */
   77       public int getQueuePrefetch() {
   78           return queuePrefetch;
   79       }
   80   
   81       /**
   82        * @param queuePrefetch The queuePrefetch to set.
   83        */
   84       public void setQueuePrefetch(int queuePrefetch) {
   85           this.queuePrefetch = getMaxPrefetchLimit(queuePrefetch);
   86       }
   87   
   88       /**
   89        * @return Returns the queueBrowserPrefetch.
   90        */
   91       public int getQueueBrowserPrefetch() {
   92           return queueBrowserPrefetch;
   93       }
   94   
   95       /**
   96        * @param queueBrowserPrefetch The queueBrowserPrefetch to set.
   97        */
   98       public void setQueueBrowserPrefetch(int queueBrowserPrefetch) {
   99           this.queueBrowserPrefetch = getMaxPrefetchLimit(queueBrowserPrefetch);
  100       }
  101   
  102       /**
  103        * @return Returns the topicPrefetch.
  104        */
  105       public int getTopicPrefetch() {
  106           return topicPrefetch;
  107       }
  108   
  109       /**
  110        * @param topicPrefetch The topicPrefetch to set.
  111        */
  112       public void setTopicPrefetch(int topicPrefetch) {
  113           this.topicPrefetch = getMaxPrefetchLimit(topicPrefetch);
  114       }
  115   
  116       /**
  117        * @return Returns the optimizeDurableTopicPrefetch.
  118        */
  119       public int getOptimizeDurableTopicPrefetch() {
  120           return optimizeDurableTopicPrefetch;
  121       }
  122   
  123       /**
  124        * @param optimizeAcknowledgePrefetch The optimizeDurableTopicPrefetch to
  125        *                set.
  126        */
  127       public void setOptimizeDurableTopicPrefetch(int optimizeAcknowledgePrefetch) {
  128           this.optimizeDurableTopicPrefetch = optimizeAcknowledgePrefetch;
  129       }
  130   
  131       public int getMaximumPendingMessageLimit() {
  132           return maximumPendingMessageLimit;
  133       }
  134   
  135       /**
  136        * Sets how many messages a broker will keep around, above the prefetch
  137        * limit, for non-durable topics before starting to discard older messages.
  138        */
  139       public void setMaximumPendingMessageLimit(int maximumPendingMessageLimit) {
  140           this.maximumPendingMessageLimit = maximumPendingMessageLimit;
  141       }
  142   
  143       private int getMaxPrefetchLimit(int value) {
  144           int result = Math.min(value, MAX_PREFETCH_SIZE);
  145           if (result < value) {
  146               LOG.warn("maximum prefetch limit has been reset from " + value + " to " + MAX_PREFETCH_SIZE);
  147           }
  148           return result;
  149       }
  150   
  151       public void setAll(int i) {
  152           this.durableTopicPrefetch = i;
  153           this.queueBrowserPrefetch = i;
  154           this.queuePrefetch = i;
  155           this.topicPrefetch = i;
  156           this.inputStreamPrefetch = 1;
  157           this.optimizeDurableTopicPrefetch = i;
  158       }
  159   
  160       public int getInputStreamPrefetch() {
  161           return inputStreamPrefetch;
  162       }
  163   
  164       public void setInputStreamPrefetch(int inputStreamPrefetch) {
  165           this.inputStreamPrefetch = getMaxPrefetchLimit(inputStreamPrefetch);
  166       }
  167       
  168       public boolean equals(Object object){
  169           if (object instanceof ActiveMQPrefetchPolicy){
  170               ActiveMQPrefetchPolicy other = (ActiveMQPrefetchPolicy) object;
  171               return this.queuePrefetch == other.queuePrefetch &&
  172               this.queueBrowserPrefetch == other.queueBrowserPrefetch &&
  173               this.topicPrefetch == other.topicPrefetch &&
  174               this.durableTopicPrefetch == other.durableTopicPrefetch &&
  175               this.optimizeDurableTopicPrefetch == other.optimizeDurableTopicPrefetch &&
  176               this.inputStreamPrefetch == other.inputStreamPrefetch;
  177           }
  178           return false;
  179       }
  180   
  181   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]