Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » peer » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.peer;
   18   
   19   import java.io.IOException;
   20   import java.net.URI;
   21   import java.net.URISyntaxException;
   22   import java.util.HashMap;
   23   import java.util.Map;
   24   import java.util.concurrent.ConcurrentHashMap;
   25   
   26   import org.apache.activemq.broker.BrokerFactoryHandler;
   27   import org.apache.activemq.broker.BrokerService;
   28   import org.apache.activemq.broker.TransportConnector;
   29   import org.apache.activemq.transport.Transport;
   30   import org.apache.activemq.transport.TransportFactory;
   31   import org.apache.activemq.transport.TransportServer;
   32   import org.apache.activemq.transport.vm.VMTransportFactory;
   33   import org.apache.activemq.util.IOExceptionSupport;
   34   import org.apache.activemq.util.IdGenerator;
   35   import org.apache.activemq.util.IntrospectionSupport;
   36   import org.apache.activemq.util.URISupport;
   37   
   38   public class PeerTransportFactory extends TransportFactory {
   39   
   40       public static final ConcurrentHashMap BROKERS = new ConcurrentHashMap();
   41       public static final ConcurrentHashMap CONNECTORS = new ConcurrentHashMap();
   42       public static final ConcurrentHashMap SERVERS = new ConcurrentHashMap();
   43       private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-");
   44   
   45       public Transport doConnect(URI location) throws Exception {
   46           VMTransportFactory vmTransportFactory = createTransportFactory(location);
   47           return vmTransportFactory.doConnect(location);
   48       }
   49   
   50       public Transport doCompositeConnect(URI location) throws Exception {
   51           VMTransportFactory vmTransportFactory = createTransportFactory(location);
   52           return vmTransportFactory.doCompositeConnect(location);
   53       }
   54   
   55       /**
   56        * @param location
   57        * @return the converted URI
   58        * @throws URISyntaxException
   59        */
   60       private VMTransportFactory createTransportFactory(URI location) throws IOException {
   61           try {
   62               String group = location.getHost();
   63               String broker = URISupport.stripPrefix(location.getPath(), "/");
   64   
   65               if (group == null) {
   66                   group = "default";
   67               }
   68               if (broker == null || broker.length() == 0) {
   69                   broker = ID_GENERATOR.generateSanitizedId();
   70               }
   71   
   72               final Map<String, String> brokerOptions = new HashMap<String, String>(URISupport.parseParamters(location));
   73               if (!brokerOptions.containsKey("persistent")) {
   74                   brokerOptions.put("persistent", "false");
   75               }
   76   
   77               final URI finalLocation = new URI("vm://" + broker);
   78               final String finalBroker = broker;
   79               final String finalGroup = group;
   80               VMTransportFactory rc = new VMTransportFactory() {
   81                   public Transport doConnect(URI ignore) throws Exception {
   82                       return super.doConnect(finalLocation);
   83                   };
   84   
   85                   public Transport doCompositeConnect(URI ignore) throws Exception {
   86                       return super.doCompositeConnect(finalLocation);
   87                   };
   88               };
   89               rc.setBrokerFactoryHandler(new BrokerFactoryHandler() {
   90                   public BrokerService createBroker(URI brokerURI) throws Exception {
   91                       BrokerService service = new BrokerService();
   92                       IntrospectionSupport.setProperties(service, brokerOptions);
   93                       service.setBrokerName(finalBroker);
   94                       TransportConnector c = service.addConnector("tcp://localhost:0");
   95                       c.setDiscoveryUri(new URI("multicast://default?group=" + finalGroup));
   96                       service.addNetworkConnector("multicast://default?group=" + finalGroup);
   97                       return service;
   98                   }
   99               });
  100               return rc;
  101   
  102           } catch (URISyntaxException e) {
  103               throw IOExceptionSupport.create(e);
  104           }
  105       }
  106   
  107       public TransportServer doBind(URI location) throws IOException {
  108           throw new IOException("This protocol does not support being bound.");
  109       }
  110   
  111   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » peer » [javadoc | source]