Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl.async;
   18   
   19   import java.io.File;
   20   import java.io.IOException;
   21   import java.io.RandomAccessFile;
   22   import java.nio.channels.FileLock;
   23   import java.nio.channels.OverlappingFileLockException;
   24   
   25   import org.apache.activemq.util.ByteSequence;
   26   import org.apache.activemq.util.IOExceptionSupport;
   27   
   28   /**
   29    * Use to reliably store fixed sized state data. It stores the state in record
   30    * that is versioned and repeated twice in the file so that a failure in the
   31    * middle of the write of the first or second record do not not result in an
   32    * unknown state.
   33    * 
   34    * @version $Revision: 1.1 $
   35    */
   36   public final class ControlFile {
   37   
   38       private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
   39       private final File file;
   40   
   41       /** The File that holds the control data. */
   42       private final RandomAccessFile randomAccessFile;
   43       private final int maxRecordSize;
   44       private final int firstRecordStart;
   45       private final int secondRecordStart;
   46       private final int firstRecordEnd;
   47       private final int secondRecordEnd;
   48   
   49       private long version;
   50       private FileLock lock;
   51       private boolean disposed;
   52   
   53       public ControlFile(File file, int recordSize) throws IOException {
   54           this.file = file;
   55           this.maxRecordSize = recordSize + 4;
   56           
   57           // Calculate where the records start and end.
   58           this.firstRecordStart = 8;
   59           this.secondRecordStart = 8 + maxRecordSize + 8 + 8;
   60           this.firstRecordEnd = firstRecordStart+maxRecordSize;
   61           this.secondRecordEnd = secondRecordStart+maxRecordSize;
   62   
   63           randomAccessFile = new RandomAccessFile(file, "rw");
   64       }
   65   
   66       /**
   67        * Locks the control file.
   68        * 
   69        * @throws IOException
   70        */
   71       public void lock() throws IOException {
   72           if (DISABLE_FILE_LOCK) {
   73               return;
   74           }
   75   
   76           if (lock == null) {
   77               try {
   78                   lock = randomAccessFile.getChannel().tryLock();
   79               } catch (OverlappingFileLockException e) {
   80                   throw IOExceptionSupport.create("Control file '" + file + "' could not be locked.",e);
   81               }
   82               if (lock == null) {
   83                   throw new IOException("Control file '" + file + "' could not be locked.");
   84               }
   85           }
   86       }
   87   
   88       /**
   89        * Un locks the control file.
   90        * 
   91        * @throws IOException
   92        */
   93       public void unlock() throws IOException {
   94           if (DISABLE_FILE_LOCK) {
   95               return;
   96           }
   97   
   98           if (lock != null) {
   99               lock.release();
  100               lock = null;
  101           }
  102       }
  103   
  104       public void dispose() {
  105           if (disposed) {
  106               return;
  107           }
  108           disposed = true;
  109           try {
  110               unlock();
  111           } catch (IOException ignore) {
  112           }
  113           try {
  114               randomAccessFile.close();
  115           } catch (IOException ignore) {
  116           }
  117       }
  118   
  119       public synchronized ByteSequence load() throws IOException {
  120           long l = randomAccessFile.length();
  121           if (l < maxRecordSize) {
  122               return null;
  123           }
  124   
  125           randomAccessFile.seek(firstRecordStart-8);
  126           long v1 = randomAccessFile.readLong();
  127           randomAccessFile.seek(firstRecordEnd);
  128           long v1check = randomAccessFile.readLong();
  129   
  130           randomAccessFile.seek(secondRecordStart - 8);
  131           long v2 = randomAccessFile.readLong();
  132           randomAccessFile.seek(secondRecordEnd);
  133           long v2check = randomAccessFile.readLong();
  134   
  135           byte[] data = null;
  136           if (v2 == v2check) {
  137               version = v2;
  138               randomAccessFile.seek(secondRecordStart);
  139               int size = randomAccessFile.readInt();
  140               data = new byte[size];
  141               randomAccessFile.readFully(data);
  142           } else if (v1 == v1check) {
  143               version = v1;
  144               randomAccessFile.seek(firstRecordStart);
  145               int size = randomAccessFile.readInt();
  146               data = new byte[size];
  147               randomAccessFile.readFully(data);
  148           } else {
  149               // Bummer.. Both checks are screwed. we don't know
  150               // if any of the two buffer are ok. This should
  151               // only happen is data got corrupted.
  152               throw new IOException("Control data corrupted.");
  153           }
  154           return new ByteSequence(data, 0, data.length);
  155       }
  156   
  157       public void store(ByteSequence data, boolean sync) throws IOException {
  158   
  159           version++;
  160           randomAccessFile.setLength((maxRecordSize * 2) + 32);
  161           randomAccessFile.seek(0);
  162   
  163           // Write the first copy of the control data.
  164           randomAccessFile.writeLong(version);
  165           randomAccessFile.writeInt(data.getLength());
  166           randomAccessFile.write(data.getData());
  167           randomAccessFile.seek(firstRecordEnd);
  168           randomAccessFile.writeLong(version);
  169   
  170           // Write the second copy of the control data.
  171           randomAccessFile.writeLong(version);
  172           randomAccessFile.writeInt(data.getLength());
  173           randomAccessFile.write(data.getData());
  174           randomAccessFile.seek(secondRecordEnd);
  175           randomAccessFile.writeLong(version);
  176   
  177           if (sync) {
  178               randomAccessFile.getFD().sync();
  179           }
  180       }
  181   
  182   	public boolean isDisposed() {
  183   		return disposed;
  184   	}
  185   
  186   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]