Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » discovery » multicast » [javadoc | source]

    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.discovery.multicast;
   18   
   19   import java.io.IOException;
   20   import java.net.DatagramPacket;
   21   import java.net.InetAddress;
   22   import java.net.InetSocketAddress;
   23   import java.net.MulticastSocket;
   24   import java.net.NetworkInterface;
   25   import java.net.SocketAddress;
   26   import java.net.SocketTimeoutException;
   27   import java.net.URI;
   28   import java.util.Iterator;
   29   import java.util.Map;
   30   import java.util.concurrent.ConcurrentHashMap;
   31   import java.util.concurrent.ExecutorService;
   32   import java.util.concurrent.LinkedBlockingQueue;
   33   import java.util.concurrent.ThreadFactory;
   34   import java.util.concurrent.ThreadPoolExecutor;
   35   import java.util.concurrent.TimeUnit;
   36   import java.util.concurrent.atomic.AtomicBoolean;
   37   
   38   import org.apache.activemq.command.DiscoveryEvent;
   39   import org.apache.activemq.transport.discovery.DiscoveryAgent;
   40   import org.apache.activemq.transport.discovery.DiscoveryListener;
   41   import org.apache.commons.logging.Log;
   42   import org.apache.commons.logging.LogFactory;
   43   
   44   /**
   45    * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
   46    * encoded using any wireformat, but openwire by default.
   47    * 
   48    * @version $Revision$
   49    */
   50   public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {
   51   
   52       public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
   53       public static final String DEFAULT_HOST_STR = "default"; 
   54       public static final String DEFAULT_HOST_IP  = System.getProperty("activemq.partition.discovery", "239.255.2.3"); 
   55       public static final int    DEFAULT_PORT  = 6155; 
   56           
   57       private static final Log LOG = LogFactory.getLog(MulticastDiscoveryAgent.class);
   58       private static final String TYPE_SUFFIX = "ActiveMQ-4.";
   59       private static final String ALIVE = "alive.";
   60       private static final String DEAD = "dead.";
   61       private static final String DELIMITER = "%";
   62       private static final int BUFF_SIZE = 8192;
   63       private static final int DEFAULT_IDLE_TIME = 500;
   64       private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;
   65   
   66       private long initialReconnectDelay = 1000 * 5;
   67       private long maxReconnectDelay = 1000 * 30;
   68       private long backOffMultiplier = 2;
   69       private boolean useExponentialBackOff;
   70       private int maxReconnectAttempts;
   71   
   72       private int timeToLive = 1;
   73       private boolean loopBackMode;
   74       private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
   75       private String group = "default";
   76       private URI discoveryURI;
   77       private InetAddress inetAddress;
   78       private SocketAddress sockAddress;
   79       private DiscoveryListener discoveryListener;
   80       private String selfService;
   81       private MulticastSocket mcast;
   82       private Thread runner;
   83       private long keepAliveInterval = DEFAULT_IDLE_TIME;
   84       private String mcInterface;
   85       private String mcNetworkInterface;
   86       private long lastAdvertizeTime;
   87       private AtomicBoolean started = new AtomicBoolean(false);
   88       private boolean reportAdvertizeFailed = true;
   89       private ExecutorService executor = null;
   90   
   91       class RemoteBrokerData {
   92           final String brokerName;
   93           final String service;
   94           long lastHeartBeat;
   95           long recoveryTime;
   96           int failureCount;
   97           boolean failed;
   98   
   99           public RemoteBrokerData(String brokerName, String service) {
  100               this.brokerName = brokerName;
  101               this.service = service;
  102               this.lastHeartBeat = System.currentTimeMillis();
  103           }
  104   
  105           public synchronized void updateHeartBeat() {
  106               lastHeartBeat = System.currentTimeMillis();
  107   
  108               // Consider that the broker recovery has succeeded if it has not
  109               // failed in 60 seconds.
  110               if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
  111                   if (LOG.isDebugEnabled()) {
  112                       LOG.debug("I now think that the " + service + " service has recovered.");
  113                   }
  114                   failureCount = 0;
  115                   recoveryTime = 0;
  116               }
  117           }
  118   
  119           public synchronized long getLastHeartBeat() {
  120               return lastHeartBeat;
  121           }
  122   
  123           public synchronized boolean markFailed() {
  124               if (!failed) {
  125                   failed = true;
  126                   failureCount++;
  127   
  128                   long reconnectDelay;
  129                   if (!useExponentialBackOff) {
  130                       reconnectDelay = initialReconnectDelay;
  131                   } else {
  132                       reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
  133                       if (reconnectDelay > maxReconnectDelay) {
  134                           reconnectDelay = maxReconnectDelay;
  135                       }
  136                   }
  137   
  138                   if (LOG.isDebugEnabled()) {
  139                       LOG.debug("Remote failure of " + service + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
  140                                 + " ms, the current failure count is: " + failureCount);
  141                   }
  142   
  143                   recoveryTime = System.currentTimeMillis() + reconnectDelay;
  144                   return true;
  145               }
  146               return false;
  147           }
  148   
  149           /**
  150            * @return true if this broker is marked failed and it is now the right
  151            *         time to start recovery.
  152            */
  153           public synchronized boolean doRecovery() {
  154               if (!failed) {
  155                   return false;
  156               }
  157   
  158               // Are we done trying to recover this guy?
  159               if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
  160                   if (LOG.isDebugEnabled()) {
  161                       LOG.debug("Max reconnect attempts of the " + service + " service has been reached.");
  162                   }
  163                   return false;
  164               }
  165   
  166               // Is it not yet time?
  167               if (System.currentTimeMillis() < recoveryTime) {
  168                   return false;
  169               }
  170   
  171               if (LOG.isDebugEnabled()) {
  172                   LOG.debug("Resuming event advertisement of the " + service + " service.");
  173               }
  174               failed = false;
  175               return true;
  176           }
  177   
  178           public boolean isFailed() {
  179               return failed;
  180           }
  181       }
  182   
  183       /**
  184        * Set the discovery listener
  185        * 
  186        * @param listener
  187        */
  188       public void setDiscoveryListener(DiscoveryListener listener) {
  189           this.discoveryListener = listener;
  190       }
  191   
  192       /**
  193        * register a service
  194        */
  195       public void registerService(String name) throws IOException {
  196           this.selfService = name;
  197           if (started.get()) {
  198               doAdvertizeSelf();
  199           }
  200       }
  201   
  202       /**
  203        * @return Returns the loopBackMode.
  204        */
  205       public boolean isLoopBackMode() {
  206           return loopBackMode;
  207       }
  208   
  209       /**
  210        * @param loopBackMode The loopBackMode to set.
  211        */
  212       public void setLoopBackMode(boolean loopBackMode) {
  213           this.loopBackMode = loopBackMode;
  214       }
  215   
  216       /**
  217        * @return Returns the timeToLive.
  218        */
  219       public int getTimeToLive() {
  220           return timeToLive;
  221       }
  222   
  223       /**
  224        * @param timeToLive The timeToLive to set.
  225        */
  226       public void setTimeToLive(int timeToLive) {
  227           this.timeToLive = timeToLive;
  228       }
  229   
  230       /**
  231        * @return the discoveryURI
  232        */
  233       public URI getDiscoveryURI() {
  234           return discoveryURI;
  235       }
  236   
  237       /**
  238        * Set the discoveryURI
  239        * 
  240        * @param discoveryURI
  241        */
  242       public void setDiscoveryURI(URI discoveryURI) {
  243           this.discoveryURI = discoveryURI;
  244       }
  245   
  246       public long getKeepAliveInterval() {
  247           return keepAliveInterval;
  248       }
  249   
  250       public void setKeepAliveInterval(long keepAliveInterval) {
  251           this.keepAliveInterval = keepAliveInterval;
  252       }
  253       
  254       public void setInterface(String mcInterface) {
  255           this.mcInterface = mcInterface;
  256       }
  257       
  258       public void setNetworkInterface(String mcNetworkInterface) {
  259           this.mcNetworkInterface = mcNetworkInterface;    
  260       }
  261       
  262       /**
  263        * start the discovery agent
  264        * 
  265        * @throws Exception
  266        */
  267       public void start() throws Exception {
  268       	
  269           if (started.compareAndSet(false, true)) {        	
  270           	         	
  271               if (group == null || group.length() == 0) {
  272                   throw new IOException("You must specify a group to discover");
  273               }
  274               String type = getType();
  275               if (!type.endsWith(".")) {
  276                   LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
  277                   type += ".";
  278               }
  279               
  280               if (discoveryURI == null) {
  281                   discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
  282               }
  283               
  284               if (LOG.isTraceEnabled()) 
  285           	  	LOG.trace("start - discoveryURI = " + discoveryURI);        	  	        	  
  286           	  
  287           	  String myHost = discoveryURI.getHost();
  288           	  int    myPort = discoveryURI.getPort(); 
  289           	     
  290           	  if( DEFAULT_HOST_STR.equals(myHost) ) 
  291           	  	myHost = DEFAULT_HOST_IP;       	      	  
  292           	  
  293           	  if(myPort < 0 )
  294           	    myPort = DEFAULT_PORT;        	    
  295           	  
  296           	  if (LOG.isTraceEnabled()) {
  297           	  	LOG.trace("start - myHost = " + myHost); 
  298           	  	LOG.trace("start - myPort = " + myPort);   	
  299           	  	LOG.trace("start - group  = " + group );		       	  	
  300           	  	LOG.trace("start - interface  = " + mcInterface );
  301           	  	LOG.trace("start - network interface  = " + mcNetworkInterface );
  302           	  }	
  303           	  
  304               this.inetAddress = InetAddress.getByName(myHost);
  305               this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
  306               mcast = new MulticastSocket(myPort);
  307               mcast.setLoopbackMode(loopBackMode);
  308               mcast.setTimeToLive(getTimeToLive());
  309               mcast.joinGroup(inetAddress);
  310               mcast.setSoTimeout((int)keepAliveInterval);
  311               if (mcInterface != null) {
  312                   mcast.setInterface(InetAddress.getByName(mcInterface));
  313               }
  314               if (mcNetworkInterface != null) {
  315                   mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
  316               }
  317               runner = new Thread(this);
  318               runner.setName(this.toString() + ":" + runner.getName());
  319               runner.setDaemon(true);
  320               runner.start();
  321               doAdvertizeSelf();
  322           }
  323       }
  324   
  325       /**
  326        * stop the channel
  327        * 
  328        * @throws Exception
  329        */
  330       public void stop() throws Exception {
  331           if (started.compareAndSet(true, false)) {
  332               doAdvertizeSelf();
  333               if (mcast != null) {
  334                   mcast.close();
  335               }
  336               if (runner != null) {
  337                   runner.interrupt();
  338               }
  339               getExecutor().shutdownNow();
  340           }
  341       }
  342   
  343       public String getType() {
  344           return group + "." + TYPE_SUFFIX;
  345       }
  346   
  347       public void run() {
  348           byte[] buf = new byte[BUFF_SIZE];
  349           DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
  350           while (started.get()) {
  351               doTimeKeepingServices();
  352               try {
  353                   mcast.receive(packet);
  354                   if (packet.getLength() > 0) {
  355                       String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
  356                       processData(str);
  357                   }
  358               } catch (SocketTimeoutException se) {
  359                   // ignore
  360               } catch (IOException e) {
  361                   if (started.get()) {
  362                       LOG.error("failed to process packet: " + e);
  363                   }
  364               }
  365           }
  366       }
  367   
  368       private void processData(String str) {
  369           if (discoveryListener != null) {
  370               if (str.startsWith(getType())) {
  371                   String payload = str.substring(getType().length());
  372                   if (payload.startsWith(ALIVE)) {
  373                       String brokerName = getBrokerName(payload.substring(ALIVE.length()));
  374                       String service = payload.substring(ALIVE.length() + brokerName.length() + 2);
  375                       processAlive(brokerName, service);
  376                   } else {
  377                       String brokerName = getBrokerName(payload.substring(DEAD.length()));
  378                       String service = payload.substring(DEAD.length() + brokerName.length() + 2);
  379                       processDead(service);
  380                   }
  381               }
  382           }
  383       }
  384   
  385       private void doTimeKeepingServices() {
  386           if (started.get()) {
  387               long currentTime = System.currentTimeMillis();
  388               if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
  389                   doAdvertizeSelf();
  390                   lastAdvertizeTime = currentTime;
  391               }
  392               doExpireOldServices();
  393           }
  394       }
  395   
  396       private void doAdvertizeSelf() {
  397           if (selfService != null) {
  398               String payload = getType();
  399               payload += started.get() ? ALIVE : DEAD;
  400               payload += DELIMITER + "localhost" + DELIMITER;
  401               payload += selfService;
  402               try {
  403                   byte[] data = payload.getBytes();
  404                   DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
  405                   mcast.send(packet);
  406               } catch (IOException e) {
  407                   // If a send fails, chances are all subsequent sends will fail
  408                   // too.. No need to keep reporting the
  409                   // same error over and over.
  410                   if (reportAdvertizeFailed) {
  411                       reportAdvertizeFailed = false;
  412                       LOG.error("Failed to advertise our service: " + payload, e);
  413                       if ("Operation not permitted".equals(e.getMessage())) {
  414                           LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
  415                                     + "Please make sure that the OS is properly configured to allow multicast traffic over: " + mcast.getLocalAddress());
  416                       }
  417                   }
  418               }
  419           }
  420       }
  421   
  422       private void processAlive(String brokerName, String service) {
  423           if (selfService == null || !service.equals(selfService)) {
  424               RemoteBrokerData data = brokersByService.get(service);
  425               if (data == null) {
  426                   data = new RemoteBrokerData(brokerName, service);
  427                   brokersByService.put(service, data);      
  428                   fireServiceAddEvent(data);
  429                   doAdvertizeSelf();
  430               } else {
  431                   data.updateHeartBeat();
  432                   if (data.doRecovery()) {
  433                       fireServiceAddEvent(data);
  434                   }
  435               }
  436           }
  437       }
  438   
  439       private void processDead(String service) {
  440           if (!service.equals(selfService)) {
  441               RemoteBrokerData data = brokersByService.remove(service);
  442               if (data != null && !data.isFailed()) {
  443                   fireServiceRemovedEvent(data);
  444               }
  445           }
  446       }
  447   
  448       private void doExpireOldServices() {
  449           long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
  450           for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
  451               RemoteBrokerData data = i.next();
  452               if (data.getLastHeartBeat() < expireTime) {
  453                   processDead(data.service);
  454               }
  455           }
  456       }
  457   
  458       private String getBrokerName(String str) {
  459           String result = null;
  460           int start = str.indexOf(DELIMITER);
  461           if (start >= 0) {
  462               int end = str.indexOf(DELIMITER, start + 1);
  463               result = str.substring(start + 1, end);
  464           }
  465           return result;
  466       }
  467   
  468       public void serviceFailed(DiscoveryEvent event) throws IOException {
  469           RemoteBrokerData data = brokersByService.get(event.getServiceName());
  470           if (data != null && data.markFailed()) {
  471               fireServiceRemovedEvent(data);
  472           }
  473       }
  474   
  475       private void fireServiceRemovedEvent(RemoteBrokerData data) {
  476           if (discoveryListener != null) {
  477               final DiscoveryEvent event = new DiscoveryEvent(data.service);
  478               event.setBrokerName(data.brokerName);
  479   
  480               // Have the listener process the event async so that
  481               // he does not block this thread since we are doing time sensitive
  482               // processing of events.
  483               getExecutor().execute(new Runnable() {
  484                   public void run() {
  485                       DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
  486                       if (discoveryListener != null) {
  487                           discoveryListener.onServiceRemove(event);
  488                       }
  489                   }
  490               });
  491           }
  492       }
  493   
  494       private void fireServiceAddEvent(RemoteBrokerData data) {
  495           if (discoveryListener != null) {
  496               final DiscoveryEvent event = new DiscoveryEvent(data.service);
  497               event.setBrokerName(data.brokerName);
  498               
  499               // Have the listener process the event async so that
  500               // he does not block this thread since we are doing time sensitive
  501               // processing of events.
  502               getExecutor().execute(new Runnable() {
  503                   public void run() {
  504                       DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
  505                       if (discoveryListener != null) {
  506                           discoveryListener.onServiceAdd(event);
  507                       }
  508                   }
  509               });
  510           }
  511       }
  512   
  513       private ExecutorService getExecutor() {
  514           if (executor == null) {
  515               final String threadName = "Notifier-" + this.toString();
  516               executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
  517                   public Thread newThread(Runnable runable) {
  518                       Thread t = new Thread(runable,  threadName);
  519                       t.setDaemon(true);
  520                       return t;
  521                   }
  522               });
  523           }
  524           return executor;
  525       }
  526   
  527       public long getBackOffMultiplier() {
  528           return backOffMultiplier;
  529       }
  530   
  531       public void setBackOffMultiplier(long backOffMultiplier) {
  532           this.backOffMultiplier = backOffMultiplier;
  533       }
  534   
  535       public long getInitialReconnectDelay() {
  536           return initialReconnectDelay;
  537       }
  538   
  539       public void setInitialReconnectDelay(long initialReconnectDelay) {
  540           this.initialReconnectDelay = initialReconnectDelay;
  541       }
  542   
  543       public int getMaxReconnectAttempts() {
  544           return maxReconnectAttempts;
  545       }
  546   
  547       public void setMaxReconnectAttempts(int maxReconnectAttempts) {
  548           this.maxReconnectAttempts = maxReconnectAttempts;
  549       }
  550   
  551       public long getMaxReconnectDelay() {
  552           return maxReconnectDelay;
  553       }
  554   
  555       public void setMaxReconnectDelay(long maxReconnectDelay) {
  556           this.maxReconnectDelay = maxReconnectDelay;
  557       }
  558   
  559       public boolean isUseExponentialBackOff() {
  560           return useExponentialBackOff;
  561       }
  562   
  563       public void setUseExponentialBackOff(boolean useExponentialBackOff) {
  564           this.useExponentialBackOff = useExponentialBackOff;
  565       }
  566   
  567       public void setGroup(String group) {
  568           this.group = group;
  569       }
  570       
  571       @Override
  572       public String toString() {
  573           return  "MulticastDiscoveryAgent-"
  574               + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
  575       }
  576   }

Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » discovery » multicast » [javadoc | source]