Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl.async;
   18   
   19   import java.io.IOException;
   20   import java.util.ArrayList;
   21   import java.util.HashMap;
   22   import java.util.Iterator;
   23   import java.util.List;
   24   import java.util.Map;
   25   
   26   /**
   27    * Used to pool DataFileAccessors.
   28    * 
   29    * @author chirino
   30    */
   31   public class DataFileAccessorPool {
   32   
   33       private final AsyncDataManager dataManager;
   34       private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
   35       private boolean closed;
   36       private int maxOpenReadersPerFile = 5;
   37   
   38       class Pool {
   39   
   40           private final DataFile file;
   41           private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
   42           private boolean used;
   43           private int openCounter;
   44           private boolean disposed;
   45   
   46           public Pool(DataFile file) {
   47               this.file = file;
   48           }
   49   
   50           public DataFileAccessor openDataFileReader() throws IOException {
   51               DataFileAccessor rc = null;
   52               if (pool.isEmpty()) {
   53                   rc = new DataFileAccessor(dataManager, file);
   54               } else {
   55                   rc = (DataFileAccessor)pool.remove(pool.size() - 1);
   56               }
   57               used = true;
   58               openCounter++;
   59               return rc;
   60           }
   61   
   62           public synchronized void closeDataFileReader(DataFileAccessor reader) {
   63               openCounter--;
   64               if (pool.size() >= maxOpenReadersPerFile || disposed) {
   65                   reader.dispose();
   66               } else {
   67                   pool.add(reader);
   68               }
   69           }
   70   
   71           public synchronized void clearUsedMark() {
   72               used = false;
   73           }
   74   
   75           public synchronized boolean isUsed() {
   76               return used;
   77           }
   78   
   79           public synchronized void dispose() {
   80               for (DataFileAccessor reader : pool) {
   81                   reader.dispose();
   82               }
   83               pool.clear();
   84               disposed = true;
   85           }
   86   
   87           public synchronized int getOpenCounter() {
   88               return openCounter;
   89           }
   90   
   91       }
   92   
   93       public DataFileAccessorPool(AsyncDataManager dataManager) {
   94           this.dataManager = dataManager;
   95       }
   96   
   97       synchronized void clearUsedMark() {
   98           for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
   99               Pool pool = (Pool)iter.next();
  100               pool.clearUsedMark();
  101           }
  102       }
  103   
  104       synchronized void disposeUnused() {
  105           for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
  106               Pool pool = iter.next();
  107               if (!pool.isUsed()) {
  108                   pool.dispose();
  109                   iter.remove();
  110               }
  111           }
  112       }
  113   
  114       synchronized void disposeDataFileAccessors(DataFile dataFile) {
  115           if (closed) {
  116               throw new IllegalStateException("Closed.");
  117           }
  118           Pool pool = pools.get(dataFile.getDataFileId());
  119           if (pool != null) {
  120               if (pool.getOpenCounter() == 0) {
  121                   pool.dispose();
  122                   pools.remove(dataFile.getDataFileId());
  123               } else {
  124                   throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
  125               }
  126           }
  127       }
  128   
  129       synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
  130           if (closed) {
  131               throw new IOException("Closed.");
  132           }
  133   
  134           Pool pool = pools.get(dataFile.getDataFileId());
  135           if (pool == null) {
  136               pool = new Pool(dataFile);
  137               pools.put(dataFile.getDataFileId(), pool);
  138           }
  139           return pool.openDataFileReader();
  140       }
  141   
  142       synchronized void closeDataFileAccessor(DataFileAccessor reader) {
  143           Pool pool = pools.get(reader.getDataFile().getDataFileId());
  144           if (pool == null || closed) {
  145               reader.dispose();
  146           } else {
  147               pool.closeDataFileReader(reader);
  148           }
  149       }
  150   
  151       public synchronized void close() {
  152           if (closed) {
  153               return;
  154           }
  155           closed = true;
  156           for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
  157               Pool pool = iter.next();
  158               pool.dispose();
  159           }
  160           pools.clear();
  161       }
  162   
  163   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]