Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl.async;
   18   
   19   import java.io.IOException;
   20   import java.io.InterruptedIOException;
   21   import java.io.RandomAccessFile;
   22   import java.util.Map;
   23   import java.util.concurrent.CountDownLatch;
   24   import java.util.concurrent.atomic.AtomicReference;
   25   
   26   import org.apache.activemq.util.ByteSequence;
   27   import org.apache.activemq.util.DataByteArrayOutputStream;
   28   import org.apache.activemq.util.LinkedNode;
   29   
   30   /**
   31    * An optimized writer to do batch appends to a data file. This object is thread
   32    * safe and gains throughput as you increase the number of concurrent writes it
   33    * does.
   34    * 
   35    * @version $Revision: 1.1.1.1 $
   36    */
   37   class DataFileAppender {
   38   
   39       protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE];
   40       protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
   41   
   42       protected final AsyncDataManager dataManager;
   43       protected final Map<WriteKey, WriteCommand> inflightWrites;
   44       protected final Object enqueueMutex = new Object(){};
   45       protected WriteBatch nextWriteBatch;
   46   
   47       protected boolean shutdown;
   48       protected IOException firstAsyncException;
   49       protected final CountDownLatch shutdownDone = new CountDownLatch(1);
   50       protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
   51   
   52       protected boolean running;
   53       private Thread thread;
   54   
   55       public static class WriteKey {
   56           private final int file;
   57           private final long offset;
   58           private final int hash;
   59   
   60           public WriteKey(Location item) {
   61               file = item.getDataFileId();
   62               offset = item.getOffset();
   63               // TODO: see if we can build a better hash
   64               hash = (int)(file ^ offset);
   65           }
   66   
   67           public int hashCode() {
   68               return hash;
   69           }
   70   
   71           public boolean equals(Object obj) {
   72               if (obj instanceof WriteKey) {
   73                   WriteKey di = (WriteKey)obj;
   74                   return di.file == file && di.offset == offset;
   75               }
   76               return false;
   77           }
   78       }
   79   
   80       public class WriteBatch {
   81   
   82           public final DataFile dataFile;
   83           public final WriteCommand first;
   84           public final CountDownLatch latch = new CountDownLatch(1);
   85           public int size;
   86           public AtomicReference<IOException> exception = new AtomicReference<IOException>();
   87   
   88           public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
   89               this.dataFile = dataFile;
   90               this.first = write;
   91               size += write.location.getSize();
   92           }
   93   
   94           public boolean canAppend(DataFile dataFile, WriteCommand write) {
   95               if (dataFile != this.dataFile) {
   96                   return false;
   97               }
   98               if (size + write.location.getSize() >= maxWriteBatchSize) {
   99                   return false;
  100               }
  101               return true;
  102           }
  103   
  104           public void append(WriteCommand write) throws IOException {
  105               this.first.getTailNode().linkAfter(write);
  106               size += write.location.getSize();
  107           }
  108       }
  109   
  110       public static class WriteCommand extends LinkedNode {
  111           public final Location location;
  112           public final ByteSequence data;
  113           final boolean sync;
  114           public final Runnable onComplete;
  115   
  116           public WriteCommand(Location location, ByteSequence data, boolean sync) {
  117               this.location = location;
  118               this.data = data;
  119               this.sync = sync;
  120               this.onComplete=null;
  121           }
  122   
  123           public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
  124               this.location = location;
  125               this.data = data;
  126   			this.onComplete = onComplete;
  127               this.sync = false;
  128   		}
  129       }
  130   
  131   
  132       /**
  133        * Construct a Store writer
  134        * 
  135        * @param fileId
  136        */
  137       public DataFileAppender(AsyncDataManager dataManager) {
  138           this.dataManager = dataManager;
  139           this.inflightWrites = this.dataManager.getInflightWrites();
  140       }
  141   
  142       /**
  143        * @param type
  144        * @param marshaller
  145        * @param payload
  146        * @param type
  147        * @param sync
  148        * @return
  149        * @throws IOException
  150        * @throws
  151        * @throws
  152        */
  153       public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
  154   
  155           // Write the packet our internal buffer.
  156           int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
  157   
  158           final Location location = new Location();
  159           location.setSize(size);
  160           location.setType(type);
  161   
  162           WriteBatch batch;
  163           WriteCommand write = new WriteCommand(location, data, sync);
  164   
  165           // Locate datafile and enqueue into the executor in sychronized block so
  166           // that writes get equeued onto the executor in order that they were assigned
  167           // by the data manager (which is basically just appending)
  168   
  169           synchronized (this) {
  170               // Find the position where this item will land at.
  171               DataFile dataFile = dataManager.allocateLocation(location);
  172               if( !sync ) {
  173                   inflightWrites.put(new WriteKey(location), write);
  174               }
  175               batch = enqueue(dataFile, write);
  176           }
  177           location.setLatch(batch.latch);
  178           if (sync) {
  179               try {
  180                   batch.latch.await();
  181               } catch (InterruptedException e) {
  182                   throw new InterruptedIOException();
  183               }
  184               IOException exception = batch.exception.get(); 
  185               if (exception != null) {
  186                   throw exception;
  187               }
  188           }
  189   
  190           return location;
  191       }
  192       
  193   	public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
  194           // Write the packet our internal buffer.
  195           int size = data.getLength() + AsyncDataManager.ITEM_HEAD_FOOT_SPACE;
  196   
  197           final Location location = new Location();
  198           location.setSize(size);
  199           location.setType(type);
  200   
  201           WriteBatch batch;
  202           WriteCommand write = new WriteCommand(location, data, onComplete);
  203   
  204           // Locate datafile and enqueue into the executor in sychronized block so
  205           // that writes get equeued onto the executor in order that they were assigned
  206           // by the data manager (which is basically just appending)
  207   
  208           synchronized (this) {
  209               // Find the position where this item will land at.
  210               DataFile dataFile = dataManager.allocateLocation(location);
  211               inflightWrites.put(new WriteKey(location), write);
  212               batch = enqueue(dataFile, write);
  213           }
  214           location.setLatch(batch.latch);
  215   
  216           return location;
  217   	}
  218   
  219       private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
  220           synchronized (enqueueMutex) {
  221               WriteBatch rc = null;
  222               if (shutdown) {
  223                   throw new IOException("Async Writter Thread Shutdown");
  224               }
  225               
  226               if (!running) {
  227                   running = true;
  228                   thread = new Thread() {
  229                       public void run() {
  230                           processQueue();
  231                       }
  232                   };
  233                   thread.setPriority(Thread.MAX_PRIORITY);
  234                   thread.setDaemon(true);
  235                   thread.setName("ActiveMQ Data File Writer");
  236                   thread.start();
  237                   firstAsyncException = null;
  238               }
  239               
  240               if (firstAsyncException != null) {
  241                   throw firstAsyncException;
  242               }
  243   
  244               if (nextWriteBatch == null) {
  245                   nextWriteBatch = new WriteBatch(dataFile, write);
  246                   rc = nextWriteBatch;
  247                   enqueueMutex.notify();
  248               } else {
  249                   // Append to current batch if possible..
  250                   if (nextWriteBatch.canAppend(dataFile, write)) {
  251                       nextWriteBatch.append(write);
  252                       rc = nextWriteBatch;
  253                   } else {
  254                       // Otherwise wait for the queuedCommand to be null
  255                       try {
  256                           while (nextWriteBatch != null) {
  257                               enqueueMutex.wait();
  258                           }
  259                       } catch (InterruptedException e) {
  260                           throw new InterruptedIOException();
  261                       }
  262                       if (shutdown) {
  263                           throw new IOException("Async Writter Thread Shutdown");
  264                       }
  265   
  266                       // Start a new batch.
  267                       nextWriteBatch = new WriteBatch(dataFile, write);
  268                       rc = nextWriteBatch;
  269                       enqueueMutex.notify();
  270                   }
  271               }
  272               return rc;
  273           }
  274       }
  275   
  276       public void close() throws IOException {
  277           synchronized (enqueueMutex) {
  278               if (!shutdown) {
  279                   shutdown = true;
  280                   if (running) {
  281                       enqueueMutex.notifyAll();
  282                   } else {
  283                       shutdownDone.countDown();
  284                   }
  285               }
  286           }
  287   
  288           try {
  289               shutdownDone.await();
  290           } catch (InterruptedException e) {
  291               throw new InterruptedIOException();
  292           }
  293   
  294       }
  295   
  296       /**
  297        * The async processing loop that writes to the data files and does the
  298        * force calls.
  299        * 
  300        * Since the file sync() call is the slowest of all the operations, this
  301        * algorithm tries to 'batch' or group together several file sync() requests
  302        * into a single file sync() call. The batching is accomplished attaching
  303        * the same CountDownLatch instance to every force request in a group.
  304        * 
  305        */
  306       protected void processQueue() {
  307           DataFile dataFile = null;
  308           RandomAccessFile file = null;
  309           WriteBatch wb = null;
  310           try {
  311   
  312               DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
  313               while (true) {
  314   
  315                   Object o = null;
  316   
  317                   // Block till we get a command.
  318                   synchronized (enqueueMutex) {
  319                       while (true) {
  320                           if (nextWriteBatch != null) {
  321                               o = nextWriteBatch;
  322                               nextWriteBatch = null;
  323                               break;
  324                           }
  325                           if (shutdown) {
  326                               return;
  327                           }
  328                           enqueueMutex.wait();
  329                       }
  330                       enqueueMutex.notify();
  331                   }
  332   
  333                   wb = (WriteBatch)o;
  334                   if (dataFile != wb.dataFile) {
  335                       if (file != null) {
  336                           dataFile.closeRandomAccessFile(file);
  337                       }
  338                       dataFile = wb.dataFile;
  339                       file = dataFile.openRandomAccessFile(true);
  340                   }
  341   
  342                   WriteCommand write = wb.first;
  343   
  344                   // Write all the data.
  345                   // Only need to seek to first location.. all others
  346                   // are in sequence.
  347                   file.seek(write.location.getOffset());
  348   
  349                   
  350                   boolean forceToDisk=false;
  351                   
  352                   // 
  353                   // is it just 1 big write?
  354                   if (wb.size == write.location.getSize()) {
  355                       forceToDisk = write.sync | write.onComplete!=null;
  356                       
  357                       // Just write it directly..
  358                       file.writeInt(write.location.getSize());
  359                       file.writeByte(write.location.getType());
  360                       file.write(RESERVED_SPACE);
  361                       file.write(AsyncDataManager.ITEM_HEAD_SOR);
  362                       file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
  363                       file.write(AsyncDataManager.ITEM_HEAD_EOR);
  364   
  365                   } else {
  366   
  367                       // Combine the smaller writes into 1 big buffer
  368                       while (write != null) {
  369                           forceToDisk |= write.sync | write.onComplete!=null;
  370   
  371                           buff.writeInt(write.location.getSize());
  372                           buff.writeByte(write.location.getType());
  373                           buff.write(RESERVED_SPACE);
  374                           buff.write(AsyncDataManager.ITEM_HEAD_SOR);
  375                           buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
  376                           buff.write(AsyncDataManager.ITEM_HEAD_EOR);
  377   
  378                           write = (WriteCommand)write.getNext();
  379                       }
  380   
  381                       // Now do the 1 big write.
  382                       ByteSequence sequence = buff.toByteSequence();
  383                       file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
  384                       buff.reset();
  385                   }
  386   
  387                   if( forceToDisk ) {
  388                       file.getFD().sync();
  389                   }
  390                   
  391                   WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
  392                   dataManager.setLastAppendLocation(lastWrite.location);
  393   
  394                   // Now that the data is on disk, remove the writes from the in
  395                   // flight
  396                   // cache.
  397                   write = wb.first;
  398                   while (write != null) {
  399                       if (!write.sync) {
  400                           inflightWrites.remove(new WriteKey(write.location));
  401                       }
  402                       if( write.onComplete !=null ) {
  403                       	 try {
  404   							write.onComplete.run();
  405   						} catch (Throwable e) {
  406   							e.printStackTrace();
  407   						}
  408                       }
  409                       write = (WriteCommand)write.getNext();
  410                   }
  411                   
  412                   // Signal any waiting threads that the write is on disk.
  413                   wb.latch.countDown();
  414               }
  415           } catch (IOException e) {
  416               synchronized (enqueueMutex) {
  417                   firstAsyncException = e;
  418                   if (wb != null) {
  419                       wb.latch.countDown();
  420                       wb.exception.set(e);
  421                   }
  422                   if (nextWriteBatch != null) {
  423                       nextWriteBatch.latch.countDown();
  424                       nextWriteBatch.exception.set(e);
  425                   }
  426               }
  427           } catch (InterruptedException e) {
  428           } finally {
  429               try {
  430                   if (file != null) {
  431                       dataFile.closeRandomAccessFile(file);
  432                   }
  433               } catch (Throwable ignore) {
  434               }
  435               shutdownDone.countDown();
  436           }
  437       }
  438   
  439   
  440   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]