Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.ArrayList;
   21   import java.util.Iterator;
   22   import java.util.List;
   23   import java.util.concurrent.CopyOnWriteArrayList;
   24   import java.util.concurrent.CountDownLatch;
   25   import java.util.concurrent.TimeUnit;
   26   
   27   import javax.jms.InvalidSelectorException;
   28   import javax.jms.JMSException;
   29   
   30   import org.apache.activemq.ActiveMQMessageAudit;
   31   import org.apache.activemq.broker.Broker;
   32   import org.apache.activemq.broker.ConnectionContext;
   33   import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
   34   import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
   35   import org.apache.activemq.command.ConsumerControl;
   36   import org.apache.activemq.command.ConsumerInfo;
   37   import org.apache.activemq.command.Message;
   38   import org.apache.activemq.command.MessageAck;
   39   import org.apache.activemq.command.MessageDispatch;
   40   import org.apache.activemq.command.MessageDispatchNotification;
   41   import org.apache.activemq.command.MessageId;
   42   import org.apache.activemq.command.MessagePull;
   43   import org.apache.activemq.command.Response;
   44   import org.apache.activemq.thread.Scheduler;
   45   import org.apache.activemq.transaction.Synchronization;
   46   import org.apache.activemq.usage.SystemUsage;
   47   import org.apache.commons.logging.Log;
   48   import org.apache.commons.logging.LogFactory;
   49   
   50   /**
   51    * A subscription that honors the pre-fetch option of the ConsumerInfo.
   52    * 
   53    * @version $Revision: 1.15 $
   54    */
   55   public abstract class PrefetchSubscription extends AbstractSubscription {
   56   
    /** Logger for this class. */
    private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
    /** Shared scheduler instance; pullMessage uses it to schedule pull timeouts. */
    protected static final Scheduler scheduler = Scheduler.getInstance();
    
    /** Cursor holding messages that were added to this subscription but not yet dispatched. */
    protected PendingMessageCursor pending;
    /** Messages handed to the consumer that have not yet been removed by an acknowledgment. */
    protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
    /** Extra dispatch allowance added on top of the consumer's prefetch size (see countBeforeFull). */
    protected int prefetchExtension;
    /** Incremented for every message accepted by add(MessageReference). */
    protected long enqueueCounter;
    /** Incremented for every non-NULL message handed to dispatch(). */
    protected long dispatchCounter;
    /** Incremented when an acknowledged (or poisoned) message leaves the dispatched list. */
    protected long dequeueCounter;
    // NOTE(review): the two audit limits below are not read in the visible part of this class;
    // presumably they configure the message audit - confirm against the rest of the file.
    private int maxProducersToAudit=32;
    private int maxAuditDepth=2048;
    /** Usage manager supplied at construction; passed to the pending cursor in setPending. */
    protected final SystemUsage usageManager;
    /** Monitor held while the pending cursor is read or modified. */
    private final Object pendingLock = new Object();
    /** Monitor held while the dispatched list is modified. */
    private final Object dispatchLock = new Object();
    // NOTE(review): not used in the visible part of this class - confirm its role.
    protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
    /** Set once the destinations have been told this consumer is slow; cleared when dispatch room returns. */
    private boolean slowConsumer;

    /** Released by the first dispatch(); on a non-slave broker, acks that arrive earlier are ignored. */
    private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
   75       
   76       public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
   77           super(broker,context, info);
   78           this.usageManager=usageManager;
   79           pending = cursor;
   80       }
   81   
   82       public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
   83           this(broker,usageManager,context, info, new VMPendingMessageCursor());
   84       }
   85   
   86       /**
   87        * Allows a message to be pulled on demand by a client
   88        */
   89       public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
   90           // The slave should not deliver pull messages. TODO: when the slave
   91           // becomes a master,
   92           // He should send a NULL message to all the consumers to 'wake them up'
   93           // in case
   94           // they were waiting for a message.
   95           if (getPrefetchSize() == 0 && !isSlave()) {
   96               final long dispatchCounterBeforePull;
   97           	synchronized(this) {
   98           		prefetchExtension++;
   99           		dispatchCounterBeforePull = dispatchCounter;
  100           	}
  101               
  102           	// Have the destination push us some messages.
  103           	for (Destination dest : destinations) {
  104   				dest.iterate();
  105   			}
  106           	dispatchPending();
  107               
  108               synchronized(this) {
  109   	            // If there was nothing dispatched.. we may need to setup a timeout.
  110   	            if (dispatchCounterBeforePull == dispatchCounter) {
  111   	                // immediate timeout used by receiveNoWait()
  112   	                if (pull.getTimeout() == -1) {
  113   	                    // Send a NULL message.
  114   	                    add(QueueMessageReference.NULL_MESSAGE);
  115   	                    dispatchPending();
  116   	                }
  117   	                if (pull.getTimeout() > 0) {
  118   	                    scheduler.executeAfterDelay(new Runnable() {
  119   	
  120   	                        public void run() {
  121   	                            pullTimeout(dispatchCounterBeforePull);
  122   	                        }
  123   	                    }, pull.getTimeout());
  124   	                }
  125   	            }
  126               }
  127           }
  128           return null;
  129       }
  130   
  131       /**
  132        * Occurs when a pull times out. If nothing has been dispatched since the
  133        * timeout was setup, then send the NULL message.
  134        */
  135       final void pullTimeout(long dispatchCounterBeforePull) {
  136       	synchronized (pendingLock) {
  137       		if (dispatchCounterBeforePull == dispatchCounter) {
  138                   try {
  139                       add(QueueMessageReference.NULL_MESSAGE);
  140                       dispatchPending();
  141                   } catch (Exception e) {
  142                       context.getConnection().serviceException(e);
  143                   }
  144               }
  145           }
  146       }
  147   
  148       public void add(MessageReference node) throws Exception {
  149           synchronized (pendingLock) {
  150               // The destination may have just been removed...  
  151               if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
  152                   // perhaps we should inform the caller that we are no longer valid to dispatch to?
  153                   return;
  154               }
  155               enqueueCounter++;
  156               pending.addMessageLast(node);    
  157           }
  158           dispatchPending();
  159       }
  160   
  161       public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
  162           synchronized(pendingLock) {
  163               try {
  164                   pending.reset();
  165                   while (pending.hasNext()) {
  166                       MessageReference node = pending.next();
  167                       node.decrementReferenceCount();
  168                       if (node.getMessageId().equals(mdn.getMessageId())) {
  169                           // Synchronize between dispatched list and removal of messages from pending list
  170                           // related to remove subscription action
  171                           synchronized(dispatchLock) {
  172                               pending.remove();
  173                               createMessageDispatch(node, node.getMessage());
  174                               dispatched.add(node);
  175                               onDispatch(node, node.getMessage());
  176                           }
  177                           return;
  178                       }
  179                   }
  180               } finally {
  181                   pending.release();
  182               }
  183           }
  184           throw new JMSException(
  185                   "Slave broker out of sync with master: Dispatched message ("
  186                           + mdn.getMessageId() + ") was not in the pending list for "
  187                           + mdn.getConsumerId() + " on " + mdn.getDestination().getPhysicalName());
  188       }
  189   
  190       public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
  191           // Handle the standard acknowledgment case.
  192           boolean callDispatchMatched = false;
  193           Destination destination = null;
  194           
  195           if (!isSlave()) {
  196               if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
  197                   // suppress unexpected ack exception in this expected case
  198                   LOG.warn("Ignoring ack received before dispatch; result of failover with an outstanding ack. Acked messages will be replayed if present on this broker. Ignored ack: " + ack);
  199                   return;
  200               }
  201           }
  202           if (LOG.isTraceEnabled()) {
  203               LOG.trace("ack:" + ack);
  204           }
  205           synchronized(dispatchLock) {
  206               if (ack.isStandardAck()) {
  207               	// First check if the ack matches the dispatched. When using failover this might
  208               	// not be the case. We don't ever want to ack the wrong messages.
  209               	assertAckMatchesDispatched(ack);
  210               	
  211                   // Acknowledge all dispatched messages up till the message id of
  212                   // the acknowledgment.
  213                   int index = 0;
  214                   boolean inAckRange = false;
  215                   List<MessageReference> removeList = new ArrayList<MessageReference>();
  216                   for (final MessageReference node : dispatched) {
  217                       MessageId messageId = node.getMessageId();
  218                       if (ack.getFirstMessageId() == null
  219                               || ack.getFirstMessageId().equals(messageId)) {
  220                           inAckRange = true;
  221                       }
  222                       if (inAckRange) {
  223                           // Don't remove the nodes until we are committed.  
  224                           if (!context.isInTransaction()) {
  225                               dequeueCounter++;
  226                               node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
  227                               removeList.add(node);
  228                           } else {
  229                               // setup a Synchronization to remove nodes from the
  230                               // dispatched list.
  231                               context.getTransaction().addSynchronization(
  232                                       new Synchronization() {
  233   
  234                                           public void afterCommit()
  235                                                   throws Exception {
  236                                               synchronized(dispatchLock) {
  237                                                   dequeueCounter++;
  238                                                   dispatched.remove(node);
  239                                                   node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
  240                                               }
  241                                           }
  242   
  243                                           public void afterRollback() throws Exception {
  244                                               synchronized(dispatchLock) {
  245                                                   if (isSlave()) {
  246                                                       node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
  247                                                   } else {
  248                                                       // poisionAck will decrement - otherwise still inflight on client
  249                                                   }
  250                                               }
  251                                           }
  252                                       });
  253                           }
  254                           index++;
  255                           acknowledge(context, ack, node);
  256                           if (ack.getLastMessageId().equals(messageId)) {                  
  257                               // contract prefetch if dispatch required a pull
  258                               if (getPrefetchSize() == 0) {
  259                                   prefetchExtension = Math.max(0, prefetchExtension - index);
  260                               } else if (context.isInTransaction()) {
  261                                   // extend prefetch window only if not a pulling consumer
  262                                   prefetchExtension = Math.max(prefetchExtension, index);
  263                               }
  264                               destination = node.getRegionDestination();
  265                               callDispatchMatched = true;
  266                               break;
  267                           }
  268                       }
  269                   }
  270                   for (final MessageReference node : removeList) {
  271                       dispatched.remove(node);
  272                   }
  273                   // this only happens after a reconnect - get an ack which is not
  274                   // valid
  275                   if (!callDispatchMatched) {
  276                       LOG.error("Could not correlate acknowledgment with dispatched message: "
  277                                     + ack);
  278                   }
  279               } else if (ack.isIndividualAck()) {
  280                   // Message was delivered and acknowledge - but only delete the
  281                   // individual message
  282                   for (final MessageReference node : dispatched) {
  283                       MessageId messageId = node.getMessageId();
  284                       if (ack.getLastMessageId().equals(messageId)) {
  285                           // this should never be within a transaction
  286                           node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
  287                           destination = node.getRegionDestination();
  288                           acknowledge(context, ack, node);
  289                           dispatched.remove(node);
  290                           prefetchExtension = Math.max(0, prefetchExtension - 1);
  291                           callDispatchMatched = true;
  292                           break;
  293                       }
  294                   }
  295               }else if (ack.isDeliveredAck()) {
  296                   // Message was delivered but not acknowledged: update pre-fetch
  297                   // counters.
  298                   int index = 0;
  299                   for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
  300                       final MessageReference node = iter.next();
  301                       if (node.isExpired()) {
  302                           if (broker.isExpired(node)) {
  303                               node.getRegionDestination().messageExpired(context, this, node);
  304                           }
  305                           dispatched.remove(node);
  306                           node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
  307                       }
  308                       if (ack.getLastMessageId().equals(node.getMessageId())) {
  309                           prefetchExtension = Math.max(prefetchExtension, index + 1);
  310                           destination = node.getRegionDestination();
  311                           callDispatchMatched = true;
  312                           break;
  313                       }
  314                   }
  315                   if (!callDispatchMatched) {
  316                       throw new JMSException(
  317                               "Could not correlate acknowledgment with dispatched message: "
  318                                       + ack);
  319                   }
  320               } else if (ack.isRedeliveredAck()) {
  321                   // Message was re-delivered but it was not yet considered to be
  322                   // a DLQ message.
  323                   boolean inAckRange = false;
  324                   for (final MessageReference node : dispatched) {
  325                       MessageId messageId = node.getMessageId();
  326                       if (ack.getFirstMessageId() == null
  327                               || ack.getFirstMessageId().equals(messageId)) {
  328                           inAckRange = true;
  329                       }
  330                       if (inAckRange) {
  331                           if (ack.getLastMessageId().equals(messageId)) {
  332                               destination = node.getRegionDestination();
  333                               callDispatchMatched = true;
  334                               break;
  335                           }
  336                       }
  337                   }
  338                   if (!callDispatchMatched) {
  339                       throw new JMSException(
  340                               "Could not correlate acknowledgment with dispatched message: "
  341                                       + ack);
  342                   }
  343               } else if (ack.isPoisonAck()) {
  344                   // TODO: what if the message is already in a DLQ???
  345                   // Handle the poison ACK case: we need to send the message to a
  346                   // DLQ
  347                   if (ack.isInTransaction()) {
  348                       throw new JMSException("Poison ack cannot be transacted: "
  349                               + ack);
  350                   }
  351                   int index = 0;
  352                   boolean inAckRange = false;
  353                   List<MessageReference> removeList = new ArrayList<MessageReference>();
  354                   for (final MessageReference node : dispatched) {
  355                       MessageId messageId = node.getMessageId();
  356                       if (ack.getFirstMessageId() == null
  357                               || ack.getFirstMessageId().equals(messageId)) {
  358                           inAckRange = true;
  359                       }
  360                       if (inAckRange) {
  361                           sendToDLQ(context, node);
  362                           node.getRegionDestination().getDestinationStatistics()
  363                                   .getInflight().decrement();
  364                           removeList.add(node);
  365                           dequeueCounter++;
  366                           index++;
  367                           acknowledge(context, ack, node);
  368                           if (ack.getLastMessageId().equals(messageId)) {
  369                               prefetchExtension = Math.max(0, prefetchExtension
  370                                       - (index + 1));
  371                               destination = node.getRegionDestination();
  372                               callDispatchMatched = true;
  373                               break;
  374                           }
  375                       }
  376                   }
  377                   for (final MessageReference node : removeList) {
  378                       dispatched.remove(node);
  379                   }
  380                   if (!callDispatchMatched) {
  381                       throw new JMSException(
  382                               "Could not correlate acknowledgment with dispatched message: "
  383                                       + ack);
  384                   }
  385               }
  386           }
  387           if (callDispatchMatched && destination != null) {    
  388               destination.wakeup();
  389               dispatchPending();
  390           } else {
  391               if (isSlave()) {
  392                   throw new JMSException(
  393                           "Slave broker out of sync with master: Acknowledgment ("
  394                                   + ack + ") was not in the dispatch list: "
  395                                   + dispatched);
  396               } else {
  397                   LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
  398                           + ack);
  399               }
  400           }
  401       }
  402   
  403       /**
  404        * Checks an ack versus the contents of the dispatched list.
  405        * 
  406        * @param ack
  407        * @param firstAckedMsg
  408        * @param lastAckedMsg
  409        * @throws JMSException if it does not match
  410        */
  411   	protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
  412           MessageId firstAckedMsg = ack.getFirstMessageId();
  413           MessageId lastAckedMsg = ack.getLastMessageId();
  414           int checkCount = 0;
  415           boolean checkFoundStart = false;
  416           boolean checkFoundEnd = false;
  417           for (MessageReference node : dispatched) {
  418   
  419               if (firstAckedMsg == null) {
  420                   checkFoundStart = true;
  421               } else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
  422                   checkFoundStart = true;
  423               }
  424   
  425               if (checkFoundStart) {
  426                   checkCount++;
  427               }
  428   
  429               if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
  430                   checkFoundEnd = true;
  431                   break;
  432               }
  433           }
  434           if (!checkFoundStart && firstAckedMsg != null)
  435               throw new JMSException("Unmatched acknowledge: " + ack
  436                       + "; Could not find Message-ID " + firstAckedMsg
  437                       + " in dispatched-list (start of ack)");
  438           if (!checkFoundEnd && lastAckedMsg != null)
  439               throw new JMSException("Unmatched acknowledge: " + ack
  440                       + "; Could not find Message-ID " + lastAckedMsg
  441                       + " in dispatched-list (end of ack)");
  442           if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
  443               throw new JMSException("Unmatched acknowledge: " + ack
  444                       + "; Expected message count (" + ack.getMessageCount()
  445                       + ") differs from count in dispatched-list (" + checkCount
  446                       + ")");
  447           }
  448       }
  449   
  450       /**
  451        * @param context
  452        * @param node
  453        * @throws IOException
  454        * @throws Exception
  455        */
  456       protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
  457           broker.sendToDeadLetterQueue(context, node);
  458       }
  459       
  460       public int getInFlightSize() {
  461           return dispatched.size();
  462       }
  463       
  464       /**
  465        * Used to determine if the broker can dispatch to the consumer.
  466        * 
  467        * @return
  468        */
  469       public boolean isFull() {
  470           return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
  471       }
  472   
  473       /**
  474        * @return true when 60% or more room is left for dispatching messages
  475        */
  476       public boolean isLowWaterMark() {
  477           return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
  478       }
  479   
  480       /**
  481        * @return true when 10% or less room is left for dispatching messages
  482        */
  483       public boolean isHighWaterMark() {
  484           return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
  485       }
  486   
  487       public int countBeforeFull() {
  488           return info.getPrefetchSize() + prefetchExtension - dispatched.size();
  489       }
  490   
  491       public int getPendingQueueSize() {
  492           return pending.size();
  493       }
  494   
  495       public int getDispatchedQueueSize() {
  496           return dispatched.size();
  497       }
  498   
  499       public long getDequeueCounter() {
  500           return dequeueCounter;
  501       }
  502   
  503       public long getDispatchedCounter() {
  504           return dispatchCounter;
  505       }
  506   
  507       public long getEnqueueCounter() {
  508           return enqueueCounter;
  509       }
  510   
  511       public boolean isRecoveryRequired() {
  512           return pending.isRecoveryRequired();
  513       }
  514   
  515       public PendingMessageCursor getPending() {
  516           return this.pending;
  517       }
  518   
  519       public void setPending(PendingMessageCursor pending) {
  520           this.pending = pending;
  521           if (this.pending!=null) {
  522               this.pending.setSystemUsage(usageManager);
  523               this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
  524           }
  525       }
  526   
  527      public void add(ConnectionContext context, Destination destination) throws Exception {
  528           synchronized(pendingLock) {
  529               super.add(context, destination);
  530               pending.add(context, destination);
  531           }
  532       }
  533   
  534       public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
  535           List<MessageReference> rc = new ArrayList<MessageReference>();
  536           synchronized(pendingLock) {
  537               super.remove(context, destination);
  538               // Here is a potential problem concerning Inflight stat:
  539               // Messages not already committed or rolled back may not be removed from dispatched list at the moment
  540               // Except if each commit or rollback callback action comes before remove of subscriber.
  541               rc.addAll(pending.remove(context, destination));
  542   
  543               // Synchronized to DispatchLock
  544               synchronized(dispatchLock) {
  545   	            for (MessageReference r : dispatched) {
  546   	                if( r.getRegionDestination() == destination) {
  547   	                	rc.add((QueueMessageReference)r);
  548   	                }
  549   	            }
  550                   destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());
  551                   destination.getDestinationStatistics().getInflight().subtract(dispatched.size());
  552                   dispatched.clear();
  553               }            
  554           }
  555           return rc;
  556       }
  557   
  558       protected void dispatchPending() throws IOException {
  559           if (!isSlave()) {
  560              synchronized(pendingLock) {
  561                   try {
  562                       int numberToDispatch = countBeforeFull();
  563                       if (numberToDispatch > 0) {
  564                           slowConsumer=false;
  565                           pending.setMaxBatchSize(numberToDispatch);
  566                           int count = 0;
  567                           pending.reset();
  568                           while (pending.hasNext() && !isFull()
  569                                   && count < numberToDispatch) {
  570                               MessageReference node = pending.next();
  571                               if (node == null) {
  572                                   break;
  573                               }
  574                               
  575                               // Synchronize between dispatched list and remove of message from pending list
  576                               // related to remove subscription action
  577                               synchronized(dispatchLock) {
  578                                   pending.remove();
  579                                   node.decrementReferenceCount();
  580                                   if( !isDropped(node) && canDispatch(node)) {
  581   
  582                                       // Message may have been sitting in the pending
  583                                       // list a while waiting for the consumer to ak the message.
  584                                       if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
  585                                           //increment number to dispatch
  586                                           numberToDispatch++;
  587                                           if (broker.isExpired(node)) {
  588                                               node.getRegionDestination().messageExpired(context, this, node);
  589                                           }
  590                                           continue;
  591                                       }
  592                                       dispatch(node);
  593                                       count++;
  594                                   }
  595                               }
  596                           }
  597                       }else {
  598                           if (!slowConsumer) {
  599                               slowConsumer=true;
  600                               ConnectionContext c = new ConnectionContext();
  601                               c.setBroker(context.getBroker());
  602                               for (Destination dest :destinations) {
  603                                   dest.slowConsumer(c,this);
  604                               }
  605                               
  606                           }
  607                       }
  608                   } finally {
  609                       pending.release();
  610                   }
  611               }
  612           }
  613       }
  614   
  615       protected boolean dispatch(final MessageReference node) throws IOException {
  616           final Message message = node.getMessage();
  617           if (message == null) {
  618               return false;
  619           }
  620           
  621           okForAckAsDispatchDone.countDown();
  622           
  623           // No reentrant lock - Patch needed to IndirectMessageReference on method lock
  624           if (!isSlave()) {
  625   
  626               MessageDispatch md = createMessageDispatch(node, message);
  627               // NULL messages don't count... they don't get Acked.
  628               if (node != QueueMessageReference.NULL_MESSAGE) {
  629                   dispatchCounter++;
  630                   dispatched.add(node);
  631               } else {
  632                   prefetchExtension = Math.max(0, prefetchExtension - 1);
  633               }
  634               if (info.isDispatchAsync()) {
  635                   md.setTransmitCallback(new Runnable() {
  636   
  637                       public void run() {
  638                           // Since the message gets queued up in async dispatch,
  639                           // we don't want to
  640                           // decrease the reference count until it gets put on the
  641                           // wire.
  642                           onDispatch(node, message);
  643                       }
  644                   });
  645                   context.getConnection().dispatchAsync(md);
  646               } else {
  647                   context.getConnection().dispatchSync(md);
  648                   onDispatch(node, message);
  649               }
  650               return true;
  651           } else {
  652               return false;
  653           }
  654       }
  655   
  656       protected void onDispatch(final MessageReference node, final Message message) {
  657           if (node.getRegionDestination() != null) {
  658               if (node != QueueMessageReference.NULL_MESSAGE) {
  659                   node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
  660                   node.getRegionDestination().getDestinationStatistics().getInflight().increment();   
  661                   if (LOG.isTraceEnabled()) {
  662                       LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() 
  663                               + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
  664                   }
  665               }
  666           }
  667           
  668           if (info.isDispatchAsync()) {
  669               try {
  670                   dispatchPending();
  671               } catch (IOException e) {
  672                   context.getConnection().serviceExceptionAsync(e);
  673               }
  674           }
  675       }
  676   
  677       /**
  678        * inform the MessageConsumer on the client to change it's prefetch
  679        * 
  680        * @param newPrefetch
  681        */
  682       public void updateConsumerPrefetch(int newPrefetch) {
  683           if (context != null && context.getConnection() != null && context.getConnection().isManageable()) {
  684               ConsumerControl cc = new ConsumerControl();
  685               cc.setConsumerId(info.getConsumerId());
  686               cc.setPrefetch(newPrefetch);
  687               context.getConnection().dispatchAsync(cc);
  688           }
  689       }
  690   
  691       /**
  692        * @param node
  693        * @param message
  694        * @return MessageDispatch
  695        */
  696       protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
  697           if (node == QueueMessageReference.NULL_MESSAGE) {
  698               MessageDispatch md = new MessageDispatch();
  699               md.setMessage(null);
  700               md.setConsumerId(info.getConsumerId());
  701               md.setDestination(null);
  702               return md;
  703           } else {
  704               MessageDispatch md = new MessageDispatch();
  705               md.setConsumerId(info.getConsumerId());
  706               md.setDestination(node.getRegionDestination().getActiveMQDestination());
  707               md.setMessage(message);
  708               md.setRedeliveryCounter(node.getRedeliveryCounter());
  709               return md;
  710           }
  711       }
  712   
  713       /**
  714        * Called when a matched message is about to be dispatched to the client.
  715        * The pending-dispatch loop in this class dispatches a node only when this returns true.
  716        * @param node the message reference that is a candidate for dispatch
  717        * @return false if the message should not be dispatched to the client
  718        *         (for example, another subscription may already have dispatched it).
  719        * @throws IOException declared for implementations; the conditions that raise it are not visible here
  720        */
  721       protected abstract boolean canDispatch(MessageReference node) throws IOException;
  722       
            /**
             * Reports whether the given reference is considered dropped. The
             * pending-dispatch loop in this class does not dispatch a node for
             * which this returns true.
             *
             * @param node the message reference to check
             * @return true if the node must be skipped rather than dispatched
             */
  723       protected abstract boolean isDropped(MessageReference node);
  724   
  725       /**
  726        * Used during acknowledgment to remove the message.
  727        *
             * @param context the connection context passed along with the acknowledgment (callers are not visible here - confirm)
             * @param ack the acknowledgment command being processed
             * @param node the message reference the acknowledgment applies to
  728        * @throws IOException declared for implementations; the conditions that raise it are not visible here
  729        */
  730       protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
  731   
  732       
  733       public int getMaxProducersToAudit() {
  734           return maxProducersToAudit;
  735       }
  736   
  737       public void setMaxProducersToAudit(int maxProducersToAudit) {
  738           this.maxProducersToAudit = maxProducersToAudit;
  739       }
  740   
  741       public int getMaxAuditDepth() {
  742           return maxAuditDepth;
  743       }
  744   
  745       public void setMaxAuditDepth(int maxAuditDepth) {
  746           this.maxAuditDepth = maxAuditDepth;
  747       }
  748   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]