1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.activemq; 18 19 import javax.jms.JMSException; 20 import javax.jms.Message; 21 22 import org.apache.activemq.broker.region.MessageReference; 23 import org.apache.activemq.command.MessageId; 24 import org.apache.activemq.command.ProducerId; 25 import org.apache.activemq.util.BitArrayBin; 26 import org.apache.activemq.util.IdGenerator; 27 import org.apache.activemq.util.LRUCache; 28 29 /** 30 * Provides basic audit functions for Messages 31 * 32 * @version $Revision: 1.1.1.1 $ 33 */ 34 public class ActiveMQMessageAudit { 35 36 public static final int DEFAULT_WINDOW_SIZE = 2048; 37 public static final int MAXIMUM_PRODUCER_COUNT = 64; 38 private int auditDepth; 39 private int maximumNumberOfProducersToTrack; 40 private LRUCache<Object, BitArrayBin> map; 41 42 /** 43 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 44 * 64 45 */ 46 public ActiveMQMessageAudit() { 47 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); 48 } 49 50 /** 51 * Construct a MessageAudit 52 * 53 * @param auditDepth range of ids to track 54 * @param maximumNumberOfProducersToTrack number of producers expected in 55 * the system 56 */ 57 public ActiveMQMessageAudit(int auditDepth, final int maximumNumberOfProducersToTrack) { 58 this.auditDepth = auditDepth; 59 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; 60 this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true); 61 } 62 63 /** 64 * @return the auditDepth 65 */ 66 public int getAuditDepth() { 67 return auditDepth; 68 } 69 70 /** 71 * @param auditDepth the auditDepth to set 72 */ 73 public void setAuditDepth(int auditDepth) { 74 this.auditDepth = auditDepth; 75 } 76 77 /** 78 * @return the maximumNumberOfProducersToTrack 79 */ 80 public int getMaximumNumberOfProducersToTrack() { 81 return maximumNumberOfProducersToTrack; 82 } 83 84 /** 85 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set 86 */ 87 public void setMaximumNumberOfProducersToTrack( 88 int maximumNumberOfProducersToTrack) { 89 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; 90 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); 91 } 92 93 /** 94 * Checks if this message has been seen before 95 * 96 * @param message 97 * @return true if the message is a duplicate 98 * @throws JMSException 99 */ 100 public boolean isDuplicate(Message message) throws JMSException { 101 return isDuplicate(message.getJMSMessageID()); 102 } 103 104 /** 105 * checks whether this messageId has been seen before and adds this 106 * messageId to the list 107 * 108 * @param id 109 * @return true if the message is a duplicate 110 */ 111 public synchronized boolean isDuplicate(String id) { 112 boolean answer = false; 113 String seed = IdGenerator.getSeedFromId(id); 114 if (seed != null) { 115 BitArrayBin bab = map.get(seed); 116 if (bab == null) { 117 bab = new BitArrayBin(auditDepth); 118 map.put(seed, bab); 119 } 120 long index = IdGenerator.getSequenceFromId(id); 121 if (index >= 0) { 122 answer = bab.setBit(index, true); 123 } 124 } 125 return answer; 126 } 127 128 /** 129 * Checks if this message has been seen before 130 * 131 * @param message 132 * @return true if the message is a duplicate 133 */ 134 public boolean isDuplicate(final MessageReference message) { 135 MessageId id = message.getMessageId(); 136 return isDuplicate(id); 137 } 138 139 /** 140 * Checks if this messageId has been seen before 141 * 142 * @param id 143 * @return true if the message is a duplicate 144 */ 145 public synchronized boolean isDuplicate(final MessageId id) { 146 boolean answer = false; 147 148 if (id != null) { 149 ProducerId pid = id.getProducerId(); 150 if (pid != null) { 151 BitArrayBin bab = map.get(pid); 152 if (bab == null) { 153 bab = new BitArrayBin(auditDepth); 154 map.put(pid, bab); 155 } 156 answer = bab.setBit(id.getProducerSequenceId(), true); 157 } 158 } 159 return answer; 160 } 161 162 /** 163 * mark this message as being received 164 * 165 * @param message 166 */ 167 public void rollback(final MessageReference message) { 168 MessageId id = message.getMessageId(); 169 rollback(id); 170 } 171 172 /** 173 * mark this message as being received 174 * 175 * @param id 176 */ 177 public synchronized void rollback(final MessageId id) { 178 if (id != null) { 179 ProducerId pid = id.getProducerId(); 180 if (pid != null) { 181 BitArrayBin bab = map.get(pid); 182 if (bab != null) { 183 bab.setBit(id.getProducerSequenceId(), false); 184 } 185 } 186 } 187 } 188 189 /** 190 * Check the message is in order 191 * @param msg 192 * @return 193 * @throws JMSException 194 */ 195 public boolean isInOrder(Message msg) throws JMSException { 196 return isInOrder(msg.getJMSMessageID()); 197 } 198 199 /** 200 * Check the message id is in order 201 * @param id 202 * @return 203 */ 204 public synchronized boolean isInOrder(final String id) { 205 boolean answer = true; 206 207 if (id != null) { 208 String seed = IdGenerator.getSeedFromId(id); 209 if (seed != null) { 210 BitArrayBin bab = map.get(seed); 211 if (bab != null) { 212 long index = IdGenerator.getSequenceFromId(id); 213 answer = bab.isInOrder(index); 214 } 215 216 } 217 } 218 return answer; 219 } 220 221 /** 222 * Check the MessageId is in order 223 * @param message 224 * @return 225 */ 226 public synchronized boolean isInOrder(final MessageReference message) { 227 return isInOrder(message.getMessageId()); 228 } 229 230 /** 231 * Check the MessageId is in order 232 * @param id 233 * @return 234 */ 235 public synchronized boolean isInOrder(final MessageId id) { 236 boolean answer = false; 237 238 if (id != null) { 239 ProducerId pid = id.getProducerId(); 240 if (pid != null) { 241 BitArrayBin bab = map.get(pid); 242 if (bab == null) { 243 bab = new BitArrayBin(auditDepth); 244 map.put(pid, bab); 245 } 246 answer = bab.isInOrder(id.getProducerSequenceId()); 247 248 } 249 } 250 return answer; 251 } 252 }