Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.util.HashSet;
   20   import java.util.Iterator;
   21   import java.util.List;
   22   import java.util.Set;
   23   import java.util.concurrent.ConcurrentHashMap;
   24   
   25   import javax.jms.InvalidDestinationException;
   26   import javax.jms.JMSException;
   27   
   28   import org.apache.activemq.advisory.AdvisorySupport;
   29   import org.apache.activemq.broker.ConnectionContext;
   30   import org.apache.activemq.broker.region.policy.PolicyEntry;
   31   import org.apache.activemq.command.ActiveMQDestination;
   32   import org.apache.activemq.command.ConnectionId;
   33   import org.apache.activemq.command.ConsumerId;
   34   import org.apache.activemq.command.ConsumerInfo;
   35   import org.apache.activemq.command.RemoveSubscriptionInfo;
   36   import org.apache.activemq.command.SessionId;
   37   import org.apache.activemq.command.SubscriptionInfo;
   38   import org.apache.activemq.store.TopicMessageStore;
   39   import org.apache.activemq.thread.TaskRunnerFactory;
   40   import org.apache.activemq.usage.SystemUsage;
   41   import org.apache.activemq.util.LongSequenceGenerator;
   42   import org.apache.activemq.util.SubscriptionKey;
   43   import org.apache.commons.logging.Log;
   44   import org.apache.commons.logging.LogFactory;
   45   
   46   /**
   47    * @version $Revision: 1.12 $
   48    */
   49   public class TopicRegion extends AbstractRegion {
   50       private static final Log LOG = LogFactory.getLog(TopicRegion.class);
   51       protected final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
   52       private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
   53       private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
   54       private boolean keepDurableSubsActive;
   55   
   56       public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
   57                          DestinationFactory destinationFactory) {
   58           super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
   59   
   60       }
   61   
   62       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
   63           if (info.isDurable()) {
   64               ActiveMQDestination destination = info.getDestination();
   65               if (!destination.isPattern()) {
   66                   // Make sure the destination is created.
   67                   lookup(context, destination);
   68               }
   69               String clientId = context.getClientId();
   70               String subscriptionName = info.getSubscriptionName();
   71               SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
   72               DurableTopicSubscription sub = durableSubscriptions.get(key);
   73               if (sub != null) {
   74                   if (sub.isActive()) {
   75                       throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subscriptionName);
   76                   }
   77                   // Has the selector changed??
   78                   if (hasDurableSubChanged(info, sub.getConsumerInfo())) {
   79                       // Remove the consumer first then add it.
   80                       durableSubscriptions.remove(key);
   81                       synchronized (destinationsMutex) {
   82                           for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
   83                               Destination dest = iter.next();
   84                               //Account for virtual destinations
   85                               if (dest instanceof Topic){
   86                                   Topic topic = (Topic)dest;
   87                                   topic.deleteSubscription(context, key);
   88                               }
   89                           }
   90                       }
   91                       super.removeConsumer(context, sub.getConsumerInfo());
   92                       super.addConsumer(context, info);
   93                       sub = durableSubscriptions.get(key);
   94                   } else {
   95                       // Change the consumer id key of the durable sub.
   96                       if (sub.getConsumerInfo().getConsumerId() != null) {
   97                           subscriptions.remove(sub.getConsumerInfo().getConsumerId());
   98                       }
   99                       subscriptions.put(info.getConsumerId(), sub);
  100                   }
  101               } else {
  102                   super.addConsumer(context, info);
  103                   sub = durableSubscriptions.get(key);
  104                   if (sub == null) {
  105                       throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " + key.getClientId()
  106                                              + " subscriberName: " + key.getSubscriptionName());
  107                   }
  108               }
  109               sub.activate(usageManager, context, info);
  110               return sub;
  111           } else {
  112               return super.addConsumer(context, info);
  113           }
  114       }
  115   
  116       public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  117           if (info.isDurable()) {
  118   
  119               SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
  120               DurableTopicSubscription sub = durableSubscriptions.get(key);
  121               if (sub != null) {
  122                   sub.deactivate(keepDurableSubsActive);
  123               }
  124   
  125           } else {
  126               super.removeConsumer(context, info);
  127           }
  128       }
  129   
  130       public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
  131           SubscriptionKey key = new SubscriptionKey(info.getClientId(), info.getSubscriptionName());
  132           DurableTopicSubscription sub = durableSubscriptions.get(key);
  133           if (sub == null) {
  134               throw new InvalidDestinationException("No durable subscription exists for: " + info.getSubscriptionName());
  135           }
  136           if (sub.isActive()) {
  137               throw new JMSException("Durable consumer is in use");
  138           }
  139   
  140           durableSubscriptions.remove(key);
  141           synchronized (destinationsMutex) {
  142               for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
  143               	Destination dest = iter.next();
  144               	//Account for virtual destinations
  145               	if (dest instanceof Topic){
  146               	    Topic topic = (Topic)dest;
  147               	    topic.deleteSubscription(context, key);
  148               	}
  149               }
  150           }
  151           super.removeConsumer(context, sub.getConsumerInfo());
  152       }
  153   
  154       public String toString() {
  155           return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
  156       }
  157   
  158       @Override
  159       protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
  160   
  161           List<Subscription> rc = super.addSubscriptionsForDestination(context, dest);
  162           Set<Subscription> dupChecker = new HashSet<Subscription>(rc);
  163   
  164           TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
  165           // Eagerly recover the durable subscriptions
  166           if (store != null) {
  167               SubscriptionInfo[] infos = store.getAllSubscriptions();
  168               for (int i = 0; i < infos.length; i++) {
  169   
  170                   SubscriptionInfo info = infos[i];
  171                   LOG.debug("Restoring durable subscription: " + infos);
  172                   SubscriptionKey key = new SubscriptionKey(info);
  173   
  174                   // A single durable sub may be subscribing to multiple topics.
  175                   // so it might exist already.
  176                   DurableTopicSubscription sub = durableSubscriptions.get(key);
  177                   ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
  178                   if (sub == null) {
  179                       ConnectionContext c = new ConnectionContext();
  180                       c.setBroker(context.getBroker());
  181                       c.setClientId(key.getClientId());
  182                       c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
  183                       sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
  184                   }
  185   
  186                   if (dupChecker.contains(sub)) {
  187                       continue;
  188                   }
  189   
  190                   dupChecker.add(sub);
  191                   rc.add(sub);
  192                   dest.addSubscription(context, sub);
  193               }
  194   
  195               // Now perhaps there other durable subscriptions (via wild card)
  196               // that would match this destination..
  197               durableSubscriptions.values();
  198               for (Iterator<DurableTopicSubscription> iterator = durableSubscriptions.values().iterator(); iterator.hasNext();) {
  199                   DurableTopicSubscription sub = iterator.next();
  200                   // Skip over subscriptions that we allready added..
  201                   if (dupChecker.contains(sub)) {
  202                       continue;
  203                   }
  204   
  205                   if (sub.matches(dest.getActiveMQDestination())) {
  206                       rc.add(sub);
  207                       dest.addSubscription(context, sub);
  208                   }
  209               }
  210           }
  211   
  212           return rc;
  213       }
  214   
  215       private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
  216           ConsumerInfo rc = new ConsumerInfo();
  217           rc.setSelector(info.getSelector());
  218           rc.setSubscriptionName(info.getSubscriptionName());
  219           rc.setDestination(info.getSubscribedDestination());
  220           rc.setConsumerId(createConsumerId());
  221           return rc;
  222       }
  223   
  224       private ConsumerId createConsumerId() {
  225           return new ConsumerId(recoveredDurableSubSessionId, recoveredDurableSubIdGenerator.getNextSequenceId());
  226       }
  227   
  228       protected void configureTopic(Topic topic, ActiveMQDestination destination) {
  229           if (broker.getDestinationPolicy() != null) {
  230               PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
  231               if (entry != null) {
  232                   entry.configure(topic);
  233               }
  234           }
  235       }
  236   
  237       protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
  238           ActiveMQDestination destination = info.getDestination();
  239           
  240           if (info.isDurable()) {
  241               if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
  242                   throw new JMSException("Cannot create a durable subscription for an advisory Topic");
  243               }
  244               SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
  245               DurableTopicSubscription sub = durableSubscriptions.get(key);
  246               
  247               if (sub == null) {
  248                   
  249                   sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
  250                   if (destination != null && broker.getDestinationPolicy() != null) {
  251                       PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
  252                       if (entry != null) {
  253                           entry.configure(broker, usageManager, sub);
  254                       }
  255                   }
  256                   durableSubscriptions.put(key, sub);
  257               } else {
  258                   throw new JMSException("That durable subscription is already active.");
  259               }
  260               return sub;
  261           }
  262           try {
  263               TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
  264               // lets configure the subscription depending on the destination
  265               if (destination != null && broker.getDestinationPolicy() != null) {
  266                   PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
  267                   if (entry != null) {
  268                       entry.configure(broker, usageManager, answer);
  269                   }
  270               }
  271               answer.init();
  272               return answer;
  273           } catch (Exception e) {
  274               LOG.error("Failed to create TopicSubscription ", e);
  275               JMSException jmsEx = new JMSException("Couldn't create TopicSubscription");
  276               jmsEx.setLinkedException(e);
  277               throw jmsEx;
  278           }
  279       }
  280   
  281       /**
  282        */
  283       private boolean hasDurableSubChanged(ConsumerInfo info1, ConsumerInfo info2) {
  284           if (info1.getSelector() != null ^ info2.getSelector() != null) {
  285               return true;
  286           }
  287           if (info1.getSelector() != null && !info1.getSelector().equals(info2.getSelector())) {
  288               return true;
  289           }
  290           return !info1.getDestination().equals(info2.getDestination());
  291       }
  292   
  293       protected Set<ActiveMQDestination> getInactiveDestinations() {
  294           Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
  295           for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
  296               ActiveMQDestination dest = iter.next();
  297               if (!dest.isTopic()) {
  298                   iter.remove();
  299               }
  300           }
  301           return inactiveDestinations;
  302       }
  303   
  304       public boolean isKeepDurableSubsActive() {
  305           return keepDurableSubsActive;
  306       }
  307   
  308       public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
  309           this.keepDurableSubsActive = keepDurableSubsActive;
  310       }
  311   
  312   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]