Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.AbstractList;
   21   import java.util.ArrayList;
   22   import java.util.Collection;
   23   import java.util.Collections;
   24   import java.util.Comparator;
   25   import java.util.HashSet;
   26   import java.util.Iterator;
   27   import java.util.LinkedHashMap;
   28   import java.util.LinkedList;
   29   import java.util.List;
   30   import java.util.Map;
   31   import java.util.Set;
   32   import java.util.concurrent.CopyOnWriteArraySet;
   33   import java.util.concurrent.CountDownLatch;
   34   import java.util.concurrent.DelayQueue;
   35   import java.util.concurrent.Delayed;
   36   import java.util.concurrent.ExecutorService;
   37   import java.util.concurrent.TimeUnit;
   38   import java.util.concurrent.atomic.AtomicLong;
   39   
   40   import javax.jms.InvalidSelectorException;
   41   import javax.jms.JMSException;
   42   import javax.jms.ResourceAllocationException;
   43   
   44   import org.apache.activemq.broker.BrokerService;
   45   import org.apache.activemq.broker.ConnectionContext;
   46   import org.apache.activemq.broker.ProducerBrokerExchange;
   47   import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
   48   import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
   49   import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
   50   import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
   51   import org.apache.activemq.broker.region.group.MessageGroupMap;
   52   import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
   53   import org.apache.activemq.broker.region.policy.DispatchPolicy;
   54   import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
   55   import org.apache.activemq.command.ActiveMQDestination;
   56   import org.apache.activemq.command.ConsumerId;
   57   import org.apache.activemq.command.ExceptionResponse;
   58   import org.apache.activemq.command.Message;
   59   import org.apache.activemq.command.MessageAck;
   60   import org.apache.activemq.command.MessageDispatchNotification;
   61   import org.apache.activemq.command.MessageId;
   62   import org.apache.activemq.command.ProducerAck;
   63   import org.apache.activemq.command.ProducerInfo;
   64   import org.apache.activemq.command.Response;
   65   import org.apache.activemq.filter.BooleanExpression;
   66   import org.apache.activemq.filter.MessageEvaluationContext;
   67   import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
   68   import org.apache.activemq.security.SecurityContext;
   69   import org.apache.activemq.selector.SelectorParser;
   70   import org.apache.activemq.store.MessageRecoveryListener;
   71   import org.apache.activemq.store.MessageStore;
   72   import org.apache.activemq.thread.Scheduler;
   73   import org.apache.activemq.thread.Task;
   74   import org.apache.activemq.thread.TaskRunner;
   75   import org.apache.activemq.thread.TaskRunnerFactory;
   76   import org.apache.activemq.transaction.Synchronization;
   77   import org.apache.activemq.usage.Usage;
   78   import org.apache.activemq.usage.UsageListener;
   79   import org.apache.activemq.util.BrokerSupport;
   80   import org.apache.commons.logging.Log;
   81   import org.apache.commons.logging.LogFactory;
   82   
   83   /**
   84    * The Queue is a List of MessageEntry objects that are dispatched to matching
   85    * subscriptions.
   86    * 
   87    * @version $Revision: 1.28 $
   88    */
   89   public class Queue extends BaseDestination implements Task, UsageListener {
   90       protected static final Log LOG = LogFactory.getLog(Queue.class);
   91       protected final TaskRunnerFactory taskFactory;
   92       protected TaskRunner taskRunner;
   93       protected final List<Subscription> consumers = new ArrayList<Subscription>(50);
   94       protected PendingMessageCursor messages;
   95       private final LinkedHashMap<MessageId, QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId, QueueMessageReference>();
   96       // Messages that are paged in but have not yet been targeted at a subscription
   97       private List<QueueMessageReference> pagedInPendingDispatch = new ArrayList<QueueMessageReference>(100);
   98       private MessageGroupMap messageGroupOwners;
   99       private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
  100       private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
  101       private final Object sendLock = new Object();
  102       private ExecutorService executor;
  103       protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections.synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
  104       private final Object dispatchMutex = new Object();
  105       private boolean useConsumerPriority = true;
  106       private boolean strictOrderDispatch = false;
  107       private QueueDispatchSelector dispatchSelector;
  108       private boolean optimizedDispatch = false;
  109       private boolean firstConsumer = false;
  110       private int timeBeforeDispatchStarts = 0;
  111       private int consumersBeforeDispatchStarts = 0;
  112       private CountDownLatch consumersBeforeStartsLatch;
  113       private AtomicLong pendingWakeups = new AtomicLong();
  114   
  115       private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
  116           public void run() {
  117               asyncWakeup();
  118           }
  119       };
  120       private final Runnable expireMessagesTask = new Runnable() {
  121           public void run() {
  122               expireMessages();
  123           }
  124       };
  125       
  126       private final Object iteratingMutex = new Object() {};
  127       private static final Scheduler scheduler = Scheduler.getInstance();
  128       
  129       class TimeoutMessage implements Delayed {
  130   
  131           Message message;
  132           ConnectionContext context;
  133           long trigger;
  134           
  135           public TimeoutMessage(Message message, ConnectionContext context, long delay) {
  136               this.message = message;
  137               this.context = context;
  138               this.trigger = System.currentTimeMillis() + delay;
  139           }
  140           
  141           public long getDelay(TimeUnit unit) {
  142               long n = trigger - System.currentTimeMillis();
  143               return unit.convert(n, TimeUnit.MILLISECONDS);
  144           }
  145   
  146           public int compareTo(Delayed delayed) {
  147               long other = ((TimeoutMessage)delayed).trigger;
  148               int returnValue;
  149               if (this.trigger < other) {
  150                 returnValue = -1;
  151               } else if (this.trigger > other) {
  152                 returnValue = 1;
  153               } else {
  154                 returnValue = 0;
  155               }
  156               return returnValue;
  157           }
  158           
  159       }
  160       
  161       DelayQueue<TimeoutMessage> flowControlTimeoutMessages = new DelayQueue<TimeoutMessage>();
  162       
  163       class FlowControlTimeoutTask extends Thread {
  164           
  165           public void run() {
  166               TimeoutMessage timeout;
  167               try {
  168                   while (true) {
  169                       timeout = flowControlTimeoutMessages.take();
  170                       if (timeout != null) {
  171                           synchronized (messagesWaitingForSpace) {
  172                               if (messagesWaitingForSpace.remove(timeout.message.getMessageId()) != null) {
  173                                   ExceptionResponse response = new ExceptionResponse(new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + timeout.message.getProducerId() + ") to prevent flooding "
  174                                           + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info"));
  175                                   response.setCorrelationId(timeout.message.getCommandId());
  176                                   timeout.context.getConnection().dispatchAsync(response);
  177                               }
  178                           }
  179                       }
  180                   }
  181               } catch (InterruptedException e) {
  182                   if (LOG.isDebugEnabled()) {
  183                       LOG.debug("Producer Flow Control Timeout Task is stopping");
  184                   }
  185               }
  186           }
  187       };
  188       
  189       private final FlowControlTimeoutTask flowControlTimeoutTask = new FlowControlTimeoutTask();
  190       
  191   
  192       private static final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
  193   
  194           public int compare(Subscription s1, Subscription s2) {
  195               //We want the list sorted in descending order
  196               return s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
  197           }
  198       };
  199   
  200       public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
  201           super(brokerService, store, destination, parentStats);
  202           this.taskFactory = taskFactory;
  203           this.dispatchSelector = new QueueDispatchSelector(destination);
  204       }
  205   
  206       public List<Subscription> getConsumers() {
  207           synchronized (consumers) {
  208               return new ArrayList<Subscription>(consumers);
  209           }
  210       }
  211   
  212       // make the queue easily visible in the debugger from its task runner threads
  213       final class QueueThread extends Thread {
  214           final Queue queue;
  215   
  216           public QueueThread(Runnable runnable, String name, Queue queue) {
  217               super(runnable, name);
  218               this.queue = queue;
  219           }
  220       }
  221   
  222       public void initialize() throws Exception {
  223           if (this.messages == null) {
  224               if (destination.isTemporary() || broker == null || store == null) {
  225                   this.messages = new VMPendingMessageCursor();
  226               } else {
  227                   this.messages = new StoreQueueCursor(broker, this);
  228               }
  229           }
  230           // If a VMPendingMessageCursor don't use the default Producer System Usage
  231           // since it turns into a shared blocking queue which can lead to a network deadlock.  
  232           // If we are cursoring to disk..it's not and issue because it does not block due 
  233           // to large disk sizes.
  234           if (messages instanceof VMPendingMessageCursor) {
  235               this.systemUsage = brokerService.getSystemUsage();
  236               memoryUsage.setParent(systemUsage.getMemoryUsage());
  237           }
  238   
  239           this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
  240   
  241           super.initialize();
  242           if (store != null) {
  243               // Restore the persistent messages.
  244               messages.setSystemUsage(systemUsage);
  245               messages.setEnableAudit(isEnableAudit());
  246               messages.setMaxAuditDepth(getMaxAuditDepth());
  247               messages.setMaxProducersToAudit(getMaxProducersToAudit());
  248               messages.setUseCache(isUseCache());
  249               messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
  250               if (messages.isRecoveryRequired()) {
  251                   store.recover(new MessageRecoveryListener() {
  252   
  253                       public boolean recoverMessage(Message message) {
  254                           // Message could have expired while it was being
  255                           // loaded..
  256                           if (message.isExpired()) {
  257                               if (broker.isExpired(message)) {
  258                                   messageExpired(createConnectionContext(), createMessageReference(message));
  259                                   // drop message will decrement so counter balance here
  260                                   destinationStatistics.getMessages().increment();
  261                               }
  262                               return true;
  263                           }
  264                           if (hasSpace()) {
  265                               message.setRegionDestination(Queue.this);
  266                               synchronized (messages) {
  267                                   try {
  268                                       messages.addMessageLast(message);
  269                                   } catch (Exception e) {
  270                                       LOG.fatal("Failed to add message to cursor", e);
  271                                   }
  272                               }
  273                               destinationStatistics.getMessages().increment();
  274                               return true;
  275                           }
  276                           return false;
  277                       }
  278   
  279                       public boolean recoverMessageReference(MessageId messageReference) throws Exception {
  280                           throw new RuntimeException("Should not be called.");
  281                       }
  282   
  283                       public boolean hasSpace() {
  284                           return true;
  285                       }
  286   
  287                       public boolean isDuplicate(MessageId id) {
  288                           return false;
  289                       }
  290                   });
  291               } else {
  292                   int messageCount = store.getMessageCount();
  293                   destinationStatistics.getMessages().setCount(messageCount);
  294               }
  295           }
  296       }
  297   
  298       /*
  299        * Holder for subscription that needs attention on next iterate
  300        * browser needs access to existing messages in the queue that have already been dispatched
  301        */
  302       class BrowserDispatch {
  303           QueueBrowserSubscription browser;
  304   
  305           public BrowserDispatch(QueueBrowserSubscription browserSubscription) {
  306               browser = browserSubscription;
  307               browser.incrementQueueRef();
  308           }
  309   
  310           void done() {
  311               try {
  312                   browser.decrementQueueRef();
  313               } catch (Exception e) {
  314                   LOG.warn("decrement ref on browser: " + browser, e);
  315               }
  316           }
  317   
  318           public QueueBrowserSubscription getBrowser() {
  319               return browser;
  320           }
  321       }
  322   
  323       LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
  324   
  325       public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
  326           // synchronize with dispatch method so that no new messages are sent
  327           // while setting up a subscription. avoid out of order messages,
  328           // duplicates, etc.
  329           synchronized (dispatchMutex) {
  330   
  331               sub.add(context, this);
  332               destinationStatistics.getConsumers().increment();
  333   
  334               // needs to be synchronized - so no contention with dispatching
  335               synchronized (consumers) {
  336   
  337                   // set a flag if this is a first consumer
  338                   if (consumers.size() == 0) {
  339                       firstConsumer = true;
  340                       if (consumersBeforeDispatchStarts != 0) {
  341                           consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
  342                       }
  343                   } else {
  344                       if (consumersBeforeStartsLatch != null) {
  345                           consumersBeforeStartsLatch.countDown();
  346                       }
  347                   }
  348   
  349                   addToConsumerList(sub);
  350                   if (sub.getConsumerInfo().isExclusive()) {
  351                       Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
  352                       if (exclusiveConsumer == null) {
  353                           exclusiveConsumer = sub;
  354                       } else if (sub.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority()) {
  355                           exclusiveConsumer = sub;
  356                       }
  357                       dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
  358                   }
  359               }
  360   
  361               if (sub instanceof QueueBrowserSubscription) {
  362                   // tee up for dispatch in next iterate
  363                   QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
  364                   synchronized (pagedInMessages) {
  365                       BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription);
  366                       browserDispatches.addLast(browserDispatch);
  367                   }
  368               }
  369               
  370               if (!(this.optimizedDispatch || isSlave())) {
  371                   wakeup();
  372               }
  373           }
  374           if (this.optimizedDispatch || isSlave()) {
  375               // Outside of dispatchLock() to maintain the lock hierarchy of
  376               // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
  377               wakeup();
  378           }
  379       }
  380   
  381       public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId) throws Exception {
  382           destinationStatistics.getConsumers().decrement();
  383           // synchronize with dispatch method so that no new messages are sent
  384           // while removing up a subscription.
  385           synchronized (dispatchMutex) {
  386               if (LOG.isDebugEnabled()) {
  387                   LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " + lastDeiveredSequenceId + ", dequeues: " + getDestinationStatistics().getDequeues().getCount() + ", dispatched: "
  388                           + getDestinationStatistics().getDispatched().getCount() + ", inflight: " + getDestinationStatistics().getInflight().getCount());
  389               }
  390               synchronized (consumers) {
  391                   removeFromConsumerList(sub);
  392                   if (sub.getConsumerInfo().isExclusive()) {
  393                       Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
  394                       if (exclusiveConsumer == sub) {
  395                           exclusiveConsumer = null;
  396                           for (Subscription s : consumers) {
  397                               if (s.getConsumerInfo().isExclusive() && (exclusiveConsumer == null || s.getConsumerInfo().getPriority() > exclusiveConsumer.getConsumerInfo().getPriority())) {
  398                                   exclusiveConsumer = s;
  399   
  400                               }
  401                           }
  402                           dispatchSelector.setExclusiveConsumer(exclusiveConsumer);
  403                       }
  404                   }
  405                   ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
  406                   getMessageGroupOwners().removeConsumer(consumerId);
  407   
  408                   // redeliver inflight messages
  409                   List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
  410                   for (MessageReference ref : sub.remove(context, this)) {
  411                       QueueMessageReference qmr = (QueueMessageReference) ref;
  412                       if (qmr.getLockOwner() == sub) {
  413                           qmr.unlock();
  414                           // only increment redelivery if it was delivered or we have no delivery information
  415                           if (lastDeiveredSequenceId == 0 || qmr.getMessageId().getBrokerSequenceId() <= lastDeiveredSequenceId) {
  416                               qmr.incrementRedeliveryCounter();
  417                           }
  418                       }
  419                       list.add(qmr);
  420                   }
  421   
  422                   if (!list.isEmpty()) {
  423                       doDispatch(list);
  424                   }
  425               }
  426               if (!(this.optimizedDispatch || isSlave())) {
  427                   wakeup();
  428               }
  429           }
  430           if (this.optimizedDispatch || isSlave()) {
  431               // Outside of dispatchLock() to maintain the lock hierarchy of
  432               // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
  433               wakeup();
  434           }
  435       }
  436   
  437       public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
  438           final ConnectionContext context = producerExchange.getConnectionContext();
  439           // There is delay between the client sending it and it arriving at the
  440           // destination.. it may have expired.
  441           message.setRegionDestination(this);
  442           final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
  443           final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
  444           if (message.isExpired()) {
  445               //message not stored - or added to stats yet - so chuck here
  446               broker.getRoot().messageExpired(context, message);
  447               if (sendProducerAck) {
  448                   ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
  449                   context.getConnection().dispatchAsync(ack);
  450               }
  451               return;
  452           }
  453           if (memoryUsage.isFull()) {
  454               isFull(context, memoryUsage);
  455               fastProducer(context, producerInfo);
  456               if (isProducerFlowControl() && context.isProducerFlowControl()) {
  457                   if (warnOnProducerFlowControl) {
  458                       warnOnProducerFlowControl = false;
  459                       LOG.info("Usage Manager Memory Limit reached on " + getActiveMQDestination().getQualifiedName()
  460                               + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
  461                               + " See http://activemq.apache.org/producer-flow-control.html for more info");
  462                   }
  463   
  464                   if (systemUsage.isSendFailIfNoSpace()) {
  465                       throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
  466                               + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
  467                   }
  468   
  469                   // We can avoid blocking due to low usage if the producer is sending
  470                   // a sync message or if it is using a producer window
  471                   if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
  472                       // copy the exchange state since the context will be modified while we are waiting
  473                       // for space.
  474                       final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy();
  475                       synchronized (messagesWaitingForSpace) {
  476                           messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
  477                               public void run() {
  478   
  479                                   try {
  480                                       // While waiting for space to free up... the
  481                                       // message may have expired.
  482                                       if (message.isExpired()) {
  483                                           LOG.error("expired waiting for space..");
  484                                           broker.messageExpired(context, message);
  485                                           destinationStatistics.getExpired().increment();
  486                                       } else {
  487                                           doMessageSend(producerExchangeCopy, message);
  488                                       }
  489   
  490                                       if (sendProducerAck) {
  491                                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
  492                                           context.getConnection().dispatchAsync(ack);
  493                                       } else {
  494                                           Response response = new Response();
  495                                           response.setCorrelationId(message.getCommandId());
  496                                           context.getConnection().dispatchAsync(response);
  497                                       }
  498   
  499                                   } catch (Exception e) {
  500                                       if (!sendProducerAck && !context.isInRecoveryMode()) {
  501                                           ExceptionResponse response = new ExceptionResponse(e);
  502                                           response.setCorrelationId(message.getCommandId());
  503                                           context.getConnection().dispatchAsync(response);
  504                                       } else {
  505                                           LOG.debug("unexpected exception on deferred send of :" + message, e);
  506                                       }
  507                                   }
  508                               }
  509                           });
  510                           
  511                           if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
  512                               flowControlTimeoutMessages.add(new TimeoutMessage(message, context, systemUsage.getSendFailIfNoSpaceAfterTimeout()));
  513                           }
  514   
  515                           registerCallbackForNotFullNotification();
  516                           context.setDontSendReponse(true);
  517                           return;
  518                       }
  519   
  520                   } else {
  521   
  522                       if (memoryUsage.isFull()) {
  523                           waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer (" + message.getProducerId() + ") stopped to prevent flooding "
  524                                   + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
  525                       }
  526   
  527                       // The usage manager could have delayed us by the time
  528                       // we unblock the message could have expired..
  529                       if (message.isExpired()) {
  530                           if (LOG.isDebugEnabled()) {
  531                               LOG.debug("Expired message: " + message);
  532                           }
  533                           broker.getRoot().messageExpired(context, message);
  534                           return;
  535                       }
  536                   }
  537               }
  538           }
  539           doMessageSend(producerExchange, message);
  540           if (sendProducerAck) {
  541               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
  542               context.getConnection().dispatchAsync(ack);
  543           }
  544       }
  545   
  546       private void registerCallbackForNotFullNotification() {
  547           // If the usage manager is not full, then the task will not
  548           // get called..
  549           if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
  550               // so call it directly here.
  551               sendMessagesWaitingForSpaceTask.run();
  552           }
  553       }
  554   
  555       void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
  556           final ConnectionContext context = producerExchange.getConnectionContext();
  557           synchronized (sendLock) {
  558               if (store != null && message.isPersistent()) {
  559                   if (systemUsage.getStoreUsage().isFull()) {
  560   
  561                       String logMessage = "Usage Manager Store is Full. Producer (" + message.getProducerId() + ") stopped to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
  562                               + " See http://activemq.apache.org/producer-flow-control.html for more info";
  563   
  564                       if (systemUsage.isSendFailIfNoSpace()) {
  565                           throw new ResourceAllocationException(logMessage);
  566                       }
  567   
  568                       waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
  569                   }
  570                   message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
  571                   store.addMessage(context, message);
  572   
  573               }
  574           }
  575           if (context.isInTransaction()) {
  576               // If this is a transacted message.. increase the usage now so that
  577               // a big TX does not blow up
  578               // our memory. This increment is decremented once the tx finishes..
  579               message.incrementReferenceCount();
  580               context.getTransaction().addSynchronization(new Synchronization() {
  581                   public void afterCommit() throws Exception {
  582                       try {
  583                           // It could take while before we receive the commit
  584                           // op, by that time the message could have expired..
  585                           if (broker.isExpired(message)) {
  586                               broker.messageExpired(context, message);
  587                               destinationStatistics.getExpired().increment();
  588                               return;
  589                           }
  590                           sendMessage(context, message);
  591                       } finally {
  592                           message.decrementReferenceCount();
  593                       }
  594                   }
  595   
  596                   @Override
  597                   public void afterRollback() throws Exception {
  598                       message.decrementReferenceCount();
  599                   }
  600               });
  601           } else {
  602               // Add to the pending list, this takes care of incrementing the
  603               // usage manager.
  604               sendMessage(context, message);
  605           }
  606       }
  607   
  608       private void expireMessages() {
  609           if (LOG.isDebugEnabled()) {
  610               LOG.debug("Expiring messages ..");
  611           }
  612   
  613           // just track the insertion count
  614           List<Message> browsedMessages = new AbstractList<Message>() {
  615               int size = 0;
  616   
  617               @Override
  618               public void add(int index, Message element) {
  619                   size++;
  620               }
  621   
  622               @Override
  623               public int size() {
  624                   return size;
  625               }
  626   
  627               @Override
  628               public Message get(int index) {
  629                   return null;
  630               }
  631           };
  632           doBrowse(browsedMessages, this.getMaxExpirePageSize());
  633           asyncWakeup();
  634       }
  635   
  636       public void gc() {
  637       }
  638   
  639       public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
  640           messageConsumed(context, node);
  641           if (store != null && node.isPersistent()) {
  642               // the original ack may be a ranged ack, but we are trying to delete a specific
  643               // message store here so we need to convert to a non ranged ack.
  644               if (ack.getMessageCount() > 0) {
  645                   // Dup the ack
  646                   MessageAck a = new MessageAck();
  647                   ack.copy(a);
  648                   ack = a;
  649                   // Convert to non-ranged.
  650                   ack.setFirstMessageId(node.getMessageId());
  651                   ack.setLastMessageId(node.getMessageId());
  652                   ack.setMessageCount(1);
  653               }
  654               store.removeMessage(context, ack);
  655           }
  656       }
  657   
  658       Message loadMessage(MessageId messageId) throws IOException {
  659           Message msg = store.getMessage(messageId);
  660           if (msg != null) {
  661               msg.setRegionDestination(this);
  662           }
  663           return msg;
  664       }
  665   
  666       public String toString() {
  667           int size = 0;
  668           synchronized (messages) {
  669               size = messages.size();
  670           }
  671           return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size + ", in flight groups="
  672                   + messageGroupOwners;
  673       }
  674   
  675       public void start() throws Exception {
  676           if (memoryUsage != null) {
  677               memoryUsage.start();
  678           }
  679           systemUsage.getMemoryUsage().addUsageListener(this);
  680           messages.start();
  681           if (getExpireMessagesPeriod() > 0) {
  682               scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
  683           }
  684           
  685           flowControlTimeoutTask.setName("Producer Flow Control Timeout Task");
  686           
  687           // Start flow control timeout task
  688           // Prevent trying to start it multiple times
  689           if (!flowControlTimeoutTask.isAlive()) {
  690               flowControlTimeoutTask.start();
  691           }
  692           
  693           doPageIn(false);
  694       }
  695   
  696       public void stop() throws Exception {
  697           if (taskRunner != null) {
  698               taskRunner.shutdown();
  699           }
  700           if (this.executor != null) {
  701               this.executor.shutdownNow();
  702           }
  703   
  704           scheduler.cancel(expireMessagesTask);
  705           
  706           if (flowControlTimeoutTask.isAlive()) {
  707               flowControlTimeoutTask.interrupt();
  708           }
  709   
  710           if (messages != null) {
  711               messages.stop();
  712           }
  713   
  714           systemUsage.getMemoryUsage().removeUsageListener(this);
  715           if (memoryUsage != null) {
  716               memoryUsage.stop();
  717           }
  718           if (store != null) {
  719               store.stop();
  720           }
  721       }
  722   
  723       // Properties
  724       // -------------------------------------------------------------------------
  725       public ActiveMQDestination getActiveMQDestination() {
  726           return destination;
  727       }
  728   
  729       public MessageGroupMap getMessageGroupOwners() {
  730           if (messageGroupOwners == null) {
  731               messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
  732           }
  733           return messageGroupOwners;
  734       }
  735   
  736       public DispatchPolicy getDispatchPolicy() {
  737           return dispatchPolicy;
  738       }
  739   
  740       public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
  741           this.dispatchPolicy = dispatchPolicy;
  742       }
  743   
  744       public MessageGroupMapFactory getMessageGroupMapFactory() {
  745           return messageGroupMapFactory;
  746       }
  747   
  748       public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
  749           this.messageGroupMapFactory = messageGroupMapFactory;
  750       }
  751   
  752       public PendingMessageCursor getMessages() {
  753           return this.messages;
  754       }
  755   
  756       public void setMessages(PendingMessageCursor messages) {
  757           this.messages = messages;
  758       }
  759   
  760       public boolean isUseConsumerPriority() {
  761           return useConsumerPriority;
  762       }
  763   
  764       public void setUseConsumerPriority(boolean useConsumerPriority) {
  765           this.useConsumerPriority = useConsumerPriority;
  766       }
  767   
  768       public boolean isStrictOrderDispatch() {
  769           return strictOrderDispatch;
  770       }
  771   
  772       public void setStrictOrderDispatch(boolean strictOrderDispatch) {
  773           this.strictOrderDispatch = strictOrderDispatch;
  774       }
  775   
  776       public boolean isOptimizedDispatch() {
  777           return optimizedDispatch;
  778       }
  779   
  780       public void setOptimizedDispatch(boolean optimizedDispatch) {
  781           this.optimizedDispatch = optimizedDispatch;
  782       }
  783   
  784       public int getTimeBeforeDispatchStarts() {
  785           return timeBeforeDispatchStarts;
  786       }
  787   
  788       public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
  789           this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
  790       }
  791   
  792       public int getConsumersBeforeDispatchStarts() {
  793           return consumersBeforeDispatchStarts;
  794       }
  795   
  796       public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
  797           this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
  798       }
  799   
  800       // Implementation methods
  801       // -------------------------------------------------------------------------
  802       private QueueMessageReference createMessageReference(Message message) {
  803           QueueMessageReference result = new IndirectMessageReference(message);
  804           return result;
  805       }
  806   
  807       public Message[] browse() {
  808           List<Message> browseList = new ArrayList<Message>();
  809           doBrowse(browseList, getMaxBrowsePageSize());
  810           return browseList.toArray(new Message[browseList.size()]);
  811       }
  812   
  813       public void doBrowse(List<Message> browseList, int max) {
  814           final ConnectionContext connectionContext = createConnectionContext();
  815           try {
  816               pageInMessages(false);
  817               List<MessageReference> toExpire = new ArrayList<MessageReference>();
  818               synchronized (dispatchMutex) {
  819                   synchronized (pagedInPendingDispatch) {
  820                       addAll(pagedInPendingDispatch, browseList, max, toExpire);
  821                       for (MessageReference ref : toExpire) {
  822                           pagedInPendingDispatch.remove(ref);
  823                           if (broker.isExpired(ref)) {
  824                               LOG.debug("expiring from pagedInPending: " + ref);
  825                               messageExpired(connectionContext, ref);
  826                           }
  827                       }
  828                   }
  829                   toExpire.clear();
  830                   synchronized (pagedInMessages) {
  831                       addAll(pagedInMessages.values(), browseList, max, toExpire);
  832                   }
  833                   for (MessageReference ref : toExpire) {
  834                       if (broker.isExpired(ref)) {
  835                           LOG.debug("expiring from pagedInMessages: " + ref);
  836                           messageExpired(connectionContext, ref);
  837                       } else {
  838                           synchronized (pagedInMessages) {
  839                               pagedInMessages.remove(ref.getMessageId());
  840                           }
  841                       }
  842                   }
  843   
  844                   if (browseList.size() < getMaxBrowsePageSize()) {
  845                       synchronized (messages) {
  846                           try {
  847                               messages.reset();
  848                               while (messages.hasNext() && browseList.size() < max) {
  849                                   MessageReference node = messages.next();
  850                                   if (node.isExpired()) {
  851                                       if (broker.isExpired(node)) {
  852                                           LOG.debug("expiring from messages: " + node);
  853                                           messageExpired(connectionContext, createMessageReference(node.getMessage()));
  854                                       }
  855                                       messages.remove();
  856                                   } else {
  857                                       messages.rollback(node.getMessageId());
  858                                       if (browseList.contains(node.getMessage()) == false) {
  859                                           browseList.add(node.getMessage());
  860                                       }
  861                                   }
  862                                   node.decrementReferenceCount();
  863                               }
  864                           } finally {
  865                               messages.release();
  866                           }
  867                       }
  868                   }
  869               }
  870           } catch (Exception e) {
  871               LOG.error("Problem retrieving message for browse", e);
  872           }
  873       }
  874   
  875       private void addAll(Collection<QueueMessageReference> refs, List<Message> l, int maxBrowsePageSize, List<MessageReference> toExpire) throws Exception {
  876           for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) {
  877               QueueMessageReference ref = i.next();
  878               if (ref.isExpired()) {
  879                   toExpire.add(ref);
  880               } else if (l.contains(ref.getMessage()) == false) {
  881                   l.add(ref.getMessage());
  882               }
  883           }
  884       }
  885   
  886       public Message getMessage(String id) {
  887           MessageId msgId = new MessageId(id);
  888           try {
  889               synchronized (pagedInMessages) {
  890                   QueueMessageReference r = this.pagedInMessages.get(msgId);
  891                   if (r != null) {
  892                       return r.getMessage();
  893                   }
  894               }
  895               synchronized (messages) {
  896                   try {
  897                       messages.reset();
  898                       while (messages.hasNext()) {
  899                           try {
  900                               MessageReference r = messages.next();
  901                               r.decrementReferenceCount();
  902                               messages.rollback(r.getMessageId());
  903                               if (msgId.equals(r.getMessageId())) {
  904                                   Message m = r.getMessage();
  905                                   if (m != null) {
  906                                       return m;
  907                                   }
  908                                   break;
  909                               }
  910                           } catch (IOException e) {
  911                               LOG.error("got an exception retrieving message " + id);
  912                           }
  913                       }
  914                   } finally {
  915                       messages.release();
  916                   }
  917               }
  918           } catch (IOException e) {
  919               LOG.error("got an exception retrieving message " + id);
  920           }
  921           return null;
  922       }
  923   
  924       public void purge() throws Exception {
  925           ConnectionContext c = createConnectionContext();
  926           List<MessageReference> list = null;
  927           do {
  928               doPageIn(true);
  929               synchronized (pagedInMessages) {
  930                   list = new ArrayList<MessageReference>(pagedInMessages.values());
  931               }
  932   
  933               for (MessageReference ref : list) {
  934                   try {
  935                       QueueMessageReference r = (QueueMessageReference) ref;
  936                       removeMessage(c, (IndirectMessageReference) r);
  937                   } catch (IOException e) {
  938                   }
  939               }
  940   
  941           } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
  942           gc();
  943           this.destinationStatistics.getMessages().setCount(0);
  944           getMessages().clear();
  945       }
  946   
  947       /**
  948        * Removes the message matching the given messageId
  949        */
  950       public boolean removeMessage(String messageId) throws Exception {
  951           return removeMatchingMessages(createMessageIdFilter(messageId), 1) > 0;
  952       }
  953   
  954       /**
  955        * Removes the messages matching the given selector
  956        * 
  957        * @return the number of messages removed
  958        */
  959       public int removeMatchingMessages(String selector) throws Exception {
  960           return removeMatchingMessages(selector, -1);
  961       }
  962   
  963       /**
  964        * Removes the messages matching the given selector up to the maximum number
  965        * of matched messages
  966        * 
  967        * @return the number of messages removed
  968        */
  969       public int removeMatchingMessages(String selector, int maximumMessages) throws Exception {
  970           return removeMatchingMessages(createSelectorFilter(selector), maximumMessages);
  971       }
  972   
  973       /**
  974        * Removes the messages matching the given filter up to the maximum number
  975        * of matched messages
  976        * 
  977        * @return the number of messages removed
  978        */
  979       public int removeMatchingMessages(MessageReferenceFilter filter, int maximumMessages) throws Exception {
  980           int movedCounter = 0;
  981           Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
  982           ConnectionContext context = createConnectionContext();
  983           do {
  984               doPageIn(true);
  985               synchronized (pagedInMessages) {
  986                   set.addAll(pagedInMessages.values());
  987               }
  988               List<MessageReference> list = new ArrayList<MessageReference>(set);
  989               for (MessageReference ref : list) {
  990                   IndirectMessageReference r = (IndirectMessageReference) ref;
  991                   if (filter.evaluate(context, r)) {
  992   
  993                       removeMessage(context, r);
  994                       set.remove(r);
  995                       if (++movedCounter >= maximumMessages && maximumMessages > 0) {
  996                           return movedCounter;
  997                       }
  998                   }
  999               }
 1000           } while (set.size() < this.destinationStatistics.getMessages().getCount());
 1001           return movedCounter;
 1002       }
 1003   
 1004       /**
 1005        * Copies the message matching the given messageId
 1006        */
 1007       public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
 1008           return copyMatchingMessages(context, createMessageIdFilter(messageId), dest, 1) > 0;
 1009       }
 1010   
 1011       /**
 1012        * Copies the messages matching the given selector
 1013        * 
 1014        * @return the number of messages copied
 1015        */
 1016       public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
 1017           return copyMatchingMessagesTo(context, selector, dest, -1);
 1018       }
 1019   
 1020       /**
 1021        * Copies the messages matching the given selector up to the maximum number
 1022        * of matched messages
 1023        * 
 1024        * @return the number of messages copied
 1025        */
 1026       public int copyMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
 1027           return copyMatchingMessages(context, createSelectorFilter(selector), dest, maximumMessages);
 1028       }
 1029   
 1030       /**
 1031        * Copies the messages matching the given filter up to the maximum number of
 1032        * matched messages
 1033        * 
 1034        * @return the number of messages copied
 1035        */
 1036       public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
 1037           int movedCounter = 0;
 1038           int count = 0;
 1039           Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
 1040           do {
 1041               int oldMaxSize = getMaxPageSize();
 1042               setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
 1043               doPageIn(true);
 1044               setMaxPageSize(oldMaxSize);
 1045               synchronized (pagedInMessages) {
 1046                   set.addAll(pagedInMessages.values());
 1047               }
 1048               List<MessageReference> list = new ArrayList<MessageReference>(set);
 1049               for (MessageReference ref : list) {
 1050                   IndirectMessageReference r = (IndirectMessageReference) ref;
 1051                   if (filter.evaluate(context, r)) {
 1052   
 1053                       r.incrementReferenceCount();
 1054                       try {
 1055                           Message m = r.getMessage();
 1056                           BrokerSupport.resend(context, m, dest);
 1057                           if (++movedCounter >= maximumMessages && maximumMessages > 0) {
 1058                               return movedCounter;
 1059                           }
 1060                       } finally {
 1061                           r.decrementReferenceCount();
 1062                       }
 1063                   }
 1064                   count++;
 1065               }
 1066           } while (count < this.destinationStatistics.getMessages().getCount());
 1067           return movedCounter;
 1068       }
 1069   
 1070       /**
 1071        * Move a message
 1072        * 
 1073        * @param context connection context
 1074        * @param m message
 1075        * @param dest ActiveMQDestination
 1076        * @throws Exception
 1077        */
 1078       public boolean moveMessageTo(ConnectionContext context, Message m, ActiveMQDestination dest) throws Exception {
 1079           QueueMessageReference r = createMessageReference(m);
 1080           BrokerSupport.resend(context, m, dest);
 1081           removeMessage(context, r);
 1082           synchronized (messages) {
 1083               messages.rollback(r.getMessageId());
 1084           }
 1085           return true;
 1086       }
 1087   
 1088       /**
 1089        * Moves the message matching the given messageId
 1090        */
 1091       public boolean moveMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Exception {
 1092           return moveMatchingMessagesTo(context, createMessageIdFilter(messageId), dest, 1) > 0;
 1093       }
 1094   
 1095       /**
 1096        * Moves the messages matching the given selector
 1097        * 
 1098        * @return the number of messages removed
 1099        */
 1100       public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest) throws Exception {
 1101           return moveMatchingMessagesTo(context, selector, dest, Integer.MAX_VALUE);
 1102       }
 1103   
 1104       /**
 1105        * Moves the messages matching the given selector up to the maximum number
 1106        * of matched messages
 1107        */
 1108       public int moveMatchingMessagesTo(ConnectionContext context, String selector, ActiveMQDestination dest, int maximumMessages) throws Exception {
 1109           return moveMatchingMessagesTo(context, createSelectorFilter(selector), dest, maximumMessages);
 1110       }
 1111   
 1112       /**
 1113        * Moves the messages matching the given filter up to the maximum number of
 1114        * matched messages
 1115        */
 1116       public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
 1117           int movedCounter = 0;
 1118           Set<MessageReference> set = new CopyOnWriteArraySet<MessageReference>();
 1119           do {
 1120               doPageIn(true);
 1121               synchronized (pagedInMessages) {
 1122                   set.addAll(pagedInMessages.values());
 1123               }
 1124               List<MessageReference> list = new ArrayList<MessageReference>(set);
 1125               for (MessageReference ref : list) {
 1126                   IndirectMessageReference r = (IndirectMessageReference) ref;
 1127                   if (filter.evaluate(context, r)) {
 1128                       // We should only move messages that can be locked.
 1129                       moveMessageTo(context, ref.getMessage(), dest);
 1130                       set.remove(r);
 1131                       if (++movedCounter >= maximumMessages && maximumMessages > 0) {
 1132                           return movedCounter;
 1133                       }
 1134                   }
 1135               }
 1136           } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
 1137           return movedCounter;
 1138       }
 1139   
 1140       BrowserDispatch getNextBrowserDispatch() {
 1141           synchronized (pagedInMessages) {
 1142               if (browserDispatches.isEmpty()) {
 1143                   return null;
 1144               }
 1145               return browserDispatches.removeFirst();
 1146           }
 1147   
 1148       }
 1149   
 1150       /**
 1151        * @return true if we would like to iterate again
 1152        * @see org.apache.activemq.thread.Task#iterate()
 1153        */
 1154       public boolean iterate() {
 1155           boolean pageInMoreMessages = false;       
 1156           synchronized (iteratingMutex) {
 1157   
 1158               // do early to allow dispatch of these waiting messages
 1159               synchronized (messagesWaitingForSpace) {
 1160                   Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
 1161                   while (it.hasNext()) {
 1162                       if (!memoryUsage.isFull()) {
 1163                           Runnable op = it.next();
 1164                           it.remove();
 1165                           op.run();
 1166                       } else {
 1167                           registerCallbackForNotFullNotification();
 1168                           break;
 1169                       }
 1170                   }
 1171               }
 1172   
 1173               if (firstConsumer) {
 1174                   firstConsumer = false;
 1175                   try {
 1176                       if (consumersBeforeDispatchStarts > 0) {
 1177                           int timeout = 1000; // wait one second by default if consumer count isn't reached  
 1178                           if (timeBeforeDispatchStarts > 0) {
 1179                               timeout = timeBeforeDispatchStarts;
 1180                           }
 1181                           if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
 1182                               if (LOG.isDebugEnabled()) {
 1183                                   LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
 1184                               }
 1185                           } else {
 1186                               if (LOG.isDebugEnabled()) {
 1187                                   LOG.debug(timeout + " ms elapsed and " + consumers.size() + " consumers subscribed. Starting dispatch.");
 1188                               }
 1189                           }
 1190                       }
 1191                       if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
 1192                           iteratingMutex.wait(timeBeforeDispatchStarts);
 1193                           if (LOG.isDebugEnabled()) {
 1194                               LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
 1195                           }
 1196                       }
 1197                   } catch (Exception e) {
 1198                       LOG.error(e);
 1199                   }
 1200               }
 1201               
 1202               BrowserDispatch pendingBrowserDispatch = getNextBrowserDispatch();
 1203   
 1204               synchronized (messages) {
 1205                   pageInMoreMessages |= !messages.isEmpty();
 1206               }
 1207   
 1208               // Kinda ugly.. but I think dispatchLock is the only mutex protecting the 
 1209               // pagedInPendingDispatch variable. 	        
 1210               synchronized (dispatchMutex) {
 1211                   pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
 1212               }
 1213   
 1214               // Perhaps we should page always into the pagedInPendingDispatch list if 
 1215               // !messages.isEmpty(), and then if !pagedInPendingDispatch.isEmpty()
 1216               // then we do a dispatch.
 1217               if (pageInMoreMessages || pendingBrowserDispatch != null) {
 1218                   try {
 1219                       pageInMessages(pendingBrowserDispatch != null);
 1220   
 1221                   } catch (Throwable e) {
 1222                       LOG.error("Failed to page in more queue messages ", e);
 1223                   }
 1224               }
 1225               
 1226               if (pendingBrowserDispatch != null) {
 1227                   ArrayList<QueueMessageReference> alreadyDispatchedMessages = null;
 1228                   synchronized (pagedInMessages) {
 1229                       alreadyDispatchedMessages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
 1230                   }
 1231                   if (LOG.isDebugEnabled()) {
 1232                       LOG.debug("dispatch to browser: " + pendingBrowserDispatch.getBrowser()
 1233                               + ", already dispatched/paged count: " + alreadyDispatchedMessages.size());
 1234                   }
 1235                   do {
 1236                       try {
 1237                           MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
 1238                           msgContext.setDestination(destination);
 1239                           
 1240                           QueueBrowserSubscription browser = pendingBrowserDispatch.getBrowser();
 1241                           for (QueueMessageReference node : alreadyDispatchedMessages) {
 1242                               if (!node.isAcked()) {
 1243                                   msgContext.setMessageReference(node);
 1244                                   if (browser.matches(node, msgContext)) {
 1245                                       browser.add(node);
 1246                                   }
 1247                               }
 1248                           }
 1249                           pendingBrowserDispatch.done();
 1250                       } catch (Exception e) {
 1251                           LOG.warn("exception on dispatch to browser: " + pendingBrowserDispatch.getBrowser(), e);
 1252                       }
 1253                   
 1254                   } while ((pendingBrowserDispatch = getNextBrowserDispatch()) != null);
 1255               }
 1256               
 1257               if (pendingWakeups.get() > 0) {
 1258                   pendingWakeups.decrementAndGet();
 1259               }
 1260               return pendingWakeups.get() > 0;
 1261           }
 1262       }
 1263   
 1264       protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
 1265           return new MessageReferenceFilter() {
 1266               public boolean evaluate(ConnectionContext context, MessageReference r) {
 1267                   return messageId.equals(r.getMessageId().toString());
 1268               }
 1269   
 1270               public String toString() {
 1271                   return "MessageIdFilter: " + messageId;
 1272               }
 1273           };
 1274       }
 1275   
 1276       protected MessageReferenceFilter createSelectorFilter(String selector) throws InvalidSelectorException {
 1277           final BooleanExpression selectorExpression = SelectorParser.parse(selector);
 1278   
 1279           return new MessageReferenceFilter() {
 1280               public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
 1281                   MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
 1282   
 1283                   messageEvaluationContext.setMessageReference(r);
 1284                   if (messageEvaluationContext.getDestination() == null) {
 1285                       messageEvaluationContext.setDestination(getActiveMQDestination());
 1286                   }
 1287   
 1288                   return selectorExpression.matches(messageEvaluationContext);
 1289               }
 1290           };
 1291       }
 1292   
 1293       protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
 1294           removeMessage(c, null, r);
 1295           synchronized (dispatchMutex) {
 1296               synchronized (pagedInPendingDispatch) {
 1297                   pagedInPendingDispatch.remove(r);
 1298               }
 1299           }
 1300       }
 1301   
 1302       protected void removeMessage(ConnectionContext c, Subscription subs, QueueMessageReference r) throws IOException {
 1303           MessageAck ack = new MessageAck();
 1304           ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
 1305           ack.setDestination(destination);
 1306           ack.setMessageID(r.getMessageId());
 1307           removeMessage(c, subs, r, ack);
 1308       }
 1309   
 1310       protected void removeMessage(ConnectionContext context, Subscription sub, final QueueMessageReference reference, MessageAck ack) throws IOException {
 1311           reference.setAcked(true);
 1312           // This sends the ack the the journal..
 1313           if (!ack.isInTransaction()) {
 1314               acknowledge(context, sub, ack, reference);
 1315               getDestinationStatistics().getDequeues().increment();
 1316               dropMessage(reference);
 1317           } else {
 1318               try {
 1319                   acknowledge(context, sub, ack, reference);
 1320               } finally {
 1321                   context.getTransaction().addSynchronization(new Synchronization() {
 1322   
 1323                       public void afterCommit() throws Exception {
 1324                           getDestinationStatistics().getDequeues().increment();
 1325                           dropMessage(reference);
 1326                           wakeup();
 1327                       }
 1328   
 1329                       public void afterRollback() throws Exception {
 1330                           reference.setAcked(false);
 1331                       }
 1332                   });
 1333               }
 1334           }
 1335           if (ack.isPoisonAck()) {
 1336               // message gone to DLQ, is ok to allow redelivery
 1337               synchronized (messages) {
 1338                   messages.rollback(reference.getMessageId());
 1339               }
 1340           }
 1341   
 1342       }
 1343   
 1344       private void dropMessage(QueueMessageReference reference) {
 1345           reference.drop();
 1346           destinationStatistics.getMessages().decrement();
 1347           synchronized (pagedInMessages) {
 1348               pagedInMessages.remove(reference.getMessageId());
 1349           }
 1350       }
 1351   
 1352       public void messageExpired(ConnectionContext context, MessageReference reference) {
 1353           messageExpired(context, null, reference);
 1354       }
 1355   
 1356       public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
 1357           if (LOG.isDebugEnabled()) {
 1358               LOG.debug("message expired: " + reference);
 1359           }
 1360           broker.messageExpired(context, reference);
 1361           destinationStatistics.getExpired().increment();
 1362           try {
 1363               removeMessage(context, subs, (QueueMessageReference) reference);
 1364           } catch (IOException e) {
 1365               LOG.error("Failed to remove expired Message from the store ", e);
 1366           }
 1367       }
 1368   
 1369       protected ConnectionContext createConnectionContext() {
 1370           ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext());
 1371           answer.setBroker(this.broker);
 1372           answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
 1373           answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
 1374           return answer;
 1375       }
 1376   
 1377       final void sendMessage(final ConnectionContext context, Message msg) throws Exception {
 1378           if (!msg.isPersistent() && messages.getSystemUsage() != null) {
 1379               if (systemUsage.getTempUsage().isFull()) {
 1380                   final String logMessage = "Usage Manager Temp Store is Full. Stopping producer (" + msg.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
 1381                           + " See http://activemq.apache.org/producer-flow-control.html for more info";
 1382                   if (systemUsage.isSendFailIfNoSpace()) {
 1383                       throw new ResourceAllocationException(logMessage);
 1384                   }
 1385   
 1386                   waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
 1387               }
 1388   
 1389           }
 1390           synchronized (messages) {
 1391               messages.addMessageLast(msg);
 1392           }
 1393           destinationStatistics.getEnqueues().increment();
 1394           destinationStatistics.getMessages().increment();
 1395           messageDelivered(context, msg);
 1396           synchronized (consumers) {
 1397               if (consumers.isEmpty()) {
 1398                   onMessageWithNoConsumers(context, msg);
 1399               }
 1400           }
 1401           wakeup();
 1402       }
 1403   
 1404       public void wakeup() {
 1405           if (optimizedDispatch || isSlave()) {
 1406               iterate();
 1407               pendingWakeups.incrementAndGet();
 1408           } else {
 1409               asyncWakeup();
 1410           }
 1411       }
 1412   
 1413       private void asyncWakeup() {
 1414           try {
 1415               pendingWakeups.incrementAndGet();
 1416               this.taskRunner.wakeup();
 1417           } catch (InterruptedException e) {
 1418               LOG.warn("Async task tunner failed to wakeup ", e);
 1419           }
 1420       }
 1421   
 1422       private boolean isSlave() {
 1423           return broker.getBrokerService().isSlave();
 1424       }
 1425   
 1426       private List<QueueMessageReference> doPageIn(boolean force) throws Exception {
 1427           List<QueueMessageReference> result = null;
 1428           List<QueueMessageReference> resultList = null;
 1429           synchronized (dispatchMutex) {
 1430               int toPageIn = Math.min(getMaxPageSize(), messages.size());
 1431               if (LOG.isDebugEnabled()) {
 1432                   LOG.debug(destination.getPhysicalName() + " toPageIn: " + toPageIn + ", Inflight: " + destinationStatistics.getInflight().getCount() + ", pagedInMessages.size "
 1433                           + pagedInMessages.size() + ", enqueueSize: " + destinationStatistics.getEnqueues().getCount());
 1434               }
 1435   
 1436               if (isLazyDispatch() && !force) {
 1437                   // Only page in the minimum number of messages which can be dispatched immediately.
 1438                   toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
 1439               }
 1440               if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingDispatch.size() < getMaxPageSize()))) {
 1441                   int count = 0;
 1442                   result = new ArrayList<QueueMessageReference>(toPageIn);
 1443                   synchronized (messages) {
 1444                       try {
 1445                           messages.setMaxBatchSize(toPageIn);
 1446                           messages.reset();
 1447                           while (messages.hasNext() && count < toPageIn) {
 1448                               MessageReference node = messages.next();
 1449                               messages.remove();
 1450                               QueueMessageReference ref = createMessageReference(node.getMessage());
 1451                               if (ref.isExpired()) {
 1452                                   if (broker.isExpired(ref)) {
 1453                                       messageExpired(createConnectionContext(), ref);
 1454                                   } else {
 1455                                       ref.decrementReferenceCount();
 1456                                   }
 1457                               } else {
 1458                                   result.add(ref);
 1459                                   count++;
 1460                               }
 1461                           }
 1462                       } finally {
 1463                           messages.release();
 1464                       }
 1465                   }
 1466                   // Only add new messages, not already pagedIn to avoid multiple dispatch attempts
 1467                   synchronized (pagedInMessages) {
 1468                       resultList = new ArrayList<QueueMessageReference>(result.size());
 1469                       for (QueueMessageReference ref : result) {
 1470                           if (!pagedInMessages.containsKey(ref.getMessageId())) {
 1471                               pagedInMessages.put(ref.getMessageId(), ref);
 1472                               resultList.add(ref);
 1473                           } else {
 1474                               ref.decrementReferenceCount();
 1475                           }
 1476                       }
 1477                   }
 1478               } else {
 1479                   // Avoid return null list, if condition is not validated
 1480                   resultList = new ArrayList<QueueMessageReference>();
 1481               }
 1482           }
 1483           return resultList;
 1484       }
 1485   
 1486       private void doDispatch(List<QueueMessageReference> list) throws Exception {
 1487           boolean doWakeUp = false;
 1488           synchronized (dispatchMutex) {
 1489   
 1490               synchronized (pagedInPendingDispatch) {
 1491                   if (!pagedInPendingDispatch.isEmpty()) {
 1492                       // Try to first dispatch anything that had not been
 1493                       // dispatched before.
 1494                       pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
 1495                   }
 1496                   // and now see if we can dispatch the new stuff.. and append to the pending
 1497                   // list anything that does not actually get dispatched.
 1498                   if (list != null && !list.isEmpty()) {
 1499                       if (pagedInPendingDispatch.isEmpty()) {
 1500                           pagedInPendingDispatch.addAll(doActualDispatch(list));
 1501                       } else {
 1502                           for (QueueMessageReference qmr : list) {
 1503                               if (!pagedInPendingDispatch.contains(qmr)) {
 1504                                   pagedInPendingDispatch.add(qmr);
 1505                               }
 1506                           }
 1507                           doWakeUp = true;
 1508                       }
 1509                   }
 1510               }
 1511           }
 1512           if (doWakeUp) {
 1513               // avoid lock order contention
 1514               asyncWakeup();
 1515           }
 1516       }
 1517   
 1518       /**
 1519        * @return list of messages that could get dispatched to consumers if they
 1520        *         were not full.
 1521        */
 1522       private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
 1523           List<Subscription> consumers;
 1524   
 1525           synchronized (this.consumers) {
 1526               if (this.consumers.isEmpty() || isSlave()) {
 1527                   // slave dispatch happens in processDispatchNotification
 1528                   return list;
 1529               }
 1530               consumers = new ArrayList<Subscription>(this.consumers);
 1531           }
 1532   
 1533           List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
 1534           Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
 1535   
 1536           for (MessageReference node : list) {
 1537               Subscription target = null;
 1538               int interestCount = 0;
 1539               for (Subscription s : consumers) {
 1540                   if (s instanceof QueueBrowserSubscription) {
 1541                       interestCount++;
 1542                       continue;
 1543                   }
 1544                   if (dispatchSelector.canSelect(s, node)) {
 1545                       if (!fullConsumers.contains(s)) {
 1546                           if (!s.isFull()) {
 1547                               // Dispatch it.
 1548                               s.add(node);
 1549                               target = s;
 1550                               break;
 1551                           } else {
 1552                               // no further dispatch of list to a full consumer to avoid out of order message receipt 
 1553                               fullConsumers.add(s);
 1554                           }
 1555                       }
 1556                       interestCount++;
 1557                   } else {
 1558                       // makes sure it gets dispatched again
 1559                       if (!node.isDropped() && !((QueueMessageReference) node).isAcked() && (!node.isDropped() || s.getConsumerInfo().isBrowser())) {
 1560                           interestCount++;
 1561                       }
 1562                   }
 1563               }
 1564   
 1565               if ((target == null && interestCount > 0) || consumers.size() == 0) {
 1566                   // This means all subs were full or that there are no consumers...
 1567                   rc.add((QueueMessageReference) node);
 1568               }
 1569   
 1570               // If it got dispatched, rotate the consumer list to get round robin distribution. 
 1571               if (target != null && !strictOrderDispatch && consumers.size() > 1 && !dispatchSelector.isExclusiveConsumer(target)) {
 1572                   synchronized (this.consumers) {
 1573                       if (removeFromConsumerList(target)) {
 1574                           addToConsumerList(target);
 1575                           consumers = new ArrayList<Subscription>(this.consumers);
 1576                       }
 1577                   }
 1578               }
 1579           }
 1580   
 1581           return rc;
 1582       }
 1583   
 1584       protected void pageInMessages(boolean force) throws Exception {
 1585           doDispatch(doPageIn(force));
 1586       }
 1587   
 1588       private void addToConsumerList(Subscription sub) {
 1589           if (useConsumerPriority) {
 1590               consumers.add(sub);
 1591               Collections.sort(consumers, orderedCompare);
 1592           } else {
 1593               consumers.add(sub);
 1594           }
 1595       }
 1596   
 1597       private boolean removeFromConsumerList(Subscription sub) {
 1598           return consumers.remove(sub);
 1599       }
 1600   
 1601       private int getConsumerMessageCountBeforeFull() throws Exception {
 1602           int total = 0;
 1603           boolean zeroPrefetch = false;
 1604           synchronized (consumers) {
 1605               for (Subscription s : consumers) {
 1606                   zeroPrefetch |= s.getPrefetchSize() == 0;
 1607                   int countBeforeFull = s.countBeforeFull();
 1608                   total += countBeforeFull;
 1609               }
 1610           }
 1611           if (total == 0 && zeroPrefetch) {
 1612               total = 1;
 1613           }
 1614           return total;
 1615       }
 1616   
 1617       /*
 1618        * In slave mode, dispatch is ignored till we get this notification as the
 1619        * dispatch process is non deterministic between master and slave. On a
 1620        * notification, the actual dispatch to the subscription (as chosen by the
 1621        * master) is completed. (non-Javadoc)
 1622        * 
 1623        * @see
 1624        * org.apache.activemq.broker.region.BaseDestination#processDispatchNotification
 1625        * (org.apache.activemq.command.MessageDispatchNotification)
 1626        */
 1627       public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
 1628           // do dispatch
 1629           Subscription sub = getMatchingSubscription(messageDispatchNotification);
 1630           if (sub != null) {
 1631               MessageReference message = getMatchingMessage(messageDispatchNotification);
 1632               sub.add(message);
 1633               sub.processMessageDispatchNotification(messageDispatchNotification);
 1634           }
 1635       }
 1636   
 1637       private QueueMessageReference getMatchingMessage(MessageDispatchNotification messageDispatchNotification) throws Exception {
 1638           QueueMessageReference message = null;
 1639           MessageId messageId = messageDispatchNotification.getMessageId();
 1640   
 1641           synchronized (dispatchMutex) {
 1642               synchronized (pagedInPendingDispatch) {
 1643                   for (QueueMessageReference ref : pagedInPendingDispatch) {
 1644                       if (messageId.equals(ref.getMessageId())) {
 1645                           message = ref;
 1646                           pagedInPendingDispatch.remove(ref);
 1647                           break;
 1648                       }
 1649                   }
 1650               }
 1651   
 1652               if (message == null) {
 1653                   synchronized (pagedInMessages) {
 1654                       message = pagedInMessages.get(messageId);
 1655                   }
 1656               }
 1657   
 1658               if (message == null) {
 1659                   synchronized (messages) {
 1660                       try {
 1661                           messages.setMaxBatchSize(getMaxPageSize());
 1662                           messages.reset();
 1663                           while (messages.hasNext()) {
 1664                               MessageReference node = messages.next();
 1665                               messages.remove();
 1666                               if (messageId.equals(node.getMessageId())) {
 1667                                   message = this.createMessageReference(node.getMessage());
 1668                                   break;
 1669                               }
 1670                           }
 1671                       } finally {
 1672                           messages.release();
 1673                       }
 1674                   }
 1675               }
 1676   
 1677               if (message == null) {
 1678                   Message msg = loadMessage(messageId);
 1679                   if (msg != null) {
 1680                       message = this.createMessageReference(msg);
 1681                   }
 1682               }
 1683   
 1684           }
 1685           if (message == null) {
 1686               throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination()
 1687                       + " does not exist among pending(" + pagedInPendingDispatch.size() + ") for subscription: " + messageDispatchNotification.getConsumerId());
 1688           }
 1689           return message;
 1690       }
 1691   
 1692       /**
 1693        * Find a consumer that matches the id in the message dispatch notification
 1694        * 
 1695        * @param messageDispatchNotification
 1696        * @return sub or null if the subscription has been removed before dispatch
 1697        * @throws JMSException
 1698        */
 1699       private Subscription getMatchingSubscription(MessageDispatchNotification messageDispatchNotification) throws JMSException {
 1700           Subscription sub = null;
 1701           synchronized (consumers) {
 1702               for (Subscription s : consumers) {
 1703                   if (messageDispatchNotification.getConsumerId().equals(s.getConsumerInfo().getConsumerId())) {
 1704                       sub = s;
 1705                       break;
 1706                   }
 1707               }
 1708           }
 1709           return sub;
 1710       }
 1711   
 1712       public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
 1713           if (oldPercentUsage > newPercentUsage) {
 1714               asyncWakeup();
 1715           }
 1716       }
 1717   
 1718       private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
 1719           if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
 1720               if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout())) {
 1721                   throw new ResourceAllocationException(warning);
 1722               }
 1723           } else {
 1724               long start = System.currentTimeMillis();
 1725               long nextWarn = start + blockedProducerWarningInterval;
 1726               while (!usage.waitForSpace(1000)) {
 1727                   if (context.getStopping().get()) {
 1728                       throw new IOException("Connection closed, send aborted.");
 1729                   }
 1730       
 1731                   long now = System.currentTimeMillis();
 1732                   if (now >= nextWarn) {
 1733                       LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
 1734                       nextWarn = now + blockedProducerWarningInterval;
 1735                   }
 1736               }
 1737           }
 1738       }
 1739   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]