Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import javax.jms.JMSException;
   20   import javax.jms.Message;
   21   
   22   import org.apache.activemq.broker.region.MessageReference;
   23   import org.apache.activemq.command.MessageId;
   24   import org.apache.activemq.command.ProducerId;
   25   import org.apache.activemq.util.BitArrayBin;
   26   import org.apache.activemq.util.IdGenerator;
   27   import org.apache.activemq.util.LRUCache;
   28   
   29   /**
   30    * Provides basic audit functions for Messages
   31    * 
   32    * @version $Revision: 1.1.1.1 $
   33    */
   34   public class ActiveMQMessageAudit {
   35   
   36       public static final int DEFAULT_WINDOW_SIZE = 2048;
   37       public static final int MAXIMUM_PRODUCER_COUNT = 64;
   38       private int auditDepth;
   39       private int maximumNumberOfProducersToTrack;
   40       private LRUCache<Object, BitArrayBin> map;
   41   
   42       /**
   43        * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
   44        * 64
   45        */
   46       public ActiveMQMessageAudit() {
   47           this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
   48       }
   49   
   50       /**
   51        * Construct a MessageAudit
   52        * 
   53        * @param auditDepth range of ids to track
   54        * @param maximumNumberOfProducersToTrack number of producers expected in
   55        *                the system
   56        */
   57       public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) {
   58           this.auditDepth = auditDepth;
   59           this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
   60           this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
   61       }
   62       
   63       /**
   64        * @return the auditDepth
   65        */
   66       public int getAuditDepth() {
   67           return auditDepth;
   68       }
   69   
   70       /**
   71        * @param auditDepth the auditDepth to set
   72        */
   73       public void setAuditDepth(int auditDepth) {
   74           this.auditDepth = auditDepth;
   75       }
   76   
   77       /**
   78        * @return the maximumNumberOfProducersToTrack
   79        */
   80       public int getMaximumNumberOfProducersToTrack() {
   81           return maximumNumberOfProducersToTrack;
   82       }
   83   
   84       /**
   85        * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
   86        */
   87       public void setMaximumNumberOfProducersToTrack(
   88               int maximumNumberOfProducersToTrack) {
   89           this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
   90           this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
   91       }
   92   
   93       /**
   94        * Checks if this message has been seen before
   95        * 
   96        * @param message
   97        * @return true if the message is a duplicate
   98        * @throws JMSException
   99        */
  100       public boolean isDuplicate(Message message) throws JMSException {
  101           return isDuplicate(message.getJMSMessageID());
  102       }
  103   
  104       /**
  105        * checks whether this messageId has been seen before and adds this
  106        * messageId to the list
  107        * 
  108        * @param id
  109        * @return true if the message is a duplicate
  110        */
  111       public synchronized boolean isDuplicate(String id) {
  112           boolean answer = false;
  113           String seed = IdGenerator.getSeedFromId(id);
  114           if (seed != null) {
  115               BitArrayBin bab = map.get(seed);
  116               if (bab == null) {
  117                   bab = new BitArrayBin(auditDepth);
  118                   map.put(seed, bab);
  119               }
  120               long index = IdGenerator.getSequenceFromId(id);
  121               if (index >= 0) {
  122                   answer = bab.setBit(index, true);
  123               }
  124           }
  125           return answer;
  126       }
  127   
  128       /**
  129        * Checks if this message has been seen before
  130        * 
  131        * @param message
  132        * @return true if the message is a duplicate
  133        */
  134       public boolean isDuplicate(final MessageReference message) {
  135           MessageId id = message.getMessageId();
  136           return isDuplicate(id);
  137       }
  138       
  139       /**
  140        * Checks if this messageId has been seen before
  141        * 
  142        * @param id
  143        * @return true if the message is a duplicate
  144        */
  145       public synchronized boolean isDuplicate(final MessageId id) {
  146           boolean answer = false;
  147           
  148           if (id != null) {
  149               ProducerId pid = id.getProducerId();
  150               if (pid != null) {
  151                   BitArrayBin bab = map.get(pid);
  152                   if (bab == null) {
  153                       bab = new BitArrayBin(auditDepth);
  154                       map.put(pid, bab);
  155                   }
  156                   answer = bab.setBit(id.getProducerSequenceId(), true);
  157               }
  158           }
  159           return answer;
  160       }
  161   
  162       /**
  163        * mark this message as being received
  164        * 
  165        * @param message
  166        */
  167       public void rollback(final MessageReference message) {
  168           MessageId id = message.getMessageId();
  169           rollback(id);
  170       }
  171       
  172       /**
  173        * mark this message as being received
  174        * 
  175        * @param id
  176        */
  177       public synchronized void rollback(final  MessageId id) {
  178           if (id != null) {
  179               ProducerId pid = id.getProducerId();
  180               if (pid != null) {
  181                   BitArrayBin bab = map.get(pid);
  182                   if (bab != null) {
  183                       bab.setBit(id.getProducerSequenceId(), false);
  184                   }
  185               }
  186           }
  187       }
  188       
  189       /**
  190        * Check the message is in order
  191        * @param msg
  192        * @return
  193        * @throws JMSException
  194        */
  195       public boolean isInOrder(Message msg) throws JMSException {
  196           return isInOrder(msg.getJMSMessageID());
  197       }
  198       
  199       /**
  200        * Check the message id is in order
  201        * @param id
  202        * @return
  203        */
  204       public synchronized boolean isInOrder(final String id) {
  205           boolean answer = true;
  206           
  207           if (id != null) {
  208               String seed = IdGenerator.getSeedFromId(id);
  209               if (seed != null) {
  210                   BitArrayBin bab = map.get(seed);
  211                   if (bab != null) {
  212                       long index = IdGenerator.getSequenceFromId(id);
  213                       answer = bab.isInOrder(index);
  214                   }
  215                  
  216               }
  217           }
  218           return answer;
  219       }
  220       
  221       /**
  222        * Check the MessageId is in order
  223        * @param message 
  224        * @return
  225        */
  226       public synchronized boolean isInOrder(final MessageReference message) {
  227           return isInOrder(message.getMessageId());
  228       }
  229       
  230       /**
  231        * Check the MessageId is in order
  232        * @param id
  233        * @return
  234        */
  235       public synchronized boolean isInOrder(final MessageId id) {
  236           boolean answer = false;
  237   
  238           if (id != null) {
  239               ProducerId pid = id.getProducerId();
  240               if (pid != null) {
  241                   BitArrayBin bab = map.get(pid);
  242                   if (bab == null) {
  243                       bab = new BitArrayBin(auditDepth);
  244                       map.put(pid, bab);
  245                   }
  246                   answer = bab.isInOrder(id.getProducerSequenceId());
  247   
  248               }
  249           }
  250           return answer;
  251       }
  252   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]