Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl.async;
   18   
   19   import java.io.IOException;
   20   import java.io.RandomAccessFile;
   21   import java.util.Map;
   22   
   23   import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
   24   import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
   25   import org.apache.activemq.util.ByteSequence;
   26   
   27   /**
   28    * Optimized Store reader and updater. Single threaded and synchronous. Use in
   29    * conjunction with the DataFileAccessorPool of concurrent use.
   30    * 
   31    * @version $Revision: 1.1.1.1 $
   32    */
   33   final class DataFileAccessor {
   34   
   35       private final DataFile dataFile;
   36       private final Map<WriteKey, WriteCommand> inflightWrites;
   37       private final RandomAccessFile file;
   38       private boolean disposed;
   39   
   40       /**
   41        * Construct a Store reader
   42        * 
   43        * @param fileId
   44        * @throws IOException
   45        */
   46       public DataFileAccessor(AsyncDataManager dataManager, DataFile dataFile) throws IOException {
   47           this.dataFile = dataFile;
   48           this.inflightWrites = dataManager.getInflightWrites();
   49           this.file = dataFile.openRandomAccessFile(false);
   50       }
   51   
   52       public DataFile getDataFile() {
   53           return dataFile;
   54       }
   55   
   56       public void dispose() {
   57           if (disposed) {
   58               return;
   59           }
   60           disposed = true;
   61           try {
   62               dataFile.closeRandomAccessFile(file);
   63           } catch (IOException e) {
   64               e.printStackTrace();
   65           }
   66       }
   67   
   68       public ByteSequence readRecord(Location location) throws IOException {
   69   
   70           if (!location.isValid()) {
   71               throw new IOException("Invalid location: " + location);
   72           }
   73   
   74           WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
   75           if (asyncWrite != null) {
   76               return asyncWrite.data;
   77           }
   78   
   79           try {
   80   
   81               if (location.getSize() == Location.NOT_SET) {
   82                   file.seek(location.getOffset());
   83                   location.setSize(file.readInt());
   84                   file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
   85               } else {
   86                   file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
   87               }
   88   
   89               byte[] data = new byte[location.getSize() - AsyncDataManager.ITEM_HEAD_FOOT_SPACE];
   90               file.readFully(data);
   91               return new ByteSequence(data, 0, data.length);
   92   
   93           } catch (RuntimeException e) {
   94               throw new IOException("Invalid location: " + location + ", : " + e);
   95           }
   96       }
   97   
   98       public void readLocationDetails(Location location) throws IOException {
   99           WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
  100           if (asyncWrite != null) {
  101               location.setSize(asyncWrite.location.getSize());
  102               location.setType(asyncWrite.location.getType());
  103           } else {
  104               file.seek(location.getOffset());
  105               location.setSize(file.readInt());
  106               location.setType(file.readByte());
  107           }
  108       }
  109   
  110       public boolean readLocationDetailsAndValidate(Location location) {
  111           try {
  112               WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
  113               if (asyncWrite != null) {
  114                   location.setSize(asyncWrite.location.getSize());
  115                   location.setType(asyncWrite.location.getType());
  116               } else {
  117                   file.seek(location.getOffset());
  118                   location.setSize(file.readInt());
  119                   location.setType(file.readByte());
  120   
  121                   byte data[] = new byte[3];
  122                   file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_OFFSET_TO_SOR);
  123                   file.readFully(data);
  124                   if (data[0] != AsyncDataManager.ITEM_HEAD_SOR[0]
  125                       || data[1] != AsyncDataManager.ITEM_HEAD_SOR[1]
  126                       || data[2] != AsyncDataManager.ITEM_HEAD_SOR[2]) {
  127                       return false;
  128                   }
  129                   file.seek(location.getOffset() + location.getSize() - AsyncDataManager.ITEM_FOOT_SPACE);
  130                   file.readFully(data);
  131                   if (data[0] != AsyncDataManager.ITEM_HEAD_EOR[0]
  132                       || data[1] != AsyncDataManager.ITEM_HEAD_EOR[1]
  133                       || data[2] != AsyncDataManager.ITEM_HEAD_EOR[2]) {
  134                       return false;
  135                   }
  136               }
  137           } catch (IOException e) {
  138               return false;
  139           }
  140           return true;
  141       }
  142   
  143       public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
  144   
  145           file.seek(location.getOffset() + AsyncDataManager.ITEM_HEAD_SPACE);
  146           int size = Math.min(data.getLength(), location.getSize());
  147           file.write(data.getData(), data.getOffset(), size);
  148           if (sync) {
  149               file.getFD().sync();
  150           }
  151   
  152       }
  153   
  154   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]