/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.openejb.server;

import org.apache.openejb.loader.SystemInstance;

import java.net.URI;
import java.util.List;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;

/**
 * Composite {@link DiscoveryAgent} / {@link DiscoveryListener}.
 * <p>
 * As an agent it forwards every register/unregister/reportFailed call to all
 * agents added via {@link #addDiscoveryAgent(DiscoveryAgent)}. As a listener
 * it receives service events from those agents and relays them, through an
 * executor, to every listener added via
 * {@link #addDiscoveryListener(DiscoveryListener)}.
 * <p>
 * Two URI maps (keyed by {@code URI.toString()}) are maintained:
 * {@code registered} holds the URIs passed to {@link #registerService(URI)}
 * and is replayed to agents that are added later; {@code services} holds the
 * URIs reported through {@link #serviceAdded(URI)} and is replayed to
 * listeners that are added later.
 *
 * @version $Rev$ $Date$
 */
public class DiscoveryRegistry implements DiscoveryListener, DiscoveryAgent {

    // Copy-on-write lists: these are iterated while dispatching events and may be
    // modified by add/remove calls at the same time; iteration works on a snapshot.
    private final List<DiscoveryAgent> agents = new CopyOnWriteArrayList<DiscoveryAgent>();
    private final List<DiscoveryListener> listeners = new CopyOnWriteArrayList<DiscoveryListener>();

    /** Services reported by the agents, keyed by the URI's string form. */
    private final Map<String, URI> services = new ConcurrentHashMap<String, URI>();

    /** Services registered through this registry, keyed by the URI's string form. */
    private final Map<String, URI> registered = new ConcurrentHashMap<String, URI>();

    // Runs listener callbacks on daemon threads named after this class.
    // Because the work queue is unbounded, ThreadPoolExecutor never grows past its
    // core size (1), so the maximum of 10 is not reached and callbacks run one at a
    // time in submission order.
    private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
        public Thread newThread(Runnable runable) {
            Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName());
            t.setDaemon(true);
            return t;
        }
    });

    /**
     * Creates an empty registry and publishes it in {@link SystemInstance} under
     * both {@code DiscoveryRegistry.class} and {@code DiscoveryAgent.class}.
     */
    public DiscoveryRegistry() {
        SystemInstance.get().setComponent(DiscoveryRegistry.class, this);
        SystemInstance.get().setComponent(DiscoveryAgent.class, this);
    }

    /**
     * Same as {@link #DiscoveryRegistry()}, then adds the given agent.
     *
     * @param agent the first agent to delegate to
     */
    public DiscoveryRegistry(DiscoveryAgent agent) {
        this();
        addDiscoveryAgent(agent);
    }

    /**
     * Adds an agent, installs this registry as its listener and replays every
     * currently registered service URI to it.
     *
     * @param agent the agent to add
     */
    public void addDiscoveryAgent(DiscoveryAgent agent) {
        agents.add(agent);
        agent.setDiscoveryListener(this);
        for (URI uri : registered.values()) {
            try {
                agent.registerService(uri);
            } catch (IOException ignored) {
                // Best effort: a failure to replay one URI must not prevent the
                // agent from being added or the remaining URIs from being replayed.
            }
        }
    }

    /**
     * @return a new set holding the service URIs currently known to this registry
     */
    public Set<URI> getServices() {
        return new HashSet<URI>(services.values());
    }

    /**
     * Records the URI as registered and forwards the registration to every agent.
     *
     * @param serviceUri the service to register
     * @throws IOException if an agent fails; remaining agents are not called
     */
    public void registerService(URI serviceUri) throws IOException {
        registered.put(serviceUri.toString(), serviceUri);
        for (DiscoveryAgent agent : agents) {
            agent.registerService(serviceUri);
        }
    }

    /**
     * Drops the URI from the registered set and forwards the failure report to
     * every agent.
     *
     * @param serviceUri the service reported as failed
     * @throws IOException if an agent fails; remaining agents are not called
     */
    public void reportFailed(URI serviceUri) throws IOException {
        registered.remove(serviceUri.toString());
        for (DiscoveryAgent agent : agents) {
            agent.reportFailed(serviceUri);
        }
    }

    /**
     * Drops the URI from the registered set and forwards the unregistration to
     * every agent.
     *
     * @param serviceUri the service to unregister
     * @throws IOException if an agent fails; remaining agents are not called
     */
    public void unregisterService(URI serviceUri) throws IOException {
        registered.remove(serviceUri.toString());
        for (DiscoveryAgent agent : agents) {
            agent.unregisterService(serviceUri);
        }
    }

    /**
     * Adds (does not replace) a listener; delegates to
     * {@link #addDiscoveryListener(DiscoveryListener)}.
     *
     * @param listener the listener to add
     */
    public void setDiscoveryListener(DiscoveryListener listener) {
        addDiscoveryListener(listener);
    }

    /**
     * Schedules a {@code serviceAdded} callback for every currently known service
     * so the listener is caught up, then adds it to the listener list.
     *
     * @param listener the listener to add
     */
    public void addDiscoveryListener(DiscoveryListener listener) {
        // get the listener caught up
        for (URI service : services.values()) {
            executor.execute(new ServiceAddedTask(listener, service));
        }

        listeners.add(listener);
    }

    /**
     * Removes a listener; callbacks already handed to the executor are not
     * cancelled.
     *
     * @param listener the listener to remove
     */
    public void removeDiscoveryListener(DiscoveryListener listener) {
        listeners.remove(listener);
    }

    /**
     * Records the service as known and schedules a {@code serviceAdded} callback
     * on every listener.
     *
     * @param service the service that became available
     */
    public void serviceAdded(URI service) {
        services.put(service.toString(), service);
        for (final DiscoveryListener discoveryListener : getListeners()) {
            executor.execute(new ServiceAddedTask(discoveryListener, service));
        }
    }

    /**
     * Forgets the service and schedules a {@code serviceRemoved} callback on
     * every listener.
     *
     * @param service the service that went away
     */
    public void serviceRemoved(URI service) {
        // Without this, getServices() and the catch-up replay in
        // addDiscoveryListener would keep reporting a service that is gone.
        services.remove(service.toString());
        for (final DiscoveryListener discoveryListener : getListeners()) {
            executor.execute(new ServiceRemovedTask(discoveryListener, service));
        }
    }

    /**
     * @return a read-only view of the listener list
     */
    List<DiscoveryListener> getListeners() {
        return Collections.unmodifiableList(listeners);
    }

    /** Base for the callbacks handed to the executor: a listener plus the service URI to report. */
    private abstract static class Task implements Runnable {
        protected final DiscoveryListener discoveryListener;
        protected final URI service;

        protected Task(DiscoveryListener discoveryListener, URI service) {
            this.discoveryListener = discoveryListener;
            this.service = service;
        }
    }

    /** Calls {@code serviceRemoved} on the listener; a null listener is skipped. */
    private static class ServiceRemovedTask extends Task {
        public ServiceRemovedTask(DiscoveryListener discoveryListener, URI service) {
            super(discoveryListener, service);
        }

        public void run() {
            if (discoveryListener != null) {
                discoveryListener.serviceRemoved(service);
            }
        }
    }

    /** Calls {@code serviceAdded} on the listener; a null listener is skipped. */
    private static class ServiceAddedTask extends Task {
        public ServiceAddedTask(DiscoveryListener discoveryListener, URI service) {
            super(discoveryListener, service);
        }

        public void run() {
            if (discoveryListener != null) {
                discoveryListener.serviceAdded(service);
            }
        }
    }
}