Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.kaha.impl.async;
   18   
   19   import java.io.IOException;
   20   import java.io.RandomAccessFile;
   21   import java.nio.ByteBuffer;
   22   import java.nio.channels.FileChannel;
   23   
   24   /**
   25    * An AsyncDataFileAppender that uses NIO ByteBuffers and File chanels to more
   26    * efficently copy data to files.
   27    * 
   28    * @version $Revision: 1.1.1.1 $
   29    */
   30   class NIODataFileAppender extends DataFileAppender {
   31   
   32       public NIODataFileAppender(AsyncDataManager fileManager) {
   33           super(fileManager);
   34       }
   35   
   36       /**
   37        * The async processing loop that writes to the data files and does the
   38        * force calls.
   39        * 
   40        * Since the file sync() call is the slowest of all the operations, this
   41        * algorithm tries to 'batch' or group together several file sync() requests
   42        * into a single file sync() call. The batching is accomplished attaching
   43        * the same CountDownLatch instance to every force request in a group.
   44        * 
   45        */
   46       protected void processQueue() {
   47           DataFile dataFile = null;
   48           RandomAccessFile file = null;
   49           FileChannel channel = null;
   50           WriteBatch wb = null;
   51   
   52           try {
   53   
   54               ByteBuffer header = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_HEAD_SPACE);
   55               ByteBuffer footer = ByteBuffer.allocateDirect(AsyncDataManager.ITEM_FOOT_SPACE);
   56               ByteBuffer buffer = ByteBuffer.allocateDirect(maxWriteBatchSize);
   57   
   58               // Populate the static parts of the headers and footers..
   59               header.putInt(0); // size
   60               header.put((byte)0); // type
   61               header.put(RESERVED_SPACE); // reserved
   62               header.put(AsyncDataManager.ITEM_HEAD_SOR);
   63               footer.put(AsyncDataManager.ITEM_HEAD_EOR);
   64   
   65               while (true) {
   66   
   67                   Object o = null;
   68   
   69                   // Block till we get a command.
   70                   synchronized (enqueueMutex) {
   71                       while (true) {
   72                           if (nextWriteBatch != null) {
   73                               o = nextWriteBatch;
   74                               nextWriteBatch = null;
   75                               break;
   76                           }
   77                           if (shutdown) {
   78                               return;
   79                           }
   80                           enqueueMutex.wait();
   81                       }
   82                       enqueueMutex.notify();
   83                   }
   84   
   85                   wb = (WriteBatch)o;
   86                   if (dataFile != wb.dataFile) {
   87                       if (file != null) {
   88                           dataFile.closeRandomAccessFile(file);
   89                       }
   90                       dataFile = wb.dataFile;
   91                       file = dataFile.openRandomAccessFile(true);
   92                       channel = file.getChannel();
   93                   }
   94   
   95                   WriteCommand write = wb.first;
   96   
   97                   // Write all the data.
   98                   // Only need to seek to first location.. all others
   99                   // are in sequence.
  100                   file.seek(write.location.getOffset());
  101   
  102                   
  103                   boolean forceToDisk=false;
  104                   
  105                   // 
  106                   // is it just 1 big write?
  107                   if (wb.size == write.location.getSize()) {
  108                       forceToDisk = write.sync | write.onComplete!=null;
  109                       
  110                       header.clear();
  111                       header.putInt(write.location.getSize());
  112                       header.put(write.location.getType());
  113                       header.clear();
  114                       transfer(header, channel);
  115                       ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
  116                                                           write.data.getLength());
  117                       transfer(source, channel);
  118                       footer.clear();
  119                       transfer(footer, channel);
  120   
  121                   } else {
  122   
  123                       // Combine the smaller writes into 1 big buffer
  124                       while (write != null) {
  125                           forceToDisk |= write.sync | write.onComplete!=null;
  126                           
  127                           header.clear();
  128                           header.putInt(write.location.getSize());
  129                           header.put(write.location.getType());
  130                           header.clear();
  131                           copy(header, buffer);
  132                           assert !header.hasRemaining();
  133   
  134                           ByteBuffer source = ByteBuffer.wrap(write.data.getData(), write.data.getOffset(),
  135                                                               write.data.getLength());
  136                           copy(source, buffer);
  137                           assert !source.hasRemaining();
  138   
  139                           footer.clear();
  140                           copy(footer, buffer);
  141                           assert !footer.hasRemaining();
  142   
  143                           write = (WriteCommand)write.getNext();
  144                       }
  145   
  146                       // Fully write out the buffer..
  147                       buffer.flip();
  148                       transfer(buffer, channel);
  149                       buffer.clear();
  150                   }
  151   
  152                   if( forceToDisk ) {
  153                       file.getChannel().force(false);
  154                   }
  155   
  156                   WriteCommand lastWrite = (WriteCommand)wb.first.getTailNode();
  157                   dataManager.setLastAppendLocation(lastWrite.location);
  158   
  159                   // Now that the data is on disk, remove the writes from the in
  160                   // flight
  161                   // cache.
  162                   write = wb.first;
  163                   while (write != null) {
  164                       if (!write.sync) {
  165                           inflightWrites.remove(new WriteKey(write.location));
  166                       }
  167                       if (write.onComplete != null) {
  168   						try {
  169   							write.onComplete.run();
  170   						} catch (Throwable e) {
  171   							e.printStackTrace();
  172   						}
  173   					}
  174                       write = (WriteCommand)write.getNext();
  175                   }
  176                   
  177                   // Signal any waiting threads that the write is on disk.
  178                   wb.latch.countDown();
  179               }
  180   
  181           } catch (IOException e) {
  182               synchronized (enqueueMutex) {
  183                   firstAsyncException = e;
  184                   if (wb != null) {
  185                       wb.latch.countDown();
  186                       wb.exception.set(e);
  187                   }
  188                   if (nextWriteBatch != null) {
  189                       nextWriteBatch.latch.countDown();
  190                       nextWriteBatch.exception.set(e);
  191                   }
  192               }
  193           } catch (InterruptedException e) {
  194           } finally {
  195               try {
  196                   if (file != null) {
  197                       dataFile.closeRandomAccessFile(file);
  198                       dataFile = null;
  199                       file.close();
  200                       file = null;
  201                   }
  202                   if (channel != null) {
  203                       channel.close();
  204                       channel = null;
  205                   }
  206               } catch (IOException e) {
  207               }
  208               shutdownDone.countDown();
  209               running = false;
  210           }
  211       }
  212   
  213       /**
  214        * Copy the bytes in header to the channel.
  215        * 
  216        * @param header - source of data
  217        * @param channel - destination where the data will be written.
  218        * @throws IOException
  219        */
  220       private void transfer(ByteBuffer header, FileChannel channel) throws IOException {
  221           while (header.hasRemaining()) {
  222               channel.write(header);
  223           }
  224       }
  225   
  226       private int copy(ByteBuffer src, ByteBuffer dest) {
  227           int rc = Math.min(dest.remaining(), src.remaining());
  228           if (rc > 0) {
  229               // Adjust our limit so that we don't overflow the dest buffer.
  230               int limit = src.limit();
  231               src.limit(src.position() + rc);
  232               dest.put(src);
  233               // restore the limit.
  234               src.limit(limit);
  235           }
  236           return rc;
  237       }
  238   
  239   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » kaha » impl » async » [javadoc | source]