Save This Page
Home » openejb-3.1.2-src » org.apache » openejb » core » stateful » [javadoc | source]
    1   /**
    2    *
    3    * Licensed to the Apache Software Foundation (ASF) under one or more
    4    * contributor license agreements.  See the NOTICE file distributed with
    5    * this work for additional information regarding copyright ownership.
    6    * The ASF licenses this file to You under the Apache License, Version 2.0
    7    * (the "License"); you may not use this file except in compliance with
    8    * the License.  You may obtain a copy of the License at
    9    *
   10    *     http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    *  Unless required by applicable law or agreed to in writing, software
   13    *  distributed under the License is distributed on an "AS IS" BASIS,
   14    *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   15    *  See the License for the specific language governing permissions and
   16    *  limitations under the License.
   17    */
   18   package org.apache.openejb.core.stateful;
   19   
   20   import java.util.ArrayList;
   21   import java.util.Iterator;
   22   import java.util.LinkedHashMap;
   23   import java.util.List;
   24   import java.util.Map;
   25   import java.util.Queue;
   26   import java.util.concurrent.ConcurrentHashMap;
   27   import java.util.concurrent.LinkedBlockingQueue;
   28   import java.util.concurrent.TimeUnit;
   29   import java.util.concurrent.locks.ReentrantLock;
   30   
   31   import org.apache.openejb.util.LogCategory;
   32   import org.apache.openejb.util.Logger;
   33   import org.apache.openejb.util.Duration;
   34   
   35   public class SimpleCache<K, V> implements Cache<K, V> {
   36       public static final Logger logger = Logger.getInstance(LogCategory.OPENEJB, "org.apache.openejb.util.resources");
   37   
   38       /**
   39        * Map of all known values by key
   40        */
   41       private final ConcurrentHashMap<K, Entry> cache = new ConcurrentHashMap<K, Entry>();
   42   
   43       /**
   44        * All values not in use in least resently used order
   45        */
   46       private final Queue<Entry> lru = new LinkedBlockingQueue<Entry>();
   47   
   48       /**
   49        * Notified when values are loaded, stored, or timedOut
   50        */
   51       private CacheListener<V> listener;
   52   
   53       /**
   54        * Used to load and store values
   55        */
   56       private PassivationStrategy passivator;
   57   
   58       /**
   59        * Maximum number of values that should be in the LRU
   60        */
   61       private int capacity;
   62   
   63       /**
   64        * When the LRU is exceeded, this is the is the number of beans stored.
   65        * This helps to avoid passivating a bean at a time.
   66        */
   67       private int bulkPassivate;
   68   
   69       /**
   70        * A bean may be destroyed if it isn't used in this length of time (in
   71        * milliseconds).
   72        */
   73       private long timeOut;
   74   
   75       public SimpleCache() {
   76       }
   77   
   78       public SimpleCache(CacheListener<V> listener, PassivationStrategy passivator, int capacity, int bulkPassivate, Duration timeOut) {
   79           this.listener = listener;
   80           this.passivator = passivator;
   81           this.capacity = capacity;
   82           this.bulkPassivate = bulkPassivate;
   83           this.timeOut = timeOut.getUnit().convert(timeOut.getTime(), TimeUnit.MILLISECONDS);
   84       }
   85   
   86       public synchronized CacheListener<V> getListener() {
   87           return listener;
   88       }
   89   
   90       public synchronized void setListener(CacheListener<V> listener) {
   91           this.listener = listener;
   92       }
   93   
   94       public synchronized PassivationStrategy getPassivator() {
   95           return passivator;
   96       }
   97   
   98       public synchronized void setPassivator(PassivationStrategy passivator) {
   99           this.passivator = passivator;
  100       }
  101   
  102       public synchronized void setPassivator(Class<? extends PassivationStrategy> passivatorClass) throws Exception {
  103           this.passivator = passivatorClass.newInstance();
  104       }
  105   
  106       public synchronized int getCapacity() {
  107           return capacity;
  108       }
  109   
  110       public synchronized void setCapacity(int capacity) {
  111           this.capacity = capacity;
  112       }
  113   
  114       // Old configurations use "PoolSize" to configure max cache size
  115       public synchronized void setPoolSize(int capacity) {
  116           this.capacity = capacity;
  117       }
  118   
  119       public synchronized int getBulkPassivate() {
  120           return bulkPassivate;
  121       }
  122   
  123       public synchronized void setBulkPassivate(int bulkPassivate) {
  124           this.bulkPassivate = bulkPassivate;
  125       }
  126   
  127       public synchronized long getTimeOut() {
  128           return timeOut;
  129       }
  130   
  131       public synchronized void setTimeOut(long timeOut) {
  132           this.timeOut = timeOut * 60 * 1000;
  133       }
  134   
  135       public void add(K key, V value) {
  136           // find the existing entry
  137           Entry entry = cache.get(key);
  138           if (entry != null) {
  139               entry.lock.lock();
  140               try {
  141                   if (entry.getState() != EntryState.REMOVED) {
  142                       throw new IllegalStateException("An entry for the key " + key + " already exists");
  143                   }
  144                   // Entry has been removed between get and lock, simply remove the garbage entry
  145                   cache.remove(key);
  146                   lru.remove(entry);
  147               } finally {
  148                   entry.lock.unlock();
  149               }
  150           }
  151   
  152           entry = new Entry(key, value, EntryState.CHECKED_OUT);
  153           cache.put(key, entry);
  154       }
  155   
  156       public V checkOut(K key) throws Exception {
  157           // attempt (up to 10 times) to obtain the entry from the cache
  158           for (int i = 0; i < 10; i++) {
  159               // find the entry
  160               Entry entry = cache.get(key);
  161               if (entry == null) {
  162                   entry = loadEntry(key);
  163                   if (entry == null) {
  164                       return null;
  165                   }
  166               }
  167   
  168               entry.lock.lock();
  169               try {
  170                   // verfiy state
  171                   switch (entry.getState()) {
  172                       case AVAILABLE:
  173                           break;
  174                       case CHECKED_OUT:
  175                           throw new IllegalStateException("The entry " + key + " is already checked-out");
  176                       case PASSIVATED:
  177                           // Entry was passivated between get and lock, we need to load the Entry again
  178                           // If the cache somehow got corrupted by an entry containing in state PASSIVATED, this remove
  179                           // call will remove the corruption
  180                           cache.remove(key, entry);
  181                           continue;
  182                       case REMOVED:
  183                           // Entry has been removed between get and lock (most likely by undeploying the EJB), simply drop the instance
  184                           return null;
  185                   }
  186   
  187                   // mark entry as in-use
  188                   entry.setState(EntryState.CHECKED_OUT);
  189   
  190                   // entry is removed from the lru while in use
  191                   lru.remove(entry);
  192   
  193                   return entry.getValue();
  194               } finally {
  195                   entry.lock.unlock();
  196               }
  197           }
  198   
  199           // something is really messed up with this entry, try to cleanup before throwing an exception
  200           Entry entry = cache.remove(key);
  201           if (entry != null) {
  202               lru.remove(entry);
  203           }
  204           throw new RuntimeException("Cache is corrupted: the entry " + key + " in the Map 'cache' is in state PASSIVATED");
  205       }
  206   
  207       public void checkIn(K key) {
  208           // find the entry
  209           Entry entry = cache.get(key);
  210           if (entry == null) {
  211               return;
  212           }
  213   
  214           entry.lock.lock();
  215           try {
  216               // verfiy state
  217               switch (entry.getState()) {
  218                   case AVAILABLE:
  219                       throw new IllegalStateException("The entry " + key + " is not checked-out");
  220                   case PASSIVATED:
  221                       // An entry in-use should not be passivated so we can only assume
  222                       // that the caller never checked out the bean in the first place
  223                       throw new IllegalStateException("The entry " + key + " is not checked-out");
  224                   case REMOVED:
  225                       // Entry has been removed between get and lock (most likely by undeploying the EJB), simply drop the instance
  226                       return;
  227               }
  228   
  229               // mark entry as available
  230               entry.setState(EntryState.AVAILABLE);
  231   
  232               // add entry to lru
  233               lru.add(entry);
  234               entry.resetTimeOut();
  235           } finally {
  236               entry.lock.unlock();
  237           }
  238   
  239           processLRU();
  240       }
  241   
  242       public V remove(K key) {
  243           // find the entry
  244           Entry entry = cache.get(key);
  245           if (entry == null) {
  246               return null;
  247           }
  248   
  249           entry.lock.lock();
  250           try {
  251               // remove the entry from the cache and lru
  252               cache.remove(key);
  253               lru.remove(entry);
  254   
  255               // There is no need to check the state because users of the cache
  256               // are responsible for maintaining references to beans in use
  257   
  258               // mark the entry as removed
  259               entry.setState(EntryState.REMOVED);
  260   
  261               return entry.getValue();
  262           } finally {
  263               entry.lock.unlock();
  264           }
  265       }
  266   
  267       public void removeAll(CacheFilter<V> filter) {
  268           for (Iterator<Entry> iterator = cache.values().iterator(); iterator.hasNext();) {
  269               Entry entry = iterator.next();
  270   
  271               entry.lock.lock();
  272               try {
  273                   if (filter.matches(entry.getValue())) {
  274                       // remove the entry from the cache and lru
  275                       iterator.remove();
  276                       lru.remove(entry);
  277   
  278                       // There is no need to check the state because users of the cache
  279                       // are responsible for maintaining references to beans in use
  280   
  281                       // mark the entry as removed
  282                       entry.setState(EntryState.REMOVED);
  283                   }
  284               } finally {
  285                   entry.lock.unlock();
  286               }
  287           }
  288       }
  289   
  290       public void processLRU() {
  291           CacheListener<V> listener = this.getListener();
  292   
  293           // check for timed out entries
  294           Iterator<Entry> iterator = lru.iterator();
  295           while (iterator.hasNext()) {
  296               Entry entry = iterator.next();
  297               entry.lock.lock();
  298               try {
  299                   switch (entry.getState()) {
  300                       case AVAILABLE:
  301                           break;
  302                       case CHECKED_OUT:
  303                           // bean is in use so cannot be passivated
  304                           continue;
  305                       case PASSIVATED:
  306                           // Entry was passivated between get and lock
  307                           iterator.remove();
  308                           continue;
  309                       case REMOVED:
  310                           // Entry was remmoved between get and lock
  311                           iterator.remove();
  312                           continue;
  313                   }
  314   
  315   
  316                   if (entry.isTimedOut()) {
  317                       iterator.remove();
  318                       cache.remove(entry.getKey());
  319                       entry.setState(EntryState.REMOVED);
  320   
  321                       // notify listener that the entry has been removed
  322                       if (listener != null) {
  323                           try {
  324                               listener.timedOut(entry.getValue());
  325                           } catch (Exception e) {
  326                               logger.error("An unexpected exception occured from timedOut callback", e);
  327                           }
  328                       }
  329                   } else {
  330                       // entries are in order of last updates, so if this bean isn't timed out
  331                       // no further entries will be timed out
  332                       break;
  333                   }
  334               } finally {
  335                   entry.lock.unlock();
  336               }
  337           }
  338   
  339           // if there are to many beans in the lru, shink is by on bulkPassivate size
  340           // bulkPassivate size is just an estimate, as locked or timed out beans are skipped
  341           if (lru.size() >= getCapacity()) {
  342               Map<K, V> valuesToStore = new LinkedHashMap<K, V>();
  343               List<Entry> entries = new ArrayList<Entry>();
  344   
  345               int bulkPassivate = getBulkPassivate();
  346               if (bulkPassivate < 1) bulkPassivate = 1;
  347               for (int i = 0; i < bulkPassivate; i++) {
  348                   Entry entry = lru.poll();
  349                   if (entry == null) {
  350                       // lru is empty
  351                       break;
  352                   }
  353   
  354                   if (!entry.lock.tryLock()) {
  355                       // If two threads are running in this method, you could get a deadlock
  356                       // due to lock acquisition order since this section gathers a group of
  357                       // locks. Simply skip beans we can not obtain a lock on
  358                       continue;
  359                   }
  360                   try {
  361                       switch (entry.getState()) {
  362                           case AVAILABLE:
  363                               break;
  364                           case CHECKED_OUT:
  365                               // bean is in use so cannot be passivated
  366                               continue;
  367                           case PASSIVATED:
  368                               // Entry was passivated between get and lock
  369                               lru.remove(entry);
  370                               continue;
  371                           case REMOVED:
  372                               // Entry was remmoved between get and lock
  373                               lru.remove(entry);
  374                               continue;
  375                       }
  376   
  377                       // remove it from the cache
  378                       cache.remove(entry.getKey());
  379   
  380                       // there is a race condition where the item could get added back into the lru
  381                       lru.remove(entry);
  382   
  383                       // if the entry is actually timed out we just destroy it; othewise it is written to disk
  384                       if (entry.isTimedOut()) {
  385                           entry.setState(EntryState.REMOVED);
  386                           if (listener != null) {
  387                               try {
  388                                   listener.timedOut(entry.getValue());
  389                               } catch (Exception e) {
  390                                   logger.error("An unexpected exception occured from timedOut callback", e);
  391                               }
  392                           }
  393                       } else {
  394                           // entry will be passivated, so we need to obtain an additional lock until the passivation is complete
  395                           entry.lock.lock();
  396                           entries.add(entry);
  397   
  398                           entry.setState(EntryState.PASSIVATED);
  399                           valuesToStore.put(entry.getKey(), entry.getValue());
  400                       }
  401                   } finally {
  402                       entry.lock.unlock();
  403                   }
  404               }
  405   
  406               if (!valuesToStore.isEmpty()) {
  407                   try {
  408                       storeEntries(valuesToStore);
  409                   } finally {
  410                       for (Entry entry : entries) {
  411                           // release the extra passivation lock
  412                           entry.lock.unlock();
  413                       }
  414                   }
  415               }
  416           }
  417       }
  418   
  419       private Entry loadEntry(K key) throws Exception {
  420           PassivationStrategy passivator = getPassivator();
  421           if (passivator == null) {
  422               return null;
  423           }
  424   
  425           V value = null;
  426           try {
  427               value = (V) passivator.activate(key);
  428           } catch (Exception e) {
  429               logger.error("An unexpected exception occured while reading entries from disk", e);
  430           }
  431   
  432           if (value == null) {
  433               return null;
  434           }
  435   
  436           CacheListener<V> listener = this.getListener();
  437           if (listener != null) {
  438               listener.afterLoad(value);
  439           }
  440           Entry entry = new Entry(key, value, EntryState.AVAILABLE);
  441           cache.put(key, entry);
  442           return entry;
  443       }
  444   
  445       private void storeEntries(Map<K, V> entriesToStore) {
  446           CacheListener<V> listener = this.getListener();
  447           for (Iterator<java.util.Map.Entry<K, V>> iterator = entriesToStore.entrySet().iterator(); iterator.hasNext();) {
  448               java.util.Map.Entry<K, V> entry = iterator.next();
  449   
  450               if (listener != null) {
  451                   try {
  452                       listener.beforeStore(entry.getValue());
  453                   } catch (Exception e) {
  454                       iterator.remove();
  455                       logger.error("An unexpected exception occured from beforeStore callback", e);
  456                   }
  457               }
  458   
  459           }
  460   
  461           PassivationStrategy passivator = getPassivator();
  462           if (passivator == null) {
  463               return;
  464           }
  465   
  466           try {
  467               passivator.passivate(entriesToStore);
  468           } catch (Exception e) {
  469               logger.error("An unexpected exception occured while writting the entries to disk", e);
  470           }
  471       }
  472   
  473       private enum EntryState {
  474           AVAILABLE, CHECKED_OUT, PASSIVATED, REMOVED
  475       }
  476   
  477       private class Entry {
  478           private final K key;
  479           private final V value;
  480           private final ReentrantLock lock = new ReentrantLock();
  481           private EntryState state;
  482           private long lastAccess;
  483   
  484           private Entry(K key, V value, EntryState state) {
  485               this.key = key;
  486               this.value = value;
  487               this.state = state;
  488               lastAccess = System.currentTimeMillis();
  489           }
  490   
  491           private K getKey() {
  492               assertLockHeld();
  493               return key;
  494           }
  495   
  496           private V getValue() {
  497               assertLockHeld();
  498               return value;
  499           }
  500   
  501           private EntryState getState() {
  502               assertLockHeld();
  503               return state;
  504           }
  505   
  506           private void setState(EntryState state) {
  507               assertLockHeld();
  508               this.state = state;
  509           }
  510   
  511           private boolean isTimedOut() {
  512               assertLockHeld();
  513   
  514               long timeOut = getTimeOut();
  515               if (timeOut == 0) {
  516                   return false;
  517               }
  518               long now = System.currentTimeMillis();
  519               return (now - lastAccess) > timeOut;
  520           }
  521   
  522           private void resetTimeOut() {
  523               assertLockHeld();
  524   
  525               if (getTimeOut() > 0) {
  526                   lastAccess = System.currentTimeMillis();
  527               }
  528           }
  529   
  530           private void assertLockHeld() {
  531               if (!lock.isHeldByCurrentThread()) {
  532                   throw new IllegalStateException("Entry must be locked");
  533               }
  534           }
  535       }
  536   }

Save This Page
Home » openejb-3.1.2-src » org.apache » openejb » core » stateful » [javadoc | source]