Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.Iterator;
   21   import java.util.concurrent.ConcurrentHashMap;
   22   
   23   import javax.jms.InvalidSelectorException;
   24   import javax.jms.JMSException;
   25   
   26   import org.apache.activemq.broker.Broker;
   27   import org.apache.activemq.broker.ConnectionContext;
   28   import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
   29   import org.apache.activemq.command.ActiveMQDestination;
   30   import org.apache.activemq.command.ConsumerInfo;
   31   import org.apache.activemq.command.Message;
   32   import org.apache.activemq.command.MessageAck;
   33   import org.apache.activemq.command.MessageDispatch;
   34   import org.apache.activemq.command.MessageId;
   35   import org.apache.activemq.store.TopicMessageStore;
   36   import org.apache.activemq.usage.SystemUsage;
   37   import org.apache.activemq.usage.Usage;
   38   import org.apache.activemq.usage.UsageListener;
   39   import org.apache.activemq.util.SubscriptionKey;
   40   import org.apache.commons.logging.Log;
   41   import org.apache.commons.logging.LogFactory;
   42   
   43   public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
   44   
   45       private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class);
   46       private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
   47       private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
   48       private final SubscriptionKey subscriptionKey;
   49       private final boolean keepDurableSubsActive;
   50       private boolean active;
   51   
   52       public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
   53           throws JMSException {
   54           super(broker,usageManager, context, info);
   55           this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
   56           this.pending.setSystemUsage(usageManager);
   57           this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
   58           this.keepDurableSubsActive = keepDurableSubsActive;
   59           subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
   60           
   61       }
   62   
   63       public boolean isActive() {
   64           return active;
   65       }
   66   
   67       public boolean isFull() {
   68           return !active || super.isFull();
   69       }
   70   
   71       public void gc() {
   72       }
   73   
   74       public void add(ConnectionContext context, Destination destination) throws Exception {
   75           super.add(context, destination);
   76           destinations.put(destination.getActiveMQDestination(), destination);
   77           if (destination.getMessageStore() != null) {
   78               TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
   79               try {
   80                   this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
   81               } catch (IOException e) {
   82                   JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
   83                   jmsEx.setLinkedException(e);
   84                   throw jmsEx;
   85               }
   86           }
   87           if (active || keepDurableSubsActive) {
   88               Topic topic = (Topic)destination;
   89               topic.activate(context, this);
   90               if (pending.isEmpty(topic)) {
   91                   topic.recoverRetroactiveMessages(context, this);
   92               }
   93           }
   94           dispatchPending();
   95       }
   96   
   97       public void activate(SystemUsage memoryManager, ConnectionContext context,
   98               ConsumerInfo info) throws Exception {
   99           LOG.debug("Activating " + this);
  100           if (!active) {
  101               this.active = true;
  102               this.context = context;
  103               this.info = info;
  104               int prefetch = info.getPrefetchSize();
  105               if (prefetch>0) {
  106               prefetch += prefetch/2;
  107               }
  108               int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
  109               this.pending.setMaxAuditDepth(depth);
  110               if (!keepDurableSubsActive) {
  111                   for (Iterator<Destination> iter = destinations.values()
  112                           .iterator(); iter.hasNext();) {
  113                       Topic topic = (Topic) iter.next();
  114                       topic.activate(context, this);
  115                   }
  116               }
  117               synchronized (pending) {
  118                   pending.setSystemUsage(memoryManager);
  119                   pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
  120                   pending.start();
  121   
  122                   // If nothing was in the persistent store, then try to use the
  123                   // recovery policy.
  124                   if (pending.isEmpty()) {
  125                       for (Iterator<Destination> iter = destinations.values()
  126                               .iterator(); iter.hasNext();) {
  127                           Topic topic = (Topic) iter.next();
  128                           topic.recoverRetroactiveMessages(context, this);
  129                       }
  130                   }
  131               }
  132               dispatchPending();
  133               this.usageManager.getMemoryUsage().addUsageListener(this);
  134           }
  135       }
  136   
  137       public void deactivate(boolean keepDurableSubsActive) throws Exception {
  138           active = false;
  139           this.usageManager.getMemoryUsage().removeUsageListener(this);
  140           synchronized (pending) {
  141               pending.stop();
  142           }
  143           if (!keepDurableSubsActive) {
  144               for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
  145                   Topic topic = (Topic)iter.next();
  146                   topic.deactivate(context, this);
  147               }
  148           }
  149           for (final MessageReference node : dispatched) {
  150               // Mark the dispatched messages as redelivered for next time.
  151               Integer count = redeliveredMessages.get(node.getMessageId());
  152               if (count != null) {
  153                   redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
  154               } else {
  155                   redeliveredMessages.put(node.getMessageId(), Integer.valueOf(1));
  156               }
  157               if (keepDurableSubsActive&& pending.isTransient()) {
  158                   synchronized (pending) {
  159                       pending.addMessageFirst(node);
  160                   }
  161               } else {
  162                   node.decrementReferenceCount();
  163               }
  164           }
  165           synchronized(dispatched) {
  166               dispatched.clear();
  167           }
  168           if (!keepDurableSubsActive && pending.isTransient()) {
  169               synchronized (pending) {
  170                   try {
  171                       pending.reset();
  172                       while (pending.hasNext()) {
  173                           MessageReference node = pending.next();
  174                           node.decrementReferenceCount();
  175                           pending.remove();
  176                       }
  177                   } finally {
  178                       pending.release();
  179                   }
  180               }
  181           }
  182           prefetchExtension = 0;
  183       }
  184       
  185       
  186       protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
  187           MessageDispatch md = super.createMessageDispatch(node, message);
  188           Integer count = redeliveredMessages.get(node.getMessageId());
  189           if (count != null) {
  190               md.setRedeliveryCounter(count.intValue());
  191           }
  192           return md;
  193       }
  194   
  195       public void add(MessageReference node) throws Exception {
  196           if (!active && !keepDurableSubsActive) {
  197               return;
  198           }
  199           super.add(node);
  200       }
  201   
  202       protected void doAddRecoveredMessage(MessageReference message) throws Exception {
  203           synchronized(pending) {
  204               pending.addRecoveredMessage(message);
  205           }
  206       }
  207   
  208       public int getPendingQueueSize() {
  209           if (active || keepDurableSubsActive) {
  210               return super.getPendingQueueSize();
  211           }
  212           // TODO: need to get from store
  213           return 0;
  214       }
  215   
  216       public void setSelector(String selector) throws InvalidSelectorException {
  217           throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
  218       }
  219   
  220       protected boolean canDispatch(MessageReference node) {
  221           return active;
  222       }
  223   
  224       protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
  225           node.getRegionDestination().acknowledge(context, this, ack, node);
  226           redeliveredMessages.remove(node.getMessageId());
  227           node.decrementReferenceCount();
  228       }
  229   
  230       
  231       public synchronized String toString() {
  232           return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
  233                  + getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
  234       }
  235   
  236       public SubscriptionKey getSubscriptionKey() {
  237           return subscriptionKey;
  238       }
  239   
  240       /**
  241        * Release any references that we are holding.
  242        */
  243       public void destroy() {
  244           synchronized (pending) {
  245               try {
  246   
  247                   pending.reset();
  248                   while (pending.hasNext()) {
  249                       MessageReference node = pending.next();
  250                       node.decrementReferenceCount();
  251                   }
  252   
  253               } finally {
  254                   pending.release();
  255                   pending.clear();
  256               }
  257           }
  258           synchronized(dispatched) {
  259               for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
  260                   MessageReference node = (MessageReference) iter.next();
  261                   node.decrementReferenceCount();
  262               }
  263               dispatched.clear();
  264           }
  265       }
  266   
  267       /**
  268        * @param usageManager
  269        * @param oldPercentUsage
  270        * @param newPercentUsage
  271        * @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
  272        *      int, int)
  273        */
  274       public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
  275           if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
  276               try {
  277                   dispatchPending();
  278               } catch (IOException e) {
  279                   LOG.warn("problem calling dispatchMatched", e);
  280               }
  281           }
  282       }
  283       
  284       protected boolean isDropped(MessageReference node) {
  285          return false;
  286        }
  287   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]