Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache.activemq.transport.discovery.http » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.discovery.http;
   18   
   19   import java.io.IOException;
   20   import java.util.HashMap;
   21   import java.util.HashSet;
   22   import java.util.Map;
   23   import java.util.Scanner;
   24   import java.util.Set;
   25   import java.util.concurrent.atomic.AtomicBoolean;
   26   import java.util.concurrent.atomic.AtomicInteger;
   27   import java.util.concurrent.atomic.AtomicReference;
   28   
   29   import org.apache.activemq.Service;
   30   import org.apache.activemq.command.DiscoveryEvent;
   31   import org.apache.activemq.transport.discovery.DiscoveryAgent;
   32   import org.apache.activemq.transport.discovery.DiscoveryListener;
   33   import org.apache.activemq.util.IntrospectionSupport;
   34   import org.apache.commons.httpclient.HttpClient;
   35   import org.apache.commons.httpclient.methods.DeleteMethod;
   36   import org.apache.commons.httpclient.methods.GetMethod;
   37   import org.apache.commons.httpclient.methods.PutMethod;
   38   import org.apache.commons.logging.Log;
   39   import org.apache.commons.logging.LogFactory;
   40   
   41   public class HTTPDiscoveryAgent implements DiscoveryAgent {
   42       
   43       private static final Log LOG = LogFactory.getLog(HTTPDiscoveryAgent.class);
   44       
   45       private String registryURL = "http://localhost:8080/discovery-registry/default";
   46       private HttpClient httpClient = new HttpClient();
   47       private AtomicBoolean running=new AtomicBoolean();
   48       private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
   49       private final HashSet<String> registeredServices = new HashSet<String>();
   50       private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();    
   51       private Thread thread;   
   52       private long updateInterval = 1000*10;
   53       private String brokerName;
   54       private boolean startEmbeddRegistry=false;
   55       private Service jetty;
   56       private AtomicInteger startCounter=new AtomicInteger(0);
   57   
   58       
   59       private long initialReconnectDelay = 1000;
   60       private long maxReconnectDelay = 1000 * 30;
   61       private long backOffMultiplier = 2;
   62       private boolean useExponentialBackOff=true;    
   63       private int maxReconnectAttempts;
   64       private final Object sleepMutex = new Object();
   65       private long minConnectTime = 5000;
   66       
   67       class SimpleDiscoveryEvent extends DiscoveryEvent {
   68   
   69           private int connectFailures;
   70           private long reconnectDelay = initialReconnectDelay;
   71           private long connectTime = System.currentTimeMillis();
   72           private AtomicBoolean failed = new AtomicBoolean(false);
   73           private AtomicBoolean removed = new AtomicBoolean(false);
   74   
   75           public SimpleDiscoveryEvent(String service) {
   76               super(service);
   77           }
   78   
   79       }
   80   
   81       
   82       public String getGroup() {
   83           return null;
   84       }
   85   
   86       public void registerService(String service) throws IOException {
   87           synchronized(registeredServices) {
   88               registeredServices.add(service);
   89           }
   90           doRegister(service);
   91       }
   92   
   93       synchronized private void doRegister(String service) {
   94           String url = registryURL;
   95           try {
   96               PutMethod method = new PutMethod(url);
   97   //            method.setParams(createParams());
   98               method.setRequestHeader("service", service);
   99               int responseCode = httpClient.executeMethod(method);
  100               LOG.debug("PUT to "+url+" got a "+responseCode);
  101           } catch (Exception e) {
  102               LOG.debug("PUT to "+url+" failed with: "+e);
  103           }
  104       }
  105       
  106       synchronized private void doUnRegister(String service) {
  107           String url = registryURL;
  108           try {
  109               DeleteMethod method = new DeleteMethod(url);
  110   //            method.setParams(createParams());
  111               method.setRequestHeader("service", service);
  112               int responseCode = httpClient.executeMethod(method);
  113               LOG.debug("DELETE to "+url+" got a "+responseCode);
  114           } catch (Exception e) {
  115               LOG.debug("DELETE to "+url+" failed with: "+e);
  116           }
  117       }
  118   
  119   //    private HttpMethodParams createParams() {
  120   //        HttpMethodParams params = new HttpMethodParams();
  121   //        params.setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0,false));
  122   //        return params;
  123   //    }
  124       
  125       synchronized private Set<String> doLookup(long freshness) {
  126           String url = registryURL+"?freshness="+freshness;
  127           try {
  128               GetMethod method = new GetMethod(url);
  129   //            method.setParams(createParams());
  130               int responseCode = httpClient.executeMethod(method);
  131               LOG.debug("GET to "+url+" got a "+responseCode);
  132               if( responseCode == 200 ) {
  133                   Set<String> rc = new HashSet<String>();
  134                   Scanner scanner = new Scanner(method.getResponseBodyAsStream());
  135                   while( scanner.hasNextLine() ) {
  136                       String service = scanner.nextLine();
  137                       if( service.trim().length() != 0 ) {
  138                           rc.add(service);
  139                       }
  140                   }
  141                   return rc;
  142               } else {
  143                   LOG.debug("GET to "+url+" failed with response code: "+responseCode);
  144                   return null;
  145               }
  146           } catch (Exception e) {
  147               LOG.debug("GET to "+url+" failed with: "+e);
  148               return null;
  149           }
  150       }
  151   
  152       public void serviceFailed(DiscoveryEvent devent) throws IOException {
  153   
  154           final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
  155           if (event.failed.compareAndSet(false, true)) {
  156           	discoveryListener.get().onServiceRemove(event);
  157           	if(!event.removed.get()) {
  158   	        	// Setup a thread to re-raise the event...
  159   	            Thread thread = new Thread() {
  160   	                public void run() {
  161   	
  162   	                    // We detect a failed connection attempt because the service
  163   	                    // fails right away.
  164   	                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
  165   	                        LOG.debug("Failure occured soon after the discovery event was generated.  It will be clasified as a connection failure: "+event);
  166   	
  167   	                        event.connectFailures++;
  168   	
  169   	                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
  170   	                            LOG.debug("Reconnect attempts exceeded "+maxReconnectAttempts+" tries.  Reconnecting has been disabled.");
  171   	                            return;
  172   	                        }
  173   	
  174   	                        synchronized (sleepMutex) {
  175   	                            try {
  176   	                                if (!running.get() || event.removed.get()) {
  177   	                                    return;
  178   	                                }
  179   	                                LOG.debug("Waiting "+event.reconnectDelay+" ms before attepting to reconnect.");
  180   	                                sleepMutex.wait(event.reconnectDelay);
  181   	                            } catch (InterruptedException ie) {
  182   	                                Thread.currentThread().interrupt();
  183   	                                return;
  184   	                            }
  185   	                        }
  186   	
  187   	                        if (!useExponentialBackOff) {
  188   	                            event.reconnectDelay = initialReconnectDelay;
  189   	                        } else {
  190   	                            // Exponential increment of reconnect delay.
  191   	                            event.reconnectDelay *= backOffMultiplier;
  192   	                            if (event.reconnectDelay > maxReconnectDelay) {
  193   	                                event.reconnectDelay = maxReconnectDelay;
  194   	                            }
  195   	                        }
  196   	
  197   	                    } else {
  198   	                        event.connectFailures = 0;
  199   	                        event.reconnectDelay = initialReconnectDelay;
  200   	                    }
  201   	
  202   	                    if (!running.get() || event.removed.get()) {
  203   	                        return;
  204   	                    }
  205   	
  206   	                    event.connectTime = System.currentTimeMillis();
  207   	                    event.failed.set(false);
  208   	                    discoveryListener.get().onServiceAdd(event);
  209   	                }
  210   	            };
  211   	            thread.setDaemon(true);
  212   	            thread.start();
  213           	}
  214           }
  215       }
  216   
  217   
  218       public void setBrokerName(String brokerName) {
  219           this.brokerName = brokerName;
  220       }
  221   
  222       public void setDiscoveryListener(DiscoveryListener discoveryListener) {
  223           this.discoveryListener.set(discoveryListener);
  224       }
  225   
  226       public void setGroup(String group) {
  227       }
  228   
  229       public void start() throws Exception {
  230           if( startCounter.addAndGet(1)==1 ) {
  231               if( startEmbeddRegistry ) {
  232                   jetty = createEmbeddedJettyServer();
  233                   Map props = new HashMap();
  234                   props.put("agent", this);
  235                   IntrospectionSupport.setProperties(jetty, props);
  236                   jetty.start();
  237               }
  238               
  239               running.set(true);
  240               thread = new Thread("HTTPDiscovery Agent") {
  241                   @Override
  242                   public void run() {
  243                       while(running.get()) {
  244                           try {
  245                               update();
  246                               Thread.sleep(updateInterval);
  247                           } catch (InterruptedException e) {
  248                               return;
  249                           }
  250                       }
  251                   }
  252               };
  253               thread.setDaemon(true);
  254               thread.start();
  255           }
  256       }
  257   
  258       /**
  259        * Create the EmbeddedJettyServer instance via reflection so that we can avoid a hard runtime dependency on 
  260        * jetty.
  261        * 
  262        * @return
  263        * @throws Exception
  264        */
  265       private Service createEmbeddedJettyServer()  throws Exception {
  266           Class clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
  267           return (Service)clazz.newInstance();
  268       }
  269   
  270       private void update() {
  271           // Register all our services...
  272           synchronized(registeredServices) {
  273               for (String service : registeredServices) {
  274                   doRegister(service);
  275               }
  276           }
  277           
  278           // Find new registered services...
  279           DiscoveryListener discoveryListener = this.discoveryListener.get();
  280           if(discoveryListener!=null) {
  281               Set<String> activeServices = doLookup(updateInterval*3);
  282               // If there is error talking the the central server, then activeServices == null
  283               if( activeServices !=null ) {
  284                   synchronized(discoveredServices) {
  285                       
  286                       HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet());
  287                       removedServices.removeAll(activeServices);
  288                       
  289                       HashSet<String> addedServices = new HashSet<String>(activeServices);
  290                       addedServices.removeAll(discoveredServices.keySet());
  291                       addedServices.removeAll(removedServices);
  292                       
  293                       for (String service : addedServices) {
  294                           SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
  295                           discoveredServices.put(service, e);
  296                           discoveryListener.onServiceAdd(e);
  297                       }
  298                       
  299                       for (String service : removedServices) {
  300                       	SimpleDiscoveryEvent e = discoveredServices.remove(service);
  301                       	if( e !=null ) {
  302                       		e.removed.set(true);
  303                       	}
  304                           discoveryListener.onServiceRemove(e);
  305                       }
  306                   }
  307               }
  308           }
  309       }
  310   
  311       public void stop() throws Exception {
  312           if( startCounter.decrementAndGet()==0 ) {
  313               running.set(false);
  314               if( thread!=null ) {
  315                   thread.join(updateInterval*3);
  316                   thread=null;
  317               }
  318               if( jetty!=null ) {
  319                   jetty.stop();
  320                   jetty = null;
  321               }
  322           }
  323       }
  324   
  325       public String getRegistryURL() {
  326           return registryURL;
  327       }
  328   
  329       public void setRegistryURL(String discoveryRegistryURL) {
  330           this.registryURL = discoveryRegistryURL;
  331       }
  332   
  333       public long getUpdateInterval() {
  334           return updateInterval;
  335       }
  336   
  337       public void setUpdateInterval(long updateInterval) {
  338           this.updateInterval = updateInterval;
  339       }
  340   
  341       public boolean isStartEmbeddRegistry() {
  342           return startEmbeddRegistry;
  343       }
  344   
  345       public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
  346           this.startEmbeddRegistry = startEmbeddRegistry;
  347       }
  348   
  349   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache.activemq.transport.discovery.http » [javadoc | source]