org.apache.catalina.cluster.util
public class: SmartQueue [javadoc |
source]
java.lang.Object
org.apache.catalina.cluster.util.SmartQueue
A smart queue, used for async replication.
The "smart" part of this queue is that if a session is already queued for
replication and it is updated again, the queued session is simply replaced,
so we don't replicate data that is obsolete. This class lives in util, since it
is quite generic.
- author:
Filip Hanik
- version:
1.0
-
Field Summary |
---|
public static Log | log | |
Methods from java.lang.Object: |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Method from org.apache.catalina.cluster.util.SmartQueue Detail: |
/**
 * Queues an entry, or refreshes the value of an entry that is already
 * queued under the same key.
 *
 * If an entry with the same key is waiting in the queue, its value is
 * overwritten in place and it keeps its position; otherwise the new entry
 * is appended to the tail. All threads waiting on the queue monitor are
 * notified in both cases.
 *
 * @param entry the entry to queue; its key decides whether it replaces a
 *              queued entry
 */
public void add(SmartEntry entry) {
    /*
     * queue and queueMap are two unsynchronized collections that must be
     * updated together, so every access happens while holding the mutex
     */
    synchronized (mutex) {
        /* look for an entry already queued under this key */
        SmartEntry existing = (SmartEntry) queueMap.get(entry.getKey());
        if (existing != null) {
            /* already queued: keep its slot, just swap in the new value */
            if (log.isDebugEnabled()) {
                log.debug("[" + Thread.currentThread().getName()
                        + "][SmartQueue] Replacing old object=" + existing);
            }
            existing.setValue(entry.getValue());
            if (log.isDebugEnabled()) {
                log.debug("with new object=" + existing);
            }
        } else {
            /* not queued yet: add it to the end of the queue and index it */
            if (log.isDebugEnabled()) {
                log.debug("[" + Thread.currentThread().getName()
                        + "][SmartQueue] Adding new object=" + entry);
            }
            queue.addLast(entry);
            queueMap.put(entry.getKey(), entry);
        }
        /* wake up every thread blocked in wait() on the mutex */
        mutex.notifyAll();
    }
}
Add an object to the queue |
/**
 * Removes the entry at the head of the queue, waiting without a time
 * limit until one is available.
 *
 * Delegates to {@link #remove(long)} with a timeout of 0.
 *
 * @return the entry removed from the head of the queue
 */
public SmartEntry remove() {
    final SmartEntry head = remove(0);
    return head;
}
Blocks forever until an element has been added to the queue |
/**
 * Removes and returns the entry at the head of the queue, waiting for one
 * to be added if the queue is currently empty.
 *
 * The timeout bounds the total time spent waiting in this call: each
 * wait uses only the time still remaining, so a wake-up that finds the
 * queue empty again does not restart the full timeout. If the thread is
 * interrupted while waiting, the method keeps waiting (as before) but
 * restores the thread's interrupt status before returning.
 *
 * @param timeout maximum time to wait in milliseconds; 0 means wait
 *                until an entry is available
 * @return the removed entry, or null if the timeout elapsed while the
 *         queue was still empty
 */
public SmartEntry remove(long timeout) {
    SmartEntry result = null;
    boolean interrupted = false;
    long startEntry = System.currentTimeMillis();
    try {
        synchronized (mutex) {
            while (size() == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("["
                            + Thread.currentThread().getName()
                            + "][SmartQueue] Queue sleeping until object added size="
                            + size() + ".");
                }
                /* 0 tells Object.wait to block until notified */
                long waitTime = 0;
                if (timeout != 0) {
                    /* clamp so a wall clock stepping backwards cannot extend or overflow the wait */
                    long elapsed = Math.max(0L, System.currentTimeMillis() - startEntry);
                    waitTime = timeout - elapsed;
                    if (waitTime <= 0) {
                        /* deadline reached and still nothing to return */
                        return null;
                    }
                }
                try {
                    mutex.wait(waitTime);
                } catch (InterruptedException ex) {
                    /*
                     * Keep waiting for an entry, but remember the interrupt so it
                     * can be re-asserted on the way out. Re-interrupting here
                     * would make the next wait() throw immediately and spin.
                     */
                    interrupted = true;
                }
                if (log.isDebugEnabled()) {
                    log.debug("["
                            + Thread.currentThread().getName()
                            + "][SmartQueue] Queue woke up or interrupted size="
                            + size() + ".");
                }
            }
            /* guaranteed that we are not empty by now */
            result = (SmartEntry) queue.removeFirst();
            queueMap.remove(result.getKey());
            if (log.isDebugEnabled()) {
                log.debug("[" + Thread.currentThread().getName()
                        + "][SmartQueue] Returning=" + result);
            }
        }
    } finally {
        if (interrupted) {
            /* hand the interrupt back to the caller instead of swallowing it */
            Thread.currentThread().interrupt();
        }
    }
    return result;
}
|
/**
 * Reports how many entries are currently waiting in the queue.
 *
 * The count is read while holding the queue mutex.
 *
 * @return the number of queued entries
 */
public int size() {
    final int count;
    synchronized (mutex) {
        count = queue.size();
    }
    return count;
}
|