Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.util.LinkedHashMap;
   20   import org.apache.activemq.command.ActiveMQDestination;
   21   import org.apache.activemq.command.Message;
   22   import org.apache.activemq.util.LRUCache;
   23   
   24   /**
   25    * An auditor class for a Connection that looks for duplicates
   26    */
   27   class ConnectionAudit {
   28   
   29       private boolean checkForDuplicates;
   30       private LinkedHashMap<ActiveMQDestination, ActiveMQMessageAudit> destinations = new LRUCache<ActiveMQDestination, ActiveMQMessageAudit>(1000);
   31       private LinkedHashMap<ActiveMQDispatcher, ActiveMQMessageAudit> dispatchers = new LRUCache<ActiveMQDispatcher, ActiveMQMessageAudit>(1000);
   32   
   33       
   34   	private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
   35   	private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
   36   	
   37   	
   38       synchronized void removeDispatcher(ActiveMQDispatcher dispatcher) {
   39           dispatchers.remove(dispatcher);
   40       }
   41   
   42       synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
   43           if (checkForDuplicates && message != null) {
   44               ActiveMQDestination destination = message.getDestination();
   45               if (destination != null) {
   46                   if (destination.isQueue()) {
   47                       ActiveMQMessageAudit audit = destinations.get(destination);
   48                       if (audit == null) {
   49                           audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
   50                           destinations.put(destination, audit);
   51                       }
   52                       boolean result = audit.isDuplicate(message);
   53                       return result;
   54                   }
   55                   ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
   56                   if (audit == null) {
   57                       audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
   58                       dispatchers.put(dispatcher, audit);
   59                   }
   60                   boolean result = audit.isDuplicate(message);
   61                   return result;
   62               }
   63           }
   64           return false;
   65       }
   66   
   67       protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
   68           if (checkForDuplicates && message != null) {
   69               ActiveMQDestination destination = message.getDestination();
   70               if (destination != null) {
   71                   if (destination.isQueue()) {
   72                       ActiveMQMessageAudit audit = destinations.get(destination);
   73                       if (audit != null) {
   74                           audit.rollback(message);
   75                       }
   76                   } else {
   77                       ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
   78                       if (audit != null) {
   79                           audit.rollback(message);
   80                       }
   81                   }
   82               }
   83           }
   84       }
   85   
   86       /**
   87        * @return the checkForDuplicates
   88        */
   89       boolean isCheckForDuplicates() {
   90           return this.checkForDuplicates;
   91       }
   92   
   93       /**
   94        * @param checkForDuplicates the checkForDuplicates to set
   95        */
   96       void setCheckForDuplicates(boolean checkForDuplicates) {
   97           this.checkForDuplicates = checkForDuplicates;
   98       }
   99   
  100   	public int getAuditDepth() {
  101   		return auditDepth;
  102   	}
  103   
  104   	public void setAuditDepth(int auditDepth) {
  105   		this.auditDepth = auditDepth;
  106   	}
  107   
  108   	public int getAuditMaximumProducerNumber() {
  109   		return auditMaximumProducerNumber;
  110   	}
  111   
  112   	public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
  113   		this.auditMaximumProducerNumber = auditMaximumProducerNumber;
  114   	}
  115       
  116   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]