Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.LinkedList;
   21   import java.util.concurrent.atomic.AtomicLong;
   22   
   23   import javax.jms.JMSException;
   24   
   25   import org.apache.activemq.broker.Broker;
   26   import org.apache.activemq.broker.ConnectionContext;
   27   import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
   28   import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
   29   import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
   30   import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
   31   import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
   32   import org.apache.activemq.command.ConsumerControl;
   33   import org.apache.activemq.command.ConsumerInfo;
   34   import org.apache.activemq.command.Message;
   35   import org.apache.activemq.command.MessageAck;
   36   import org.apache.activemq.command.MessageDispatch;
   37   import org.apache.activemq.command.MessageDispatchNotification;
   38   import org.apache.activemq.command.MessagePull;
   39   import org.apache.activemq.command.Response;
   40   import org.apache.activemq.transaction.Synchronization;
   41   import org.apache.activemq.usage.SystemUsage;
   42   import org.apache.commons.logging.Log;
   43   import org.apache.commons.logging.LogFactory;
   44   
   45   public class TopicSubscription extends AbstractSubscription {
   46   
   47       private static final Log LOG = LogFactory.getLog(TopicSubscription.class);
   48       private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
   49       
   50       protected PendingMessageCursor matched;
   51       protected final SystemUsage usageManager;
   52       protected AtomicLong dispatchedCounter = new AtomicLong();
   53          
   54       boolean singleDestination = true;
   55       Destination destination;
   56   
   57       private int maximumPendingMessages = -1;
   58       private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
   59       private int discarded;
   60       private final Object matchedListMutex = new Object();
   61       private final AtomicLong enqueueCounter = new AtomicLong(0);
   62       private final AtomicLong dequeueCounter = new AtomicLong(0);
   63       private int memoryUsageHighWaterMark = 95;
   64       private boolean slowConsumer;
   65   
   66       public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
   67           super(broker, context, info);
   68           this.usageManager = usageManager;
   69           String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
   70           if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
   71               this.matched = new VMPendingMessageCursor();
   72           } else {
   73               this.matched = new FilePendingMessageCursor(broker,matchedName);
   74           }
   75       }
   76   
   77       public void init() throws Exception {
   78           this.matched.setSystemUsage(usageManager);
   79           this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
   80           this.matched.start();
   81       }
   82   
   83       public void add(MessageReference node) throws Exception {
   84           enqueueCounter.incrementAndGet();
   85           if (!isFull() && matched.isEmpty()  && !isSlave()) {
   86               // if maximumPendingMessages is set we will only discard messages which
   87               // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
   88               dispatch(node);
   89               slowConsumer=false;
   90           } else {
   91               //we are slow
   92               if(!slowConsumer) {
   93                   slowConsumer=true;
   94                   for (Destination dest: destinations) {
   95                       dest.slowConsumer(getContext(), this);
   96                   }
   97               }
   98               if (maximumPendingMessages != 0) {
   99               	synchronized(matchedListMutex){
  100               		while (matched.isFull()){
  101                           if (getContext().getStopping().get()) {
  102                               LOG.warn("stopped waiting for space in pendingMessage cursor for: " + node.getMessageId());
  103                               enqueueCounter.decrementAndGet();
  104                               return;
  105                           }
  106                           matchedListMutex.wait(20);
  107               		}
  108               		matched.addMessageLast(node);
  109               	}
  110                   synchronized (matchedListMutex) {
  111                       
  112                       // NOTE - be careful about the slaveBroker!
  113                       if (maximumPendingMessages > 0) {
  114                           // calculate the high water mark from which point we
  115                           // will eagerly evict expired messages
  116                           int max = messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
  117                           if (maximumPendingMessages > 0 && maximumPendingMessages < max) {
  118                               max = maximumPendingMessages;
  119                           }
  120                           if (!matched.isEmpty() && matched.size() > max) {
  121                               removeExpiredMessages();
  122                           }
  123                           // lets discard old messages as we are a slow consumer
  124                           while (!matched.isEmpty() && matched.size() > maximumPendingMessages) {
  125                               int pageInSize = matched.size() - maximumPendingMessages;
  126                               // only page in a 1000 at a time - else we could
  127                               // blow da memory
  128                               pageInSize = Math.max(1000, pageInSize);
  129                               LinkedList<MessageReference> list = null;
  130                               MessageReference[] oldMessages=null;
  131                               synchronized(matched){
  132                                   list = matched.pageInList(pageInSize);
  133                               	oldMessages = messageEvictionStrategy.evictMessages(list);
  134                               	for (MessageReference ref : list) {
  135                               	    ref.decrementReferenceCount();
  136                               	}
  137                               }
  138                               int messagesToEvict = 0;
  139                               if (oldMessages != null){
  140   	                            messagesToEvict = oldMessages.length;
  141   	                            for (int i = 0; i < messagesToEvict; i++) {
  142   	                                MessageReference oldMessage = oldMessages[i];
  143   	                                discard(oldMessage);
  144   	                            }
  145                               }
  146                               // lets avoid an infinite loop if we are given a bad
  147                               // eviction strategy
  148                               // for a bad strategy lets just not evict
  149                               if (messagesToEvict == 0) {
  150                                   LOG.warn("No messages to evict returned from eviction strategy: " + messageEvictionStrategy);
  151                                   break;
  152                               }
  153                           }
  154                       }
  155                   }
  156                   dispatchMatched();
  157               }
  158           }
  159       }
  160   
  161       /**
  162        * Discard any expired messages from the matched list. Called from a
  163        * synchronized block.
  164        * 
  165        * @throws IOException
  166        */
  167       protected void removeExpiredMessages() throws IOException {
  168           try {
  169               matched.reset();
  170               while (matched.hasNext()) {
  171                   MessageReference node = matched.next();
  172                   node.decrementReferenceCount();
  173                   if (broker.isExpired(node)) {
  174                       matched.remove();
  175                       dispatchedCounter.incrementAndGet();
  176                       node.decrementReferenceCount();
  177                       node.getRegionDestination().getDestinationStatistics().getExpired().increment();
  178                       broker.messageExpired(getContext(), node);
  179                       break;
  180                   }
  181               }
  182           } finally {
  183               matched.release();
  184           }
  185       }
  186   
  187       public void processMessageDispatchNotification(MessageDispatchNotification mdn) {
  188           synchronized (matchedListMutex) {
  189               try {
  190                   matched.reset();
  191                   while (matched.hasNext()) {
  192                       MessageReference node = matched.next();
  193                       node.decrementReferenceCount();
  194                       if (node.getMessageId().equals(mdn.getMessageId())) {
  195                           matched.remove();
  196                           dispatchedCounter.incrementAndGet();
  197                           node.decrementReferenceCount();
  198                           break;
  199                       }
  200                   }
  201               } finally {
  202                   matched.release();
  203               }
  204           }
  205       }
  206   
  207       public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
  208           // Handle the standard acknowledgment case.
  209           if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
  210               if (context.isInTransaction()) {
  211                   context.getTransaction().addSynchronization(new Synchronization() {
  212   
  213                       public void afterCommit() throws Exception {
  214                          synchronized (TopicSubscription.this) {
  215                               if (singleDestination && destination != null) {
  216                                   destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
  217                               }
  218                           }
  219                           dequeueCounter.addAndGet(ack.getMessageCount());
  220                           dispatchMatched();
  221                       }
  222                   });
  223               } else {
  224                   if (singleDestination && destination != null) {
  225                       destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
  226                       destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
  227                   }
  228                   dequeueCounter.addAndGet(ack.getMessageCount());
  229               }
  230               dispatchMatched();
  231               return;
  232           } else if (ack.isDeliveredAck()) {
  233               // Message was delivered but not acknowledged: update pre-fetch
  234               // counters.
  235               // also. get these for a consumer expired message.
  236               if (destination != null && !ack.isInTransaction()) {
  237                   destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
  238                   destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());   
  239               }
  240               dequeueCounter.addAndGet(ack.getMessageCount());
  241               dispatchMatched();
  242               return;
  243           }
  244           throw new JMSException("Invalid acknowledgment: " + ack);
  245       }
  246   
  247       public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
  248           // not supported for topics
  249           return null;
  250       }
  251   
  252       public int getPendingQueueSize() {
  253           return matched();
  254       }
  255   
  256       public int getDispatchedQueueSize() {
  257           return (int)(dispatchedCounter.get() - dequeueCounter.get());
  258       }
  259   
  260       public int getMaximumPendingMessages() {
  261           return maximumPendingMessages;
  262       }
  263   
  264       public long getDispatchedCounter() {
  265           return dispatchedCounter.get();
  266       }
  267   
  268       public long getEnqueueCounter() {
  269           return enqueueCounter.get();
  270       }
  271   
  272       public long getDequeueCounter() {
  273           return dequeueCounter.get();
  274       }
  275   
  276       /**
  277        * @return the number of messages discarded due to being a slow consumer
  278        */
  279       public int discarded() {
  280           synchronized (matchedListMutex) {
  281               return discarded;
  282           }
  283       }
  284   
  285       /**
  286        * @return the number of matched messages (messages targeted for the
  287        *         subscription but not yet able to be dispatched due to the
  288        *         prefetch buffer being full).
  289        */
  290       public int matched() {
  291           synchronized (matchedListMutex) {
  292               return matched.size();
  293           }
  294       }
  295   
  296       /**
  297        * Sets the maximum number of pending messages that can be matched against
  298        * this consumer before old messages are discarded.
  299        */
  300       public void setMaximumPendingMessages(int maximumPendingMessages) {
  301           this.maximumPendingMessages = maximumPendingMessages;
  302       }
  303   
  304       public MessageEvictionStrategy getMessageEvictionStrategy() {
  305           return messageEvictionStrategy;
  306       }
  307   
  308       /**
  309        * Sets the eviction strategy used to decide which message to evict when the
  310        * slow consumer needs to discard messages
  311        */
  312       public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
  313           this.messageEvictionStrategy = messageEvictionStrategy;
  314       }
  315   
  316       // Implementation methods
  317       // -------------------------------------------------------------------------
  318       public boolean isFull() {
  319           return getDispatchedQueueSize()  >= info.getPrefetchSize();
  320       }
  321       
  322       public int getInFlightSize() {
  323           return getDispatchedQueueSize();
  324       }
  325       
  326       
  327       /**
  328        * @return true when 60% or more room is left for dispatching messages
  329        */
  330       public boolean isLowWaterMark() {
  331           return getDispatchedQueueSize() <= (info.getPrefetchSize() * .4);
  332       }
  333   
  334       /**
  335        * @return true when 10% or less room is left for dispatching messages
  336        */
  337       public boolean isHighWaterMark() {
  338           return getDispatchedQueueSize() >= (info.getPrefetchSize() * .9);
  339       }
  340   
  341       /**
  342        * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
  343        */
  344       public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
  345           this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
  346       }
  347   
  348       /**
  349        * @return the memoryUsageHighWaterMark
  350        */
  351       public int getMemoryUsageHighWaterMark() {
  352           return this.memoryUsageHighWaterMark;
  353       }
  354   
  355       /**
  356        * @return the usageManager
  357        */
  358       public SystemUsage getUsageManager() {
  359           return this.usageManager;
  360       }
  361   
  362       /**
  363        * @return the matched
  364        */
  365       public PendingMessageCursor getMatched() {
  366           return this.matched;
  367       }
  368   
  369       /**
  370        * @param matched the matched to set
  371        */
  372       public void setMatched(PendingMessageCursor matched) {
  373           this.matched = matched;
  374       }
  375   
  376       /**
  377        * inform the MessageConsumer on the client to change it's prefetch
  378        * 
  379        * @param newPrefetch
  380        */
  381       public void updateConsumerPrefetch(int newPrefetch) {
  382           if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
  383               ConsumerControl cc = new ConsumerControl();
  384               cc.setConsumerId(info.getConsumerId());
  385               cc.setPrefetch(newPrefetch);
  386               context.getConnection().dispatchAsync(cc);
  387           }
  388       }
  389   
  390       private void dispatchMatched() throws IOException {       
  391           synchronized (matchedListMutex) {
  392               if (!matched.isEmpty() && !isFull()) {
  393                   try {
  394                       matched.reset();
  395                      
  396                       while (matched.hasNext() && !isFull()) {
  397                           MessageReference message = (MessageReference) matched.next();
  398                           message.decrementReferenceCount();
  399                           matched.remove();
  400                           // Message may have been sitting in the matched list a
  401                           // while
  402                           // waiting for the consumer to ak the message.
  403                           if (message.isExpired()) {
  404                               discard(message);
  405                               continue; // just drop it.
  406                           }
  407                           dispatch(message);
  408                       }
  409                   } finally {
  410                       matched.release();
  411                   }
  412               }
  413           }
  414       }
  415   
  416       private void dispatch(final MessageReference node) throws IOException {
  417           Message message = (Message)node;
  418           node.incrementReferenceCount();
  419           // Make sure we can dispatch a message.
  420           MessageDispatch md = new MessageDispatch();
  421           md.setMessage(message);
  422           md.setConsumerId(info.getConsumerId());
  423           md.setDestination(node.getRegionDestination().getActiveMQDestination());
  424           dispatchedCounter.incrementAndGet();
  425           // Keep track if this subscription is receiving messages from a single
  426           // destination.
  427           if (singleDestination) {
  428               if (destination == null) {
  429                   destination = node.getRegionDestination();
  430               } else {
  431                   if (destination != node.getRegionDestination()) {
  432                       singleDestination = false;
  433                   }
  434               }
  435           }
  436           if (info.isDispatchAsync()) {
  437               md.setTransmitCallback(new Runnable() {
  438   
  439                   public void run() {
  440                       node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
  441                       node.getRegionDestination().getDestinationStatistics().getInflight().increment();
  442                       node.decrementReferenceCount();
  443                   }
  444               });
  445               context.getConnection().dispatchAsync(md);
  446           } else {
  447               context.getConnection().dispatchSync(md);
  448               node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
  449               node.getRegionDestination().getDestinationStatistics().getInflight().increment();
  450               node.decrementReferenceCount();
  451           }
  452       }
  453   
  454       private void discard(MessageReference message) {
  455           message.decrementReferenceCount();
  456           matched.remove(message);
  457           discarded++;
  458           if(destination != null) {
  459               destination.getDestinationStatistics().getDequeues().increment();
  460           }
  461           if (LOG.isDebugEnabled()) {
  462               LOG.debug("Discarding message " + message);
  463           }
  464           Destination dest = message.getRegionDestination();
  465           if (dest != null) {
  466               dest.messageDiscarded(getContext(), message);
  467           }
  468           broker.getRoot().sendToDeadLetterQueue(getContext(), message);
  469       }
  470   
  471       public String toString() {
  472           return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
  473                  + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
  474       }
  475   
  476       public void destroy() {
  477           synchronized (matchedListMutex) {
  478               try {
  479                   matched.destroy();
  480               } catch (Exception e) {
  481                   LOG.warn("Failed to destroy cursor", e);
  482               }
  483           }
  484       }
  485   
  486       public int getPrefetchSize() {
  487           return (int)info.getPrefetchSize();
  488       }
  489   
  490   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]