Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » policy » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region.policy;
   18   
   19   import java.util.ArrayList;
   20   import java.util.Collections;
   21   import java.util.Iterator;
   22   import java.util.LinkedList;
   23   import java.util.List;
   24   import org.apache.activemq.broker.ConnectionContext;
   25   import org.apache.activemq.broker.region.MessageReference;
   26   import org.apache.activemq.broker.region.SubscriptionRecovery;
   27   import org.apache.activemq.broker.region.Topic;
   28   import org.apache.activemq.command.ActiveMQDestination;
   29   import org.apache.activemq.command.Message;
   30   import org.apache.activemq.filter.DestinationFilter;
   31   import org.apache.activemq.filter.MessageEvaluationContext;
   32   import org.apache.activemq.thread.Scheduler;
   33   
   34   /**
   35    * This implementation of {@link SubscriptionRecoveryPolicy} will keep a timed
   36    * buffer of messages around in memory and use that to recover new
   37    * subscriptions.
   38    * 
   39    * @org.apache.xbean.XBean
   40    * @version $Revision$
   41    */
   42   public class TimedSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
   43   
   44       private static final int GC_INTERVAL = 1000;
   45       protected static final Scheduler scheduler = Scheduler.getInstance();
   46       
   47       // TODO: need to get a better synchronized linked list that has little
   48       // contention between enqueuing and dequeuing
   49       private final List<TimestampWrapper> buffer = Collections.synchronizedList(new LinkedList<TimestampWrapper>());
   50       private volatile long lastGCRun = System.currentTimeMillis();
   51   
   52       private long recoverDuration = 60 * 1000; // Buffer for 1 min.
   53   
   54       static class TimestampWrapper {
   55           public MessageReference message;
   56           public long timestamp;
   57   
   58           public TimestampWrapper(MessageReference message, long timestamp) {
   59               this.message = message;
   60               this.timestamp = timestamp;
   61           }
   62       }
   63   
   64       private final Runnable gcTask = new Runnable() {
   65           public void run() {
   66               gc();
   67           }
   68       };
   69   
   70       public SubscriptionRecoveryPolicy copy() {
   71           TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy();
   72           rc.setRecoverDuration(recoverDuration);
   73           return rc;
   74       }
   75   
   76       public boolean add(ConnectionContext context, MessageReference message) throws Exception {
   77           buffer.add(new TimestampWrapper(message, lastGCRun));
   78           return true;
   79       }
   80   
   81       public void recover(ConnectionContext context, Topic topic, SubscriptionRecovery sub) throws Exception {
   82           // Re-dispatch the messages from the buffer.
   83           ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer);
   84           if (!copy.isEmpty()) {
   85               for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) {
   86                   TimestampWrapper timestampWrapper = iter.next();
   87                   MessageReference message = timestampWrapper.message;
   88                   sub.addRecoveredMessage(context, message);
   89               }
   90           }
   91       }
   92   
   93       public void start() throws Exception {
   94           scheduler.executePeriodically(gcTask, GC_INTERVAL);
   95       }
   96   
   97       public void stop() throws Exception {
   98           scheduler.cancel(gcTask);
   99       }
  100   
  101       public void gc() {
  102           lastGCRun = System.currentTimeMillis();
  103           while (buffer.size() > 0) {
  104               TimestampWrapper timestampWrapper = buffer.get(0);
  105               if (lastGCRun > timestampWrapper.timestamp + recoverDuration) {
  106                   // GC it.
  107                   buffer.remove(0);
  108               } else {
  109                   break;
  110               }
  111           }
  112       }
  113   
  114       public long getRecoverDuration() {
  115           return recoverDuration;
  116       }
  117   
  118       public void setRecoverDuration(long recoverDuration) {
  119           this.recoverDuration = recoverDuration;
  120       }
  121   
  122       public Message[] browse(ActiveMQDestination destination) throws Exception {
  123           List<Message> result = new ArrayList<Message>();
  124           ArrayList<TimestampWrapper> copy = new ArrayList<TimestampWrapper>(buffer);
  125           DestinationFilter filter = DestinationFilter.parseFilter(destination);
  126           for (Iterator<TimestampWrapper> iter = copy.iterator(); iter.hasNext();) {
  127               TimestampWrapper timestampWrapper = iter.next();
  128               MessageReference ref = timestampWrapper.message;
  129               Message message = ref.getMessage();
  130               if (filter.matches(message.getDestination())) {
  131                   result.add(message);
  132               }
  133           }
  134           return result.toArray(new Message[result.size()]);
  135       }
  136   
  137   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » policy » [javadoc | source]