Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.util.ArrayList;
   20   import java.util.HashMap;
   21   import java.util.Iterator;
   22   import java.util.List;
   23   import java.util.Map;
   24   import java.util.Set;
   25   import java.util.concurrent.ConcurrentHashMap;
   26   import javax.jms.JMSException;
   27   import org.apache.activemq.broker.ConnectionContext;
   28   import org.apache.activemq.broker.ConsumerBrokerExchange;
   29   import org.apache.activemq.broker.DestinationAlreadyExistsException;
   30   import org.apache.activemq.broker.ProducerBrokerExchange;
   31   import org.apache.activemq.command.ActiveMQDestination;
   32   import org.apache.activemq.command.ConsumerId;
   33   import org.apache.activemq.command.ConsumerInfo;
   34   import org.apache.activemq.command.Message;
   35   import org.apache.activemq.command.MessageAck;
   36   import org.apache.activemq.command.MessageDispatchNotification;
   37   import org.apache.activemq.command.MessagePull;
   38   import org.apache.activemq.command.ProducerInfo;
   39   import org.apache.activemq.command.RemoveSubscriptionInfo;
   40   import org.apache.activemq.command.Response;
   41   import org.apache.activemq.filter.DestinationFilter;
   42   import org.apache.activemq.filter.DestinationMap;
   43   import org.apache.activemq.security.SecurityContext;
   44   import org.apache.activemq.thread.TaskRunnerFactory;
   45   import org.apache.activemq.usage.SystemUsage;
   46   import org.apache.commons.logging.Log;
   47   import org.apache.commons.logging.LogFactory;
   48   
   49   /**
   50    * @version $Revision: 1.14 $
   51    */
   52   public abstract class AbstractRegion implements Region {
   53   
   54       private static final Log LOG = LogFactory.getLog(AbstractRegion.class);
   55   
   56       protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
   57       protected final DestinationMap destinationMap = new DestinationMap();
   58       protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
   59       protected final SystemUsage usageManager;
   60       protected final DestinationFactory destinationFactory;
   61       protected final DestinationStatistics destinationStatistics;
   62       protected final RegionBroker broker;
   63       protected boolean autoCreateDestinations = true;
   64       protected final TaskRunnerFactory taskRunnerFactory;
   65       protected final Object destinationsMutex = new Object();
   66       protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
   67       protected boolean started;
   68   
   69       public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
   70                             DestinationFactory destinationFactory) {
   71           if (broker == null) {
   72               throw new IllegalArgumentException("null broker");
   73           }
   74           this.broker = broker;
   75           this.destinationStatistics = destinationStatistics;
   76           this.usageManager = memoryManager;
   77           this.taskRunnerFactory = taskRunnerFactory;
   78           if (broker == null) {
   79               throw new IllegalArgumentException("null destinationFactory");
   80           }
   81           this.destinationFactory = destinationFactory;
   82       }
   83   
   84       public final  void start() throws Exception {
   85           started = true;
   86   
   87           Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
   88           for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
   89               ActiveMQDestination dest = iter.next();
   90   
   91               ConnectionContext context = new ConnectionContext();
   92               context.setBroker(broker.getBrokerService().getBroker());
   93               context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
   94               context.getBroker().addDestination(context, dest);
   95           }
   96           synchronized (destinationsMutex) {
   97               for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
   98                   Destination dest = i.next();
   99                   dest.start();
  100               }
  101           }
  102       }
  103   
  104       public void stop() throws Exception {
  105           started = false;
  106           synchronized (destinationsMutex) {
  107               for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
  108                   Destination dest = i.next();
  109                   dest.stop();
  110               }
  111           }
  112           destinations.clear();
  113       }
  114   
  115       public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
  116           LOG.debug(broker.getBrokerName() + " adding destination: " + destination);
  117           synchronized (destinationsMutex) {
  118               Destination dest = destinations.get(destination);
  119               if (dest == null) {
  120                   dest = createDestination(context, destination);
  121                   // intercept if there is a valid interceptor defined
  122                   DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
  123                   if (destinationInterceptor != null) {
  124                       dest = destinationInterceptor.intercept(dest);
  125                   }
  126                   dest.start();
  127                   destinations.put(destination, dest);
  128                   destinationMap.put(destination, dest);
  129                   addSubscriptionsForDestination(context, dest);
  130               }
  131               return dest;
  132           }
  133       }
  134   
  135       public Map<ConsumerId, Subscription> getSubscriptions() {
  136           return subscriptions;
  137       }
  138       
  139       protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
  140   
  141           List<Subscription> rc = new ArrayList<Subscription>();
  142           // Add all consumers that are interested in the destination.
  143           for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
  144               Subscription sub = iter.next();
  145               if (sub.matches(dest.getActiveMQDestination())) {
  146                   dest.addSubscription(context, sub);
  147                   rc.add(sub);
  148               }
  149           }
  150           return rc;
  151   
  152       }
  153   
  154       public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
  155   
  156           // No timeout.. then try to shut down right way, fails if there are
  157           // current subscribers.
  158           if (timeout == 0) {
  159               for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
  160                   Subscription sub = iter.next();
  161                   if (sub.matches(destination)) {
  162                       throw new JMSException("Destination still has an active subscription: " + destination);
  163                   }
  164               }
  165           }
  166   
  167           if (timeout > 0) {
  168               // TODO: implement a way to notify the subscribers that we want to
  169               // take the down
  170               // the destination and that they should un-subscribe.. Then wait up
  171               // to timeout time before
  172               // dropping the subscription.
  173           }
  174   
  175           LOG.debug("Removing destination: " + destination);
  176           
  177           synchronized (destinationsMutex) {
  178               Destination dest = destinations.remove(destination);
  179               if (dest != null) {
  180                   // timeout<0 or we timed out, we now force any remaining
  181                   // subscriptions to un-subscribe.
  182                   for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
  183                       Subscription sub = iter.next();
  184                       if (sub.matches(destination)) {
  185                           dest.removeSubscription(context, sub, 0l);
  186                       }
  187                   }
  188                   destinationMap.removeAll(destination);
  189                   dispose(context,dest);
  190                   DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
  191                   if (destinationInterceptor != null) {
  192                       destinationInterceptor.remove(dest);
  193                   }
  194   
  195               } else {   
  196                   LOG.debug("Destination doesn't exist: " + dest);
  197               }
  198           }
  199       }
  200   
  201       /**
  202        * Provide an exact or wildcard lookup of destinations in the region
  203        * 
  204        * @return a set of matching destination objects.
  205        */
  206       public Set<Destination> getDestinations(ActiveMQDestination destination) {
  207           synchronized (destinationsMutex) {
  208               return destinationMap.get(destination);
  209           }
  210       }
  211   
  212       public Map<ActiveMQDestination, Destination> getDestinationMap() {
  213           synchronized (destinationsMutex) {
  214               return new HashMap<ActiveMQDestination, Destination>(destinations);
  215           }
  216       }
  217   
  218       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  219           LOG.debug(broker.getBrokerName() + " adding consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
  220           ActiveMQDestination destination = info.getDestination();
  221           if (destination != null && !destination.isPattern() && !destination.isComposite()) {
  222               // lets auto-create the destination
  223               lookup(context, destination);
  224           }
  225   
  226           Object addGuard;
  227           synchronized (consumerChangeMutexMap) {
  228               addGuard = consumerChangeMutexMap.get(info.getConsumerId());
  229               if (addGuard == null) {
  230                   addGuard = new Object();
  231                   consumerChangeMutexMap.put(info.getConsumerId(), addGuard);
  232               }
  233           }
  234           synchronized (addGuard) {
  235               Subscription o = subscriptions.get(info.getConsumerId());
  236               if (o != null) {
  237                   LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
  238                   return o;
  239               }
  240   
  241               // We may need to add some destinations that are in persistent store
  242               // but not active
  243               // in the broker.
  244               //
  245               // TODO: think about this a little more. This is good cause
  246               // destinations are not loaded into
  247               // memory until a client needs to use the queue, but a management
  248               // agent viewing the
  249               // broker will not see a destination that exists in persistent
  250               // store. We may want to
  251               // eagerly load all destinations into the broker but have an
  252               // inactive state for the
  253               // destination which has reduced memory usage.
  254               //
  255               DestinationFilter.parseFilter(info.getDestination());
  256   
  257               Subscription sub = createSubscription(context, info);
  258   
  259               subscriptions.put(info.getConsumerId(), sub);
  260   
  261               // At this point we're done directly manipulating subscriptions,
  262               // but we need to retain the synchronized block here. Consider
  263               // otherwise what would happen if at this point a second
  264               // thread added, then removed, as would be allowed with
  265               // no mutex held. Remove is only essentially run once
  266               // so everything after this point would be leaked.
  267   
  268               // Add the subscription to all the matching queues.
  269               // But copy the matches first - to prevent deadlocks
  270               List<Destination>addList = new ArrayList<Destination>();
  271               synchronized(destinationsMutex) {
  272                   for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
  273                       Destination dest = (Destination)iter.next();
  274                       addList.add(dest);
  275                   }
  276               }
  277               
  278               for (Destination dest:addList) {
  279                   dest.addSubscription(context, sub);
  280               }
  281   
  282               if (info.isBrowser()) {
  283                   ((QueueBrowserSubscription)sub).destinationsAdded();
  284               }
  285   
  286               return sub;
  287           }
  288       }
  289   
  290       /**
  291        * Get all the Destinations that are in storage
  292        * 
  293        * @return Set of all stored destinations
  294        */
  295       public Set getDurableDestinations() {
  296           return destinationFactory.getDestinations();
  297       }
  298   
  299       /**
  300        * @return all Destinations that don't have active consumers
  301        */
  302       protected Set<ActiveMQDestination> getInactiveDestinations() {
  303           Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
  304           synchronized (destinationsMutex) {
  305               inactiveDests.removeAll(destinations.keySet());
  306           }
  307           return inactiveDests;
  308       }
  309   
  310       public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  311           LOG.debug(broker.getBrokerName() + " removing consumer: " + info.getConsumerId() + " for destination: " + info.getDestination());
  312   
  313           Subscription sub = subscriptions.remove(info.getConsumerId());
  314           //The sub could be removed elsewhere - see ConnectionSplitBroker
  315           if (sub != null) {
  316   
  317               // remove the subscription from all the matching queues.
  318               List<Destination> removeList = new ArrayList<Destination>();
  319               synchronized (destinationsMutex) {
  320                   for (Iterator iter = destinationMap.get(info.getDestination())
  321                           .iterator(); iter.hasNext();) {
  322                       Destination dest = (Destination) iter.next();
  323                       removeList.add(dest);
  324                       
  325                   }
  326               }
  327               for(Destination dest:removeList) {
  328                 dest.removeSubscription(context, sub, info.getLastDeliveredSequenceId());
  329               }
  330   
  331               destroySubscription(sub);
  332           }
  333           synchronized (consumerChangeMutexMap) {
  334               consumerChangeMutexMap.remove(info.getConsumerId());
  335           }
  336       }
  337   
  338       protected void destroySubscription(Subscription sub) {
  339           sub.destroy();
  340       }
  341   
  342       public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
  343           throw new JMSException("Invalid operation.");
  344       }
  345   
  346       public void send(final ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
  347           final ConnectionContext context = producerExchange.getConnectionContext();
  348   
  349           if (producerExchange.isMutable() || producerExchange.getRegionDestination() == null) {
  350               final Destination regionDestination = lookup(context, messageSend.getDestination());
  351               producerExchange.setRegionDestination(regionDestination);
  352           }
  353   
  354           producerExchange.getRegionDestination().send(producerExchange, messageSend);
  355       }
  356   
  357       public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
  358           Subscription sub = consumerExchange.getSubscription();
  359           if (sub == null) {
  360               sub = subscriptions.get(ack.getConsumerId());        
  361               if (sub == null) {
  362                   if (!consumerExchange.getConnectionContext().isInRecoveryMode()) {
  363                       LOG.warn("Ack for non existent subscription, ack:" + ack); 
  364                       throw new IllegalArgumentException(
  365                           "The subscription does not exist: "
  366                           + ack.getConsumerId());
  367                   } else {
  368                       return;
  369                   }
  370               }
  371               consumerExchange.setSubscription(sub);
  372           }
  373           sub.acknowledge(consumerExchange.getConnectionContext(), ack);
  374       }
  375   
  376       public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
  377           Subscription sub = subscriptions.get(pull.getConsumerId());
  378           if (sub == null) {
  379               throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
  380           }
  381           return sub.pullMessage(context, pull);
  382       }
  383   
  384       protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception {
  385           Destination dest = null;
  386           synchronized (destinationsMutex) {
  387               dest = destinations.get(destination);
  388           }
  389           if (dest == null) {
  390               if (autoCreateDestinations) {
  391                   // Try to auto create the destination... re-invoke broker
  392                   // from the
  393                   // top so that the proper security checks are performed.
  394                   try {
  395                       context.getBroker().addDestination(context, destination);
  396                       dest = addDestination(context, destination);
  397                   } catch (DestinationAlreadyExistsException e) {
  398                       // if the destination already exists then lets ignore
  399                       // this error
  400                   }
  401                   // We should now have the dest created.
  402                   synchronized (destinationsMutex) {
  403                       dest = destinations.get(destination);
  404                   }
  405               }
  406               if (dest == null) {
  407                   throw new JMSException("The destination " + destination + " does not exist.");
  408               }
  409           }
  410           return dest;
  411       }
  412   
  413       public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
  414           Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
  415           if (sub != null) {
  416               sub.processMessageDispatchNotification(messageDispatchNotification);
  417           } else {
  418               throw new JMSException("Slave broker out of sync with master - Subscription: "
  419                       + messageDispatchNotification.getConsumerId()
  420                       + " on " + messageDispatchNotification.getDestination()
  421                       + " does not exist for dispatch of message: "
  422                       + messageDispatchNotification.getMessageId());
  423           }
  424       }
  425       
  426       /*
  427        * For a Queue/TempQueue, dispatch order is imperative to match acks, so the dispatch is deferred till 
  428        * the notification to ensure that the subscription chosen by the master is used. AMQ-2102
  429        */ 
  430       protected void processDispatchNotificationViaDestination(MessageDispatchNotification messageDispatchNotification) throws Exception {
  431           Destination dest = null;
  432           synchronized (destinationsMutex) {
  433               dest = destinations.get(messageDispatchNotification.getDestination());
  434           }
  435           if (dest != null) {
  436               dest.processDispatchNotification(messageDispatchNotification);
  437           } else {
  438               throw new JMSException(
  439                       "Slave broker out of sync with master - Destination: " 
  440                               + messageDispatchNotification.getDestination()
  441                               + " does not exist for consumer "
  442                               + messageDispatchNotification.getConsumerId()
  443                               + " with message: "
  444                               + messageDispatchNotification.getMessageId());
  445           }
  446       }
  447   
  448       public void gc() {
  449           for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
  450               Subscription sub = iter.next();
  451               sub.gc();
  452           }
  453           synchronized (destinationsMutex) {
  454               for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
  455                   Destination dest = iter.next();
  456                   dest.gc();
  457               }
  458           }
  459       }
  460   
  461       protected abstract Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws Exception;
  462   
  463       protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
  464           return destinationFactory.createDestination(context, destination, destinationStatistics);
  465       }
  466   
  467       public boolean isAutoCreateDestinations() {
  468           return autoCreateDestinations;
  469       }
  470   
  471       public void setAutoCreateDestinations(boolean autoCreateDestinations) {
  472           this.autoCreateDestinations = autoCreateDestinations;
  473       }
  474       
  475       public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception{
  476           synchronized (destinationsMutex) {
  477               for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
  478                   Destination dest = (Destination) iter.next();
  479                   dest.addProducer(context, info);
  480               }
  481           }
  482       }
  483   
  484       /**
  485        * Removes a Producer.
  486        * @param context the environment the operation is being executed under.
  487        * @throws Exception TODO
  488        */
  489       public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception{
  490           synchronized (destinationsMutex) {
  491               for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
  492                   Destination dest = (Destination)iter.next();
  493                   dest.removeProducer(context, info);
  494               }
  495           }
  496       }
  497       
  498       protected void dispose(ConnectionContext context,Destination dest) throws Exception {
  499           dest.dispose(context);
  500           dest.stop();
  501           destinationFactory.removeDestination(dest);
  502       }
  503   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]