Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import java.io.IOException;
   20   import java.util.Set;
   21   import org.apache.activemq.advisory.AdvisorySupport;
   22   import org.apache.activemq.broker.BrokerService;
   23   import org.apache.activemq.broker.ConnectionContext;
   24   import org.apache.activemq.broker.region.policy.PolicyEntry;
   25   import org.apache.activemq.command.ActiveMQDestination;
   26   import org.apache.activemq.command.ActiveMQQueue;
   27   import org.apache.activemq.command.ActiveMQTempDestination;
   28   import org.apache.activemq.command.ActiveMQTopic;
   29   import org.apache.activemq.command.SubscriptionInfo;
   30   import org.apache.activemq.store.MessageStore;
   31   import org.apache.activemq.store.PersistenceAdapter;
   32   import org.apache.activemq.store.TopicMessageStore;
   33   import org.apache.activemq.thread.TaskRunnerFactory;
   34   
   35   /**
   36    * Creates standard ActiveMQ implementations of
   37    * {@link org.apache.activemq.broker.region.Destination}.
   38    * 
   39    * @author fateev@amazon.com
   40    * @version $Revision: 732259 $
   41    */
   42   public class DestinationFactoryImpl extends DestinationFactory {
   43   
   44       protected final TaskRunnerFactory taskRunnerFactory;
   45       protected final PersistenceAdapter persistenceAdapter;
   46       protected RegionBroker broker;
   47       private final BrokerService brokerService;
   48   
   49       public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
   50           this.brokerService = brokerService;
   51           this.taskRunnerFactory = taskRunnerFactory;
   52           if (persistenceAdapter == null) {
   53               throw new IllegalArgumentException("null persistenceAdapter");
   54           }
   55           this.persistenceAdapter = persistenceAdapter;
   56       }
   57   
   58       public void setRegionBroker(RegionBroker broker) {
   59           if (broker == null) {
   60               throw new IllegalArgumentException("null broker");
   61           }
   62           this.broker = broker;
   63       }
   64   
   65       public Set<ActiveMQDestination> getDestinations() {
   66           return persistenceAdapter.getDestinations();
   67       }
   68   
   69       /**
   70        * @return instance of {@link Queue} or {@link Topic}
   71        */
   72       public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception {
   73           if (destination.isQueue()) {
   74               if (destination.isTemporary()) {
   75                   final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
   76                   Queue queue = new TempQueue(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
   77                   queue.initialize();
   78                   return queue;
   79               } else {
   80                   MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
   81                   Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
   82                   configureQueue(queue, destination);
   83                   queue.initialize();
   84                   return queue;
   85               }
   86           } else if (destination.isTemporary()) {
   87               
   88               Topic topic = new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory);
   89               topic.initialize();
   90               return topic;
   91           } else {
   92               TopicMessageStore store = null;
   93               if (!AdvisorySupport.isAdvisoryTopic(destination)) {
   94                   store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
   95               }
   96               Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory);
   97               configureTopic(topic, destination);
   98               topic.initialize();
   99               return topic;
  100           }
  101       }
  102   
  103       public void removeDestination(Destination dest) {
  104           ActiveMQDestination destination = dest.getActiveMQDestination();
  105           if (!destination.isTemporary()) {
  106               if (destination.isQueue()) {
  107                   persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination);
  108               }
  109               else if (!AdvisorySupport.isAdvisoryTopic(destination)) {
  110                   persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination);
  111               }
  112           }
  113       }
  114   
  115       protected void configureQueue(Queue queue, ActiveMQDestination destination) {
  116           if (broker == null) {
  117               throw new IllegalStateException("broker property is not set");
  118           }
  119           if (broker.getDestinationPolicy() != null) {
  120               PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
  121               if (entry != null) {
  122                   entry.configure(broker,queue);
  123               }
  124           }
  125       }
  126   
  127       protected void configureTopic(Topic topic, ActiveMQDestination destination) {
  128           if (broker == null) {
  129               throw new IllegalStateException("broker property is not set");
  130           }
  131           if (broker.getDestinationPolicy() != null) {
  132               PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
  133               if (entry != null) {
  134                   entry.configure(topic);
  135               }
  136           }
  137       }
  138   
  139       public long getLastMessageBrokerSequenceId() throws IOException {
  140           return persistenceAdapter.getLastMessageBrokerSequenceId();
  141       }
  142   
  143       public PersistenceAdapter getPersistenceAdapter() {
  144           return persistenceAdapter;
  145       }
  146   
  147       public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException {
  148           return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
  149       }
  150   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]