Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » util » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.util;
   18   
   19   import java.io.IOException;
   20   import java.util.Set;
   21   import javax.annotation.PostConstruct;
   22   import org.apache.activemq.broker.BrokerPluginSupport;
   23   import org.apache.activemq.broker.Connection;
   24   import org.apache.activemq.broker.ConnectionContext;
   25   import org.apache.activemq.broker.ConsumerBrokerExchange;
   26   import org.apache.activemq.broker.ProducerBrokerExchange;
   27   import org.apache.activemq.broker.region.Destination;
   28   import org.apache.activemq.broker.region.MessageReference;
   29   import org.apache.activemq.broker.region.Subscription;
   30   import org.apache.activemq.command.ActiveMQDestination;
   31   import org.apache.activemq.command.BrokerInfo;
   32   import org.apache.activemq.command.ConnectionInfo;
   33   import org.apache.activemq.command.ConsumerInfo;
   34   import org.apache.activemq.command.DestinationInfo;
   35   import org.apache.activemq.command.Message;
   36   import org.apache.activemq.command.MessageAck;
   37   import org.apache.activemq.command.MessageDispatch;
   38   import org.apache.activemq.command.MessageDispatchNotification;
   39   import org.apache.activemq.command.MessagePull;
   40   import org.apache.activemq.command.ProducerInfo;
   41   import org.apache.activemq.command.RemoveSubscriptionInfo;
   42   import org.apache.activemq.command.Response;
   43   import org.apache.activemq.command.SessionInfo;
   44   import org.apache.activemq.command.TransactionId;
   45   import org.apache.activemq.usage.Usage;
   46   import org.apache.commons.logging.Log;
   47   import org.apache.commons.logging.LogFactory;
   48   
   49   /**
   50    * A simple Broker intercepter which allows you to enable/disable logging.
   51    * 
   52    * @org.apache.xbean.XBean
   53    */
   54   
   55   public class LoggingBrokerPlugin extends BrokerPluginSupport {
   56   
   57       private static final Log LOG = LogFactory.getLog(LoggingBrokerPlugin.class);
   58   
   59       private boolean logAll = false;
   60       private boolean logMessageEvents = false;
   61       private boolean logConnectionEvents = true;
   62       private boolean logTransactionEvents = false;
   63       private boolean logConsumerEvents = false;
   64       private boolean logProducerEvents = false;
   65       private boolean logInternalEvents = false;
   66   
   67       /**
   68        *
   69        * @throws Exception
   70        * @org.apache.xbean.InitMethod
   71        */
   72       @PostConstruct
   73       public void afterPropertiesSet() throws Exception {
   74           LOG.info("Created LoggingBrokerPlugin: " + this.toString());
   75       }
   76   
   77       public boolean isLogAll() {
   78           return logAll;
   79       }
   80       
   81       /**
   82        * Log all Events that go through the Plugin
   83        */
   84       public void setLogAll(boolean logAll) {
   85           this.logAll = logAll;
   86       }
   87   
   88       public boolean isLogMessageEvents() {
   89           return logMessageEvents;
   90       }
   91   
   92       /**
   93        * Log Events that are related to message processing
   94        */
   95       public void setLogMessageEvents(boolean logMessageEvents) {
   96           this.logMessageEvents = logMessageEvents;
   97       }
   98   
   99       public boolean isLogConnectionEvents() {
  100           return logConnectionEvents;
  101       }
  102   
  103       /**
  104        * Log Events that are related to connections and sessions
  105        */
  106       public void setLogConnectionEvents(boolean logConnectionEvents) {
  107           this.logConnectionEvents = logConnectionEvents;
  108       }
  109   
  110       public boolean isLogTransactionEvents() {
  111           return logTransactionEvents;
  112       }
  113   
  114       /**
  115        * Log Events that are related to transaction processing
  116        */
  117       public void setLogTransactionEvents(boolean logTransactionEvents) {
  118           this.logTransactionEvents = logTransactionEvents;
  119       }
  120   
  121       public boolean isLogConsumerEvents() {
  122           return logConsumerEvents;
  123       }
  124   
  125       /**
  126        * Log Events that are related to Consumers
  127        */
  128       public void setLogConsumerEvents(boolean logConsumerEvents) {
  129           this.logConsumerEvents = logConsumerEvents;
  130       }
  131   
  132       public boolean isLogProducerEvents() {
  133           return logProducerEvents;
  134       }
  135   
  136       /**
  137        * Log Events that are related to Producers
  138        */
  139       public void setLogProducerEvents(boolean logProducerEvents) {
  140           this.logProducerEvents = logProducerEvents;
  141       }
  142   
  143       public boolean isLogInternalEvents() {
  144           return logInternalEvents;
  145       }
  146   
  147       /**
  148        * Log Events that are normally internal to the broker
  149        */
  150       public void setLogInternalEvents(boolean logInternalEvents) {
  151           this.logInternalEvents = logInternalEvents;
  152       }
  153   
  154       public void acknowledge(ConsumerBrokerExchange consumerExchange,
  155               MessageAck ack) throws Exception {
  156           if (isLogAll() || isLogConsumerEvents()) {
  157               LOG.info("Acknowledging message for client ID : "
  158                       + consumerExchange.getConnectionContext().getClientId() 
  159                       + (ack.getMessageCount() == 1 ? ", " + ack.getLastMessageId() : ""));
  160               if (LOG.isTraceEnabled() && ack.getMessageCount() > 1) {
  161                   LOG.trace("Message count: " + ack.getMessageCount()
  162                           + ", First Message Id: " + ack.getFirstMessageId()
  163                           + ", Last Message Id: " + ack.getLastMessageId());
  164               }
  165           }
  166           super.acknowledge(consumerExchange, ack);
  167       }
  168   
  169       public Response messagePull(ConnectionContext context, MessagePull pull)
  170               throws Exception {
  171           if (isLogAll() || isLogConsumerEvents()) {
  172               LOG.info("Message Pull from : " + context.getClientId() + " on "
  173                       + pull.getDestination().getPhysicalName());
  174           }
  175           return super.messagePull(context, pull);
  176       }
  177   
  178       public void addConnection(ConnectionContext context, ConnectionInfo info)
  179               throws Exception {
  180           if (isLogAll() || isLogConnectionEvents()) {
  181               LOG.info("Adding Connection : " + context);
  182           }
  183           super.addConnection(context, info);
  184       }
  185   
  186       public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
  187               throws Exception {
  188           if (isLogAll() || isLogConsumerEvents()) {
  189               LOG.info("Adding Consumer : " + info);
  190           }
  191           return super.addConsumer(context, info);
  192       }
  193   
  194       public void addProducer(ConnectionContext context, ProducerInfo info)
  195               throws Exception {
  196           if (isLogAll() || isLogProducerEvents()) {
  197               LOG.info("Adding Producer :" + info);
  198           }
  199           super.addProducer(context, info);
  200       }
  201   
  202       public void commitTransaction(ConnectionContext context, TransactionId xid,
  203               boolean onePhase) throws Exception {
  204           if (isLogAll() || isLogTransactionEvents()) {
  205               LOG.info("Commiting transaction : " + xid.getTransactionKey());
  206           }
  207           super.commitTransaction(context, xid, onePhase);
  208       }
  209   
  210       public void removeSubscription(ConnectionContext context,
  211               RemoveSubscriptionInfo info) throws Exception {
  212           if (isLogAll() || isLogConsumerEvents()) {
  213               LOG.info("Removing subscription : " + info);
  214           }
  215           super.removeSubscription(context, info);
  216       }
  217   
  218       public TransactionId[] getPreparedTransactions(ConnectionContext context)
  219               throws Exception {
  220   
  221           TransactionId[] result = super.getPreparedTransactions(context);
  222           if ((isLogAll() || isLogTransactionEvents()) && result != null) {
  223               StringBuffer tids = new StringBuffer();
  224               for (TransactionId tid : result) {
  225                   if (tids.length() > 0) {
  226                       tids.append(", ");
  227                   }
  228                   tids.append(tid.getTransactionKey());
  229               }
  230               LOG.info("Prepared transactions : " + tids);
  231           }
  232           return result;
  233       }
  234   
  235       public int prepareTransaction(ConnectionContext context, TransactionId xid)
  236               throws Exception {
  237           if (isLogAll() || isLogTransactionEvents()) {
  238               LOG.info("Preparing transaction : " + xid.getTransactionKey());
  239           }
  240           return super.prepareTransaction(context, xid);
  241       }
  242   
  243       public void removeConnection(ConnectionContext context,
  244               ConnectionInfo info, Throwable error) throws Exception {
  245           if (isLogAll() || isLogConnectionEvents()) {
  246               LOG.info("Removing Connection : " + info);
  247           }
  248           super.removeConnection(context, info, error);
  249       }
  250   
  251       public void removeConsumer(ConnectionContext context, ConsumerInfo info)
  252               throws Exception {
  253           if (isLogAll() || isLogConsumerEvents()) {
  254               LOG.info("Removing Consumer : " + info);
  255           }
  256           super.removeConsumer(context, info);
  257       }
  258   
  259       public void removeProducer(ConnectionContext context, ProducerInfo info)
  260               throws Exception {
  261           if (isLogAll() || isLogProducerEvents()) {
  262               LOG.info("Removing Producer : " + info);
  263           }
  264           super.removeProducer(context, info);
  265       }
  266   
  267       public void rollbackTransaction(ConnectionContext context, TransactionId xid)
  268               throws Exception {
  269           if (isLogAll() || isLogTransactionEvents()) {
  270               LOG.info("Rolling back Transaction : " + xid.getTransactionKey());
  271           }
  272           super.rollbackTransaction(context, xid);
  273       }
  274   
  275       public void send(ProducerBrokerExchange producerExchange,
  276               Message messageSend) throws Exception {
  277           if (isLogAll() || isLogProducerEvents()) {
  278               LOG.info("Sending message : " + messageSend);
  279           }
  280           super.send(producerExchange, messageSend);
  281       }
  282   
  283       public void beginTransaction(ConnectionContext context, TransactionId xid)
  284               throws Exception {
  285           if (isLogAll() || isLogTransactionEvents()) {
  286               LOG.info("Beginning transaction : " + xid.getTransactionKey());
  287           }
  288           super.beginTransaction(context, xid);
  289       }
  290   
  291       public void forgetTransaction(ConnectionContext context,
  292               TransactionId transactionId) throws Exception {
  293           if (isLogAll() || isLogTransactionEvents()) {
  294               LOG.info("Forgetting transaction : "
  295                       + transactionId.getTransactionKey());
  296           }
  297           super.forgetTransaction(context, transactionId);
  298       }
  299   
  300       public Connection[] getClients() throws Exception {
  301           Connection[] result = super.getClients();
  302   
  303           if (isLogAll() || isLogInternalEvents()) {
  304               if (result == null) {
  305                   LOG.info("Get Clients returned empty list.");
  306               } else {
  307                   StringBuffer cids = new StringBuffer();
  308                   for (Connection c : result) {
  309                       cids.append(cids.length() > 0 ? ", " : "");
  310                       cids.append(c.getConnectionId());
  311                   }
  312                   LOG.info("Connected clients : " + cids);
  313               }
  314           }
  315           return super.getClients();
  316       }
  317   
  318       public org.apache.activemq.broker.region.Destination addDestination(
  319               ConnectionContext context, ActiveMQDestination destination)
  320               throws Exception {
  321           if (isLogAll() || isLogInternalEvents()) {
  322               LOG.info("Adding destination : "
  323                       + destination.getDestinationTypeAsString() + ":"
  324                       + destination.getPhysicalName());
  325           }
  326           return super.addDestination(context, destination);
  327       }
  328   
  329       public void removeDestination(ConnectionContext context,
  330               ActiveMQDestination destination, long timeout) throws Exception {
  331           if (isLogAll() || isLogInternalEvents()) {
  332               LOG.info("Removing destination : "
  333                       + destination.getDestinationTypeAsString() + ":"
  334                       + destination.getPhysicalName());
  335           }
  336           super.removeDestination(context, destination, timeout);
  337       }
  338   
  339       public ActiveMQDestination[] getDestinations() throws Exception {
  340           ActiveMQDestination[] result = super.getDestinations();
  341           if (isLogAll() || isLogInternalEvents()) {
  342               if (result == null) {
  343                   LOG.info("Get Destinations returned empty list.");
  344               } else {
  345                   StringBuffer destinations = new StringBuffer();
  346                   for (ActiveMQDestination dest : result) {
  347                       destinations.append(destinations.length() > 0 ? ", " : "");
  348                       destinations.append(dest.getPhysicalName());
  349                   }
  350                   LOG.info("Get Destinations : " + destinations);
  351               }
  352           }
  353           return result;
  354       }
  355   
  356       public void start() throws Exception {
  357           if (isLogAll() || isLogInternalEvents()) {
  358               LOG.info("Starting " + getBrokerName());
  359           }
  360           super.start();
  361       }
  362   
  363       public void stop() throws Exception {
  364           if (isLogAll() || isLogInternalEvents()) {
  365               LOG.info("Stopping " + getBrokerName());
  366           }
  367           super.stop();
  368       }
  369   
  370       public void addSession(ConnectionContext context, SessionInfo info)
  371               throws Exception {
  372           if (isLogAll() || isLogConnectionEvents()) {
  373               LOG.info("Adding Session : " + info);
  374           }
  375           super.addSession(context, info);
  376       }
  377   
  378       public void removeSession(ConnectionContext context, SessionInfo info)
  379               throws Exception {
  380           if (isLogAll() || isLogConnectionEvents()) {
  381               LOG.info("Removing Session : " + info);
  382           }
  383           super.removeSession(context, info);
  384       }
  385   
  386       public void addBroker(Connection connection, BrokerInfo info) {
  387           if (isLogAll() || isLogInternalEvents()) {
  388               LOG.info("Adding Broker " + info.getBrokerName());
  389           }
  390           super.addBroker(connection, info);
  391       }
  392   
  393       public void removeBroker(Connection connection, BrokerInfo info) {
  394           if (isLogAll() || isLogInternalEvents()) {
  395               LOG.info("Removing Broker " + info.getBrokerName());
  396           }
  397           super.removeBroker(connection, info);
  398       }
  399   
  400       public BrokerInfo[] getPeerBrokerInfos() {
  401           BrokerInfo[] result = super.getPeerBrokerInfos();
  402           if (isLogAll() || isLogInternalEvents()) {
  403               if (result == null) {
  404                   LOG.info("Get Peer Broker Infos returned empty list.");
  405               } else {
  406                   StringBuffer peers = new StringBuffer();
  407                   for (BrokerInfo bi : result) {
  408                       peers.append(peers.length() > 0 ? ", " : "");
  409                       peers.append(bi.getBrokerName());
  410                   }
  411                   LOG.info("Get Peer Broker Infos : " + peers);
  412               }
  413           }
  414           return result;
  415       }
  416   
  417       public void preProcessDispatch(MessageDispatch messageDispatch) {
  418           if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
  419               LOG.info("preProcessDispatch :" + messageDispatch);
  420           }
  421           super.preProcessDispatch(messageDispatch);
  422       }
  423   
  424       public void postProcessDispatch(MessageDispatch messageDispatch) {
  425           if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
  426               LOG.info("postProcessDispatch :" + messageDispatch);
  427           }
  428           super.postProcessDispatch(messageDispatch);
  429       }
  430   
  431       public void processDispatchNotification(
  432               MessageDispatchNotification messageDispatchNotification)
  433               throws Exception {
  434           if (isLogAll() || isLogInternalEvents() || isLogConsumerEvents()) {
  435               LOG.info("ProcessDispatchNotification :"
  436                       + messageDispatchNotification);
  437           }
  438           super.processDispatchNotification(messageDispatchNotification);
  439       }
  440   
  441       public Set<ActiveMQDestination> getDurableDestinations() {
  442           Set<ActiveMQDestination> result = super.getDurableDestinations();
  443           if (isLogAll() || isLogInternalEvents()) {
  444               if (result == null) {
  445                   LOG.info("Get Durable Destinations returned empty list.");
  446               } else {
  447                   StringBuffer destinations = new StringBuffer();
  448                   for (ActiveMQDestination dest : result) {
  449                       destinations.append(destinations.length() > 0 ? ", " : "");
  450                       destinations.append(dest.getPhysicalName());
  451                   }
  452                   LOG.info("Get Durable Destinations : " + destinations);
  453               }
  454           }
  455           return result;
  456       }
  457   
  458       public void addDestinationInfo(ConnectionContext context,
  459               DestinationInfo info) throws Exception {
  460           if (isLogAll() || isLogInternalEvents()) {
  461               LOG.info("Adding destination info : " + info);
  462           }
  463           super.addDestinationInfo(context, info);
  464       }
  465   
  466       public void removeDestinationInfo(ConnectionContext context,
  467               DestinationInfo info) throws Exception {
  468           if (isLogAll() || isLogInternalEvents()) {
  469               LOG.info("Removing destination info : " + info);
  470           }
  471           super.removeDestinationInfo(context, info);
  472       }
  473   
  474       public void messageExpired(ConnectionContext context,
  475               MessageReference message) {
  476           if (isLogAll() || isLogInternalEvents()) {
  477               String msg = "Unable to display message.";
  478               try {
  479                   msg = message.getMessage().toString();
  480               } catch (IOException ioe) {
  481               }
  482               LOG.info("Message has expired : " + msg);
  483           }
  484           super.messageExpired(context, message);
  485       }
  486   
  487       public void sendToDeadLetterQueue(ConnectionContext context,
  488               MessageReference messageReference) {
  489           if (isLogAll() || isLogInternalEvents()) {
  490               String msg = "Unable to display message.";
  491               try {
  492                   msg = messageReference.getMessage().toString();
  493               } catch (IOException ioe) {
  494               }
  495               LOG.info("Sending to DLQ : " + msg);
  496           }
  497       }
  498   
  499       public void fastProducer(ConnectionContext context,
  500               ProducerInfo producerInfo) {
  501           if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
  502               LOG.info("Fast Producer : " + producerInfo);
  503           }
  504           super.fastProducer(context, producerInfo);
  505       }
  506   
  507       public void isFull(ConnectionContext context, Destination destination,
  508               Usage usage) {
  509           if (isLogAll() || isLogProducerEvents() || isLogInternalEvents()) {
  510               LOG.info("Destination is full : " + destination.getName());
  511           }
  512           super.isFull(context, destination, usage);
  513       }
  514   
  515       public void messageConsumed(ConnectionContext context,
  516               MessageReference messageReference) {
  517           if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
  518               String msg = "Unable to display message.";
  519               try {
  520                   msg = messageReference.getMessage().toString();
  521               } catch (IOException ioe) {
  522               }
  523               LOG.info("Message consumed : " + msg);
  524           }
  525           super.messageConsumed(context, messageReference);
  526       }
  527   
  528       public void messageDelivered(ConnectionContext context,
  529               MessageReference messageReference) {
  530           if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
  531               String msg = "Unable to display message.";
  532               try {
  533                   msg = messageReference.getMessage().toString();
  534               } catch (IOException ioe) {
  535               }
  536               LOG.info("Message delivered : " + msg);
  537           }
  538           super.messageDelivered(context, messageReference);
  539       }
  540   
  541       public void messageDiscarded(ConnectionContext context,
  542               MessageReference messageReference) {
  543           if (isLogAll() || isLogInternalEvents()) {
  544               String msg = "Unable to display message.";
  545               try {
  546                   msg = messageReference.getMessage().toString();
  547               } catch (IOException ioe) {
  548               }
  549               LOG.info("Message discarded : " + msg);
  550           }
  551           super.messageDiscarded(context, messageReference);
  552       }
  553   
  554       public void slowConsumer(ConnectionContext context,
  555               Destination destination, Subscription subs) {
  556           if (isLogAll() || isLogConsumerEvents() || isLogInternalEvents()) {
  557               LOG.info("Detected slow consumer on " + destination.getName());
  558               StringBuffer buf = new StringBuffer("Connection(");
  559               buf.append(subs.getConsumerInfo().getConsumerId().getConnectionId());
  560               buf.append(") Session(");
  561               buf.append(subs.getConsumerInfo().getConsumerId().getSessionId());
  562               buf.append(")");
  563               LOG.info(buf);
  564           }
  565           super.slowConsumer(context, destination, subs);
  566       }
  567   
  568       public void nowMasterBroker() {
  569           if (isLogAll() || isLogInternalEvents()) {
  570               LOG.info("Is now the master broker : " + getBrokerName());
  571           }
  572           super.nowMasterBroker();
  573       }
  574   
  575       public String toString() {
  576           StringBuffer buf = new StringBuffer();
  577           buf.append("LoggingBrokerPlugin(");
  578           buf.append("logAll=");
  579           buf.append(isLogAll());
  580           buf.append(", logConnectionEvents=");
  581           buf.append(isLogConnectionEvents());
  582           buf.append(", logConsumerEvents=");
  583           buf.append(isLogConsumerEvents());
  584           buf.append(", logProducerEvents=");
  585           buf.append(isLogProducerEvents());
  586           buf.append(", logMessageEvents=");
  587           buf.append(isLogMessageEvents());
  588           buf.append(", logTransactionEvents=");
  589           buf.append(isLogTransactionEvents());
  590           buf.append(", logInternalEvents=");
  591           buf.append(isLogInternalEvents());
  592           buf.append(")");
  593           return buf.toString();
  594       }
  595   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » util » [javadoc | source]