Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.net.URI;
   21   import java.util.ArrayList;
   22   import java.util.Collections;
   23   import java.util.HashMap;
   24   import java.util.Map;
   25   import java.util.Set;
   26   import java.util.concurrent.ConcurrentHashMap;
   27   import java.util.concurrent.CopyOnWriteArrayList;
   28   
   29   import javax.jms.InvalidClientIDException;
   30   import javax.jms.JMSException;
   31   import org.apache.activemq.broker.Broker;
   32   import org.apache.activemq.broker.BrokerService;
   33   import org.apache.activemq.broker.Connection;
   34   import org.apache.activemq.broker.ConnectionContext;
   35   import org.apache.activemq.broker.ConsumerBrokerExchange;
   36   import org.apache.activemq.broker.EmptyBroker;
   37   import org.apache.activemq.broker.ProducerBrokerExchange;
   38   import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
   39   import org.apache.activemq.broker.region.policy.PolicyMap;
   40   import org.apache.activemq.command.ActiveMQDestination;
   41   import org.apache.activemq.command.BrokerId;
   42   import org.apache.activemq.command.BrokerInfo;
   43   import org.apache.activemq.command.ConnectionId;
   44   import org.apache.activemq.command.ConnectionInfo;
   45   import org.apache.activemq.command.ConsumerInfo;
   46   import org.apache.activemq.command.DestinationInfo;
   47   import org.apache.activemq.command.Message;
   48   import org.apache.activemq.command.MessageAck;
   49   import org.apache.activemq.command.MessageDispatch;
   50   import org.apache.activemq.command.MessageDispatchNotification;
   51   import org.apache.activemq.command.MessagePull;
   52   import org.apache.activemq.command.ProducerInfo;
   53   import org.apache.activemq.command.RemoveSubscriptionInfo;
   54   import org.apache.activemq.command.Response;
   55   import org.apache.activemq.command.TransactionId;
   56   import org.apache.activemq.kaha.Store;
   57   import org.apache.activemq.state.ConnectionState;
   58   import org.apache.activemq.thread.TaskRunnerFactory;
   59   import org.apache.activemq.usage.SystemUsage;
   60   import org.apache.activemq.util.BrokerSupport;
   61   import org.apache.activemq.util.IdGenerator;
   62   import org.apache.activemq.util.LongSequenceGenerator;
   63   import org.apache.activemq.util.ServiceStopper;
   64   import org.apache.commons.logging.Log;
   65   import org.apache.commons.logging.LogFactory;
   66   
   67   /**
   68    * Routes Broker operations to the correct messaging regions for processing.
   69    * 
   70    * @version $Revision$
   71    */
   72   public class RegionBroker extends EmptyBroker {
   // Message property key under which stampAsExpired() records a message's expiration value.
   73       public static final String ORIGINAL_EXPIRATION = "originalExpiration";
   74       private static final Log LOG = LogFactory.getLog(RegionBroker.class);
   // Generates the broker id returned by getBrokerId() when none has been set explicitly.
   75       private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
   76   
   // Statistics object handed to every region created by the create*Region() factory methods.
   77       protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
   // Assigned in the constructor (must be non-null); also backs getDurableDestinations().
   78       protected DestinationFactory destinationFactory;
   // Synchronized map exposed as-is through getConnectionStates().
   79       protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
   80   
   // One region per destination type; all four are created in the constructor.
   81       private final Region queueRegion;
   82       private final Region topicRegion;
   83       private final Region tempQueueRegion;
   84       private final Region tempTopicRegion;
   85       protected final BrokerService brokerService;
   // Set by start(), cleared by stop(); isStopped() reports its negation.
   86       private boolean started;
   // Pushed to the topic region at the beginning of start().
   87       private boolean keepDurableSubsActive;
   88   
   // Connections registered through addConnection() and dropped by removeConnection().
   89       private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
   // Destinations already created through addDestination(), keyed by their ActiveMQDestination.
   90       private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
   // Peer broker infos maintained by addBroker()/removeBroker().
   91       private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>();
   92   
   // Seeded in the constructor from destinationFactory.getLastMessageBrokerSequenceId().
   93       private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
   // Both are lazily defaulted by their getters when still null.
   94       private BrokerId brokerId;
   95       private String brokerName;
   // Client id -> owning connection context. addConnection()/removeConnection() access it
   // while holding its monitor (synchronized (clientIdSet)).
   96       private final Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
   97       private final DestinationInterceptor destinationInterceptor;
   98       private ConnectionContext adminConnectionContext;
   99   
  /**
   * Creates the broker and its four regions (queue, topic, temp queue, temp topic).
   * <p>
   * The regions are built through the protected create*Region() factory methods,
   * which a subclass may override; those overrides run from inside this
   * constructor, i.e. before the subclass constructor body has executed.
   *
   * @param brokerService          the owning broker service (stored as given)
   * @param taskRunnerFactory      passed through to every region factory method
   * @param memoryManager          passed through to every region factory method
   * @param destinationFactory     must not be null; also supplies the last message broker sequence id
   * @param destinationInterceptor stored and exposed via getDestinationInterceptor()
   * @throws IllegalArgumentException if destinationFactory is null
   * @throws IOException declared by the signature; presumably raised while reading the last sequence id - TODO confirm
   */
  100       public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
  101                           DestinationInterceptor destinationInterceptor) throws IOException {
  102           this.brokerService = brokerService;
  103           if (destinationFactory == null) {
  104               throw new IllegalArgumentException("null destinationFactory");
  105           }
  // Continue message sequence numbering from the value reported by the destination factory.
  106           this.sequenceGenerator.setLastSequenceId(destinationFactory.getLastMessageBrokerSequenceId());
  107           this.destinationFactory = destinationFactory;
  108           queueRegion = createQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
  109           topicRegion = createTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
  // NOTE: the interceptor is only assigned here, after the queue and topic regions exist
  // and before the temporary regions are created.
  110           this.destinationInterceptor = destinationInterceptor;
  111           tempQueueRegion = createTempQueueRegion(memoryManager, taskRunnerFactory, destinationFactory);
  112           tempTopicRegion = createTempTopicRegion(memoryManager, taskRunnerFactory, destinationFactory);
  113       }
  114   
  115       public Map<ActiveMQDestination, Destination> getDestinationMap() {
  116           Map<ActiveMQDestination, Destination> answer = getQueueRegion().getDestinationMap();
  117           answer.putAll(getTopicRegion().getDestinationMap());
  118           return answer;
  119       }
  120   
  121       public Set <Destination> getDestinations(ActiveMQDestination destination) {
  122           switch (destination.getDestinationType()) {
  123           case ActiveMQDestination.QUEUE_TYPE:
  124               return queueRegion.getDestinations(destination);
  125           case ActiveMQDestination.TOPIC_TYPE:
  126               return topicRegion.getDestinations(destination);
  127           case ActiveMQDestination.TEMP_QUEUE_TYPE:
  128               return tempQueueRegion.getDestinations(destination);
  129           case ActiveMQDestination.TEMP_TOPIC_TYPE:
  130               return tempTopicRegion.getDestinations(destination);
  131           default:
  132               return Collections.emptySet();
  133           }
  134       }
  135   
  136       public Broker getAdaptor(Class type) {
  137           if (type.isInstance(this)) {
  138               return this;
  139           }
  140           return null;
  141       }
  142   
  143       public Region getQueueRegion() {
  144           return queueRegion;
  145       }
  146   
  147       public Region getTempQueueRegion() {
  148           return tempQueueRegion;
  149       }
  150   
  151       public Region getTempTopicRegion() {
  152           return tempTopicRegion;
  153       }
  154   
  155       public Region getTopicRegion() {
  156           return topicRegion;
  157       }
  158   
  159       protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
  160           return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
  161       }
  162   
  163       protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
  164           return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
  165       }
  166   
  167       protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
  168           return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
  169       }
  170   
  171       protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
  172           return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
  173       }
  174   
  175       public void start() throws Exception {
  176           ((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
  177           started = true;
  178           queueRegion.start();
  179           topicRegion.start();
  180           tempQueueRegion.start();
  181           tempTopicRegion.start();
  182       }
  183   
  184       public void stop() throws Exception {
  185           started = false;
  186           ServiceStopper ss = new ServiceStopper();
  187           doStop(ss);
  188           ss.throwFirstException();
  189           // clear the state
  190           clientIdSet.clear();
  191           connections.clear();
  192           destinations.clear();
  193           brokerInfos.clear();
  194       }
  195   
  196       public PolicyMap getDestinationPolicy() {
  197           return brokerService != null ? brokerService.getDestinationPolicy() : null;
  198       }
  199   
  200       public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
  201           String clientId = info.getClientId();
  202           if (clientId == null) {
  203               throw new InvalidClientIDException("No clientID specified for connection request");
  204           }
  205           synchronized (clientIdSet) {
  206               ConnectionContext oldContext = clientIdSet.get(clientId);
  207               if (oldContext != null) {
  208               	if (context.isFaultTolerant() || context.isNetworkConnection()){
  209               		//remove the old connection
  210               		try{
  211               			removeConnection(oldContext, info, new Exception("remove stale client"));
  212               		}catch(Exception e){
  213               			LOG.warn("Failed to remove stale connection ",e);
  214               		}
  215               	}else{
  216                   throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
  217                                                      + oldContext.getConnection().getRemoteAddress());
  218               	}
  219               } else {
  220                   clientIdSet.put(clientId, context);
  221               }
  222           }
  223   
  224           connections.add(context.getConnection());
  225       }
  226   
  227       public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
  228           String clientId = info.getClientId();
  229           if (clientId == null) {
  230               throw new InvalidClientIDException("No clientID specified for connection disconnect request");
  231           }
  232           synchronized (clientIdSet) {
  233               ConnectionContext oldValue = clientIdSet.get(clientId);
  234               // we may be removing the duplicate connection, not the first
  235               // connection to be created
  236               // so lets check that their connection IDs are the same
  237               if (oldValue == context) {
  238                   if (isEqual(oldValue.getConnectionId(), info.getConnectionId())) {
  239                       clientIdSet.remove(clientId);
  240                   }
  241               }
  242           }
  243           connections.remove(context.getConnection());
  244       }
  245   
  246       protected boolean isEqual(ConnectionId connectionId, ConnectionId connectionId2) {
  247           return connectionId == connectionId2 || (connectionId != null && connectionId.equals(connectionId2));
  248       }
  249   
  250       public Connection[] getClients() throws Exception {
  251           ArrayList<Connection> l = new ArrayList<Connection>(connections);
  252           Connection rc[] = new Connection[l.size()];
  253           l.toArray(rc);
  254           return rc;
  255       }
  256   
  257       public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
  258   
  259           Destination answer;
  260   
  261           answer = destinations.get(destination);
  262           if (answer != null) {
  263               return answer;
  264           }
  265   
  266           switch (destination.getDestinationType()) {
  267           case ActiveMQDestination.QUEUE_TYPE:
  268               answer = queueRegion.addDestination(context, destination);
  269               break;
  270           case ActiveMQDestination.TOPIC_TYPE:
  271               answer = topicRegion.addDestination(context, destination);
  272               break;
  273           case ActiveMQDestination.TEMP_QUEUE_TYPE:
  274               answer = tempQueueRegion.addDestination(context, destination);
  275               break;
  276           case ActiveMQDestination.TEMP_TOPIC_TYPE:
  277               answer = tempTopicRegion.addDestination(context, destination);
  278               break;
  279           default:
  280               throw createUnknownDestinationTypeException(destination);
  281           }
  282   
  283           destinations.put(destination, answer);
  284           return answer;
  285   
  286       }
  287   
  288       public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
  289   
  290           if (destinations.containsKey(destination)) {
  291               switch (destination.getDestinationType()) {
  292               case ActiveMQDestination.QUEUE_TYPE:
  293                   queueRegion.removeDestination(context, destination, timeout);
  294                   break;
  295               case ActiveMQDestination.TOPIC_TYPE:
  296                   topicRegion.removeDestination(context, destination, timeout);
  297                   break;
  298               case ActiveMQDestination.TEMP_QUEUE_TYPE:
  299                   tempQueueRegion.removeDestination(context, destination, timeout);
  300                   break;
  301               case ActiveMQDestination.TEMP_TOPIC_TYPE:
  302                   tempTopicRegion.removeDestination(context, destination, timeout);
  303                   break;
  304               default:
  305                   throw createUnknownDestinationTypeException(destination);
  306               }
  307               destinations.remove(destination);
  308           }
  309   
  310       }
  311   
  312       public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
  313           addDestination(context, info.getDestination());
  314   
  315       }
  316   
  317       public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
  318           removeDestination(context, info.getDestination(), info.getTimeout());
  319   
  320       }
  321   
  322       public ActiveMQDestination[] getDestinations() throws Exception {
  323           ArrayList<ActiveMQDestination> l;
  324   
  325           l = new ArrayList<ActiveMQDestination>(getDestinationMap().keySet());
  326   
  327           ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
  328           l.toArray(rc);
  329           return rc;
  330       }
  331   
  332       public void addProducer(ConnectionContext context, ProducerInfo info)
  333               throws Exception {
  334           ActiveMQDestination destination = info.getDestination();
  335           if (destination != null) {
  336   
  337               // This seems to cause the destination to be added but without advisories firing...
  338               context.getBroker().addDestination(context, destination);
  339               switch (destination.getDestinationType()) {
  340               case ActiveMQDestination.QUEUE_TYPE:
  341                   queueRegion.addProducer(context, info);
  342                   break;
  343               case ActiveMQDestination.TOPIC_TYPE:
  344                   topicRegion.addProducer(context, info);
  345                   break;
  346               case ActiveMQDestination.TEMP_QUEUE_TYPE:
  347                   tempQueueRegion.addProducer(context, info);
  348                   break;
  349               case ActiveMQDestination.TEMP_TOPIC_TYPE:
  350                   tempTopicRegion.addProducer(context, info);
  351                   break;
  352               }
  353           }
  354       }
  355   
  356       public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
  357           ActiveMQDestination destination = info.getDestination();
  358           if (destination != null) {
  359               switch (destination.getDestinationType()) {
  360               case ActiveMQDestination.QUEUE_TYPE:
  361                   queueRegion.removeProducer(context, info);
  362                   break;
  363               case ActiveMQDestination.TOPIC_TYPE:
  364                   topicRegion.removeProducer(context, info);
  365                   break;
  366               case ActiveMQDestination.TEMP_QUEUE_TYPE:
  367                   tempQueueRegion.removeProducer(context, info);
  368                   break;
  369               case ActiveMQDestination.TEMP_TOPIC_TYPE:
  370                   tempTopicRegion.removeProducer(context, info);
  371                   break;
  372               }
  373           }
  374       }
  375   
  376       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  377           ActiveMQDestination destination = info.getDestination();
  378           switch (destination.getDestinationType()) {
  379           case ActiveMQDestination.QUEUE_TYPE:
  380               return queueRegion.addConsumer(context, info);
  381   
  382           case ActiveMQDestination.TOPIC_TYPE:
  383               return topicRegion.addConsumer(context, info);
  384   
  385           case ActiveMQDestination.TEMP_QUEUE_TYPE:
  386               return tempQueueRegion.addConsumer(context, info);
  387   
  388           case ActiveMQDestination.TEMP_TOPIC_TYPE:
  389               return tempTopicRegion.addConsumer(context, info);
  390   
  391           default:
  392               throw createUnknownDestinationTypeException(destination);
  393           }
  394       }
  395   
  396       public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
  397           ActiveMQDestination destination = info.getDestination();
  398           switch (destination.getDestinationType()) {
  399           case ActiveMQDestination.QUEUE_TYPE:
  400               queueRegion.removeConsumer(context, info);
  401               break;
  402           case ActiveMQDestination.TOPIC_TYPE:
  403               topicRegion.removeConsumer(context, info);
  404               break;
  405           case ActiveMQDestination.TEMP_QUEUE_TYPE:
  406               tempQueueRegion.removeConsumer(context, info);
  407               break;
  408           case ActiveMQDestination.TEMP_TOPIC_TYPE:
  409               tempTopicRegion.removeConsumer(context, info);
  410               break;
  411           default:
  412               throw createUnknownDestinationTypeException(destination);
  413           }
  414       }
  415   
  416       public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
  417           topicRegion.removeSubscription(context, info);
  418       }
  419   
  420       public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
  421           message.setBrokerInTime(System.currentTimeMillis());
  422           if (producerExchange.isMutable() || producerExchange.getRegion() == null) {
  423               ActiveMQDestination destination = message.getDestination();
  424               // ensure the destination is registered with the RegionBroker
  425               producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination);
  426               Region region;
  427               switch (destination.getDestinationType()) {
  428               case ActiveMQDestination.QUEUE_TYPE:
  429                   region = queueRegion;
  430                   break;
  431               case ActiveMQDestination.TOPIC_TYPE:
  432                   region = topicRegion;
  433                   break;
  434               case ActiveMQDestination.TEMP_QUEUE_TYPE:
  435                   region = tempQueueRegion;
  436                   break;
  437               case ActiveMQDestination.TEMP_TOPIC_TYPE:
  438                   region = tempTopicRegion;
  439                   break;
  440               default:
  441                   throw createUnknownDestinationTypeException(destination);
  442               }
  443               producerExchange.setRegion(region);
  444           }
  445           producerExchange.getRegion().send(producerExchange, message);
  446       }
  447   
  448       public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
  449           if (consumerExchange.isWildcard() || consumerExchange.getRegion() == null) {
  450               ActiveMQDestination destination = ack.getDestination();
  451               Region region;
  452               switch (destination.getDestinationType()) {
  453               case ActiveMQDestination.QUEUE_TYPE:
  454                   region = queueRegion;
  455                   break;
  456               case ActiveMQDestination.TOPIC_TYPE:
  457                   region = topicRegion;
  458                   break;
  459               case ActiveMQDestination.TEMP_QUEUE_TYPE:
  460                   region = tempQueueRegion;
  461                   break;
  462               case ActiveMQDestination.TEMP_TOPIC_TYPE:
  463                   region = tempTopicRegion;
  464                   break;
  465               default:
  466                   throw createUnknownDestinationTypeException(destination);
  467               }
  468               consumerExchange.setRegion(region);
  469           }
  470           consumerExchange.getRegion().acknowledge(consumerExchange, ack);
  471       }
  472   
  473       public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
  474           ActiveMQDestination destination = pull.getDestination();
  475           switch (destination.getDestinationType()) {
  476           case ActiveMQDestination.QUEUE_TYPE:
  477               return queueRegion.messagePull(context, pull);
  478   
  479           case ActiveMQDestination.TOPIC_TYPE:
  480               return topicRegion.messagePull(context, pull);
  481   
  482           case ActiveMQDestination.TEMP_QUEUE_TYPE:
  483               return tempQueueRegion.messagePull(context, pull);
  484   
  485           case ActiveMQDestination.TEMP_TOPIC_TYPE:
  486               return tempTopicRegion.messagePull(context, pull);
  487           default:
  488               throw createUnknownDestinationTypeException(destination);
  489           }
  490       }
  491   
  492       public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
  493           throw new IllegalAccessException("Transaction operation not implemented by this broker.");
  494       }
  495   
  496       public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  497           throw new IllegalAccessException("Transaction operation not implemented by this broker.");
  498       }
  499   
  500       public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  501           throw new IllegalAccessException("Transaction operation not implemented by this broker.");
  502       }
  503   
  504       public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
  505           throw new IllegalAccessException("Transaction operation not implemented by this broker.");
  506       }
  507   
  508       public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
  509           throw new IllegalAccessException("Transaction operation not implemented by this broker.");
  510       }
  511   
  512       public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
  513           throw new IllegalAccessException("Transaction operation not implemented by this broker.");
  514       }
  515   
  516       public void gc() {
  517           queueRegion.gc();
  518           topicRegion.gc();
  519       }
  520   
  521       public BrokerId getBrokerId() {
  522           if (brokerId == null) {
  523               brokerId = new BrokerId(BROKER_ID_GENERATOR.generateId());
  524           }
  525           return brokerId;
  526       }
  527   
  528       public void setBrokerId(BrokerId brokerId) {
  529           this.brokerId = brokerId;
  530       }
  531   
  532       public String getBrokerName() {
  533           if (brokerName == null) {
  534               try {
  535                   brokerName = java.net.InetAddress.getLocalHost().getHostName().toLowerCase();
  536               } catch (Exception e) {
  537                   brokerName = "localhost";
  538               }
  539           }
  540           return brokerName;
  541       }
  542   
  543       public void setBrokerName(String brokerName) {
  544           this.brokerName = brokerName;
  545       }
  546   
  547       public DestinationStatistics getDestinationStatistics() {
  548           return destinationStatistics;
  549       }
  550   
  551       protected JMSException createUnknownDestinationTypeException(ActiveMQDestination destination) {
  552           return new JMSException("Unknown destination type: " + destination.getDestinationType());
  553       }
  554   
  555       public synchronized void addBroker(Connection connection, BrokerInfo info) {
  556           brokerInfos.add(info);
  557       }
  558   
  559       public synchronized void removeBroker(Connection connection, BrokerInfo info) {
  560           if (info != null) {
  561               brokerInfos.remove(info);
  562           }
  563       }
  564   
  565       public synchronized BrokerInfo[] getPeerBrokerInfos() {
  566           BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
  567           result = brokerInfos.toArray(result);
  568           return result;
  569       }
  570   
  571       public void preProcessDispatch(MessageDispatch messageDispatch) {
  572           Message message = messageDispatch.getMessage();
  573           if (message != null) {
  574               long endTime = System.currentTimeMillis();
  575               message.setBrokerOutTime(endTime);
  576               if (getBrokerService().isEnableStatistics()) {
  577                   long totalTime = endTime - message.getBrokerInTime();
  578                   message.getRegionDestination().getDestinationStatistics().getProcessTime().addTime(totalTime);
  579               }
  580           }
  581       }
  582   
  583       public void postProcessDispatch(MessageDispatch messageDispatch) {
  584       }
  585   
  586       public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
  587           ActiveMQDestination destination = messageDispatchNotification.getDestination();
  588           switch (destination.getDestinationType()) {
  589           case ActiveMQDestination.QUEUE_TYPE:
  590               queueRegion.processDispatchNotification(messageDispatchNotification);
  591               break;
  592           case ActiveMQDestination.TOPIC_TYPE:
  593               topicRegion.processDispatchNotification(messageDispatchNotification);
  594               break;
  595           case ActiveMQDestination.TEMP_QUEUE_TYPE:
  596               tempQueueRegion.processDispatchNotification(messageDispatchNotification);
  597               break;
  598           case ActiveMQDestination.TEMP_TOPIC_TYPE:
  599               tempTopicRegion.processDispatchNotification(messageDispatchNotification);
  600               break;
  601           default:
  602               throw createUnknownDestinationTypeException(destination);
  603           }
  604       }
  605   
  606       public boolean isSlaveBroker() {
  607           return brokerService.isSlave();
  608       }
  609   
  610       public boolean isStopped() {
  611           return !started;
  612       }
  613   
  614       public Set<ActiveMQDestination> getDurableDestinations() {
  615           return destinationFactory.getDestinations();
  616       }
  617   
  618       protected void doStop(ServiceStopper ss) {
  619           ss.stop(queueRegion);
  620           ss.stop(topicRegion);
  621           ss.stop(tempQueueRegion);
  622           ss.stop(tempTopicRegion);
  623       }
  624   
  625       public boolean isKeepDurableSubsActive() {
  626           return keepDurableSubsActive;
  627       }
  628   
  629       public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
  630           this.keepDurableSubsActive = keepDurableSubsActive;
  631       }
  632   
  633       public DestinationInterceptor getDestinationInterceptor() {
  634           return destinationInterceptor;
  635       }
  636   
  637       public ConnectionContext getAdminConnectionContext() {
  638           return adminConnectionContext;
  639       }
  640   
  641       public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
  642           this.adminConnectionContext = adminConnectionContext;
  643       }
  644   
  645       public Map<ConnectionId, ConnectionState> getConnectionStates() {
  646           return connectionStates;
  647       }
  648   
  649       public Store getTempDataStore() {
  650           return brokerService.getTempDataStore();
  651       }
  652   
  653       public URI getVmConnectorURI() {
  654           return brokerService.getVmConnectorURI();
  655       }
  656   
  657       public void brokerServiceStarted() {
  658       }
  659   
  660       public BrokerService getBrokerService() {
  661           return brokerService;
  662       }
  663   
  664       public boolean isExpired(MessageReference messageReference) {
  665           boolean expired = false;
  666           if (messageReference.isExpired()) {
  667               try {
  668                   // prevent duplicate expiry processing
  669                   Message message = messageReference.getMessage();
  670                   synchronized (message) {
  671                       expired = stampAsExpired(message);
  672                   }
  673               } catch (IOException e) {
  674                   LOG.warn("unexpected exception on message expiry determination for: " + messageReference, e);
  675               }
  676           }
  677           return expired;
  678       }
  679      
  680       private boolean stampAsExpired(Message message) throws IOException {
  681           boolean stamped=false;
  682           if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
  683               long expiration=message.getExpiration();     
  684               message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
  685               stamped = true;
  686           }
  687           return stamped;
  688       }
  689   
  690       
  691       public void messageExpired(ConnectionContext context, MessageReference node) {
  692           if (LOG.isDebugEnabled()) {
  693               LOG.debug("Message expired " + node);
  694           }
  695           getRoot().sendToDeadLetterQueue(context, node);
  696       }
  697       
  698       public void sendToDeadLetterQueue(ConnectionContext context,
  699   	        MessageReference node){
  700   		try{
  701   			if(node!=null){
  702   				Message message=node.getMessage();
  703   				if(message!=null && node.getRegionDestination()!=null){
  704   					DeadLetterStrategy deadLetterStrategy=node
  705   					        .getRegionDestination().getDeadLetterStrategy();
  706   					if(deadLetterStrategy!=null){
  707   						if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
  708   						    // message may be inflight to other subscriptions so do not modify
  709   						    message = message.copy();
  710   						    stampAsExpired(message);
  711   						    message.setExpiration(0);
  712   						    if(!message.isPersistent()){
  713   							    message.setPersistent(true);
  714   							    message.setProperty("originalDeliveryMode",
  715   								        "NON_PERSISTENT");
  716   							}
  717   							// The original destination and transaction id do
  718   							// not get filled when the message is first sent,
  719   							// it is only populated if the message is routed to
  720   							// another destination like the DLQ
  721   							ActiveMQDestination deadLetterDestination=deadLetterStrategy
  722   							        .getDeadLetterQueueFor(message
  723   							                .getDestination());
  724   							if (context.getBroker()==null) {
  725   								context.setBroker(getRoot());
  726   							}
  727   							BrokerSupport.resendNoCopy(context,message,
  728   							        deadLetterDestination);
  729   						}
  730   					} else {
  731   					    if (LOG.isDebugEnabled()) {
  732   					        LOG.debug("Expired message with no DLQ strategy in place");
  733   					    }
  734   					}
  735   				}
  736   			}
  737   		}catch(Exception e){
  738   			LOG.warn("Caught an exception sending to DLQ: "+node,e);
  739   		}
  740   	}
  741   
  742       public Broker getRoot() {
  743           try {
  744               return getBrokerService().getBroker();
  745           } catch (Exception e) {
  746               LOG.fatal("Trying to get Root Broker " + e);
  747               throw new RuntimeException("The broker from the BrokerService should not throw an exception");
  748           }
  749       }
  750       
  751       /**
  752        * @return the broker sequence id
  753        */
  754       public long getBrokerSequenceId() {
  755           synchronized(sequenceGenerator) {
  756               return sequenceGenerator.getNextSequenceId();
  757           }
  758       }
  759   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]