Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.List;
   21   
   22   import javax.jms.JMSException;
   23   
   24   import org.apache.activemq.broker.region.group.MessageGroupMap;
   25   import org.apache.activemq.broker.region.policy.SimpleDispatchSelector;
   26   import org.apache.activemq.command.ActiveMQDestination;
   27   import org.apache.activemq.command.ActiveMQMessage;
   28   import org.apache.activemq.command.ConsumerId;
   29   import org.apache.activemq.command.Message;
   30   import org.apache.commons.logging.Log;
   31   import org.apache.commons.logging.LogFactory;
   32   
   33   /**
   34    * Queue dispatch policy that determines if a message can be sent to a subscription
   35    * 
   36    * @org.apache.xbean.XBean
   37    * @version $Revision$
   38    */
   39   public class QueueDispatchSelector extends SimpleDispatchSelector {
   40       private static final Log LOG = LogFactory.getLog(QueueDispatchSelector.class);
   41       private Subscription exclusiveConsumer;
   42      
   43      
   44       /**
   45        * @param destination
   46        */
   47       public QueueDispatchSelector(ActiveMQDestination destination) {
   48           super(destination);
   49       }
   50       
   51       public Subscription getExclusiveConsumer() {
   52           return exclusiveConsumer;
   53       }
   54       public void setExclusiveConsumer(Subscription exclusiveConsumer) {
   55           this.exclusiveConsumer = exclusiveConsumer;
   56       }
   57       
   58       public boolean isExclusiveConsumer(Subscription s) {
   59           return s == this.exclusiveConsumer;
   60       }
   61       
   62          
   63       public boolean canSelect(Subscription subscription,
   64               MessageReference m) throws Exception {
   65          
   66           boolean result =  super.canDispatch(subscription, m);
   67           if (result && !subscription.isBrowser()) {
   68               result = exclusiveConsumer == null
   69                       || exclusiveConsumer == subscription;
   70               if (result) {
   71                   QueueMessageReference node = (QueueMessageReference) m;
   72                   // Keep message groups together.
   73                   String groupId = node.getGroupID();
   74                   int sequence = node.getGroupSequence();
   75                   if (groupId != null) {
   76                       MessageGroupMap messageGroupOwners = ((Queue) node
   77                               .getRegionDestination()).getMessageGroupOwners();
   78   
   79                       // If we can own the first, then no-one else should own the
   80                       // rest.
   81                       if (sequence == 1) {
   82                           assignGroup(subscription, messageGroupOwners, node,groupId);
   83                       }else {
   84       
   85                           // Make sure that the previous owner is still valid, we may
   86                           // need to become the new owner.
   87                           ConsumerId groupOwner;
   88       
   89                           groupOwner = messageGroupOwners.get(groupId);
   90                           if (groupOwner == null) {
   91                               assignGroup(subscription, messageGroupOwners, node,groupId);
   92                           } else {
   93                               if (groupOwner.equals(subscription.getConsumerInfo().getConsumerId())) {
   94                                   // A group sequence < 1 is an end of group signal.
   95                                   if (sequence < 0) {
   96                                       messageGroupOwners.removeGroup(groupId);
   97                                   }
   98                               } else {
   99                                   result = false;
  100                               }
  101                           }
  102                       }
  103                   }
  104               }
  105           }
  106           return result;
  107       }
  108       
  109       protected void assignGroup(Subscription subs,MessageGroupMap messageGroupOwners, MessageReference n, String groupId) throws IOException {
  110           messageGroupOwners.put(groupId, subs.getConsumerInfo().getConsumerId());
  111           Message message = n.getMessage();
  112           if (message instanceof ActiveMQMessage) {
  113               ActiveMQMessage activeMessage = (ActiveMQMessage)message;
  114               try {
  115                   activeMessage.setBooleanProperty("JMSXGroupFirstForConsumer", true, false);
  116               } catch (JMSException e) {
  117                   LOG.warn("Failed to set boolean header: " + e, e);
  118               }
  119           }
  120       }
  121       
  122       
  123       
  124       
  125   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]