Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.broker.region;
   18   
   19   import org.apache.activemq.broker.BrokerService;
   20   import org.apache.activemq.broker.ConnectionContext;
   21   import org.apache.activemq.broker.ProducerBrokerExchange;
   22   import org.apache.activemq.broker.region.policy.DispatchPolicy;
   23   import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
   24   import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
   25   import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
   26   import org.apache.activemq.command.ActiveMQDestination;
   27   import org.apache.activemq.command.ExceptionResponse;
   28   import org.apache.activemq.command.Message;
   29   import org.apache.activemq.command.MessageAck;
   30   import org.apache.activemq.command.MessageId;
   31   import org.apache.activemq.command.ProducerAck;
   32   import org.apache.activemq.command.ProducerInfo;
   33   import org.apache.activemq.command.Response;
   34   import org.apache.activemq.command.SubscriptionInfo;
   35   import org.apache.activemq.filter.MessageEvaluationContext;
   36   import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
   37   import org.apache.activemq.store.MessageRecoveryListener;
   38   import org.apache.activemq.store.TopicMessageStore;
   39   import org.apache.activemq.thread.Task;
   40   import org.apache.activemq.thread.TaskRunner;
   41   import org.apache.activemq.thread.TaskRunnerFactory;
   42   import org.apache.activemq.thread.Valve;
   43   import org.apache.activemq.transaction.Synchronization;
   44   import org.apache.activemq.usage.Usage;
   45   import org.apache.activemq.util.SubscriptionKey;
   46   import org.apache.commons.logging.Log;
   47   import org.apache.commons.logging.LogFactory;
   48   import java.io.IOException;
   49   import java.util.ArrayList;
   50   import java.util.LinkedList;
   51   import java.util.List;
   52   import java.util.Set;
   53   import java.util.concurrent.ConcurrentHashMap;
   54   import java.util.concurrent.CopyOnWriteArrayList;
   55   import java.util.concurrent.CopyOnWriteArraySet;
   56   
   57   /**
   58    * The Topic is a destination that sends a copy of a message to every active
   59    * Subscription registered.
   60    * 
   61    * @version $Revision: 1.21 $
   62    */
   63   public class Topic extends BaseDestination implements Task {
   64       protected static final Log LOG = LogFactory.getLog(Topic.class);
   65       private final TopicMessageStore topicStore;
   66       protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
   67       protected final Valve dispatchValve = new Valve(true);
   68       private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
   69       private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
   70       private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
   71       private final TaskRunner taskRunner;
   72       private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
   73       private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
   74           public void run() {
   75               try {
   76                   Topic.this.taskRunner.wakeup();
   77               } catch (InterruptedException e) {
   78               }
   79           };
   80       };
   81   
   82       public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception {
   83           super(brokerService, store, destination, parentStats);
   84           this.topicStore = store;
   85           //set default subscription recovery policy
   86           subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
   87           this.taskRunner = taskFactory.createTaskRunner(this, "Topic  " + destination.getPhysicalName());
   88       }
   89   
   90       public void initialize() throws Exception {
   91           super.initialize();
   92           if (store != null) {
   93               // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
   94               // int messageCount = store.getMessageCount();
   95               // destinationStatistics.getMessages().setCount(messageCount);
   96           }
   97       }
   98   
   99       public List<Subscription> getConsumers() {
  100           synchronized (consumers) {
  101               return new ArrayList<Subscription>(consumers);
  102           }
  103       }
  104   
  105       public boolean lock(MessageReference node, LockOwner sub) {
  106           return true;
  107       }
  108   
  109       public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
  110   
  111           destinationStatistics.getConsumers().increment();
  112   
  113           if (!sub.getConsumerInfo().isDurable()) {
  114   
  115               // Do a retroactive recovery if needed.
  116               if (sub.getConsumerInfo().isRetroactive()) {
  117   
  118                   // synchronize with dispatch method so that no new messages are
  119                   // sent
  120                   // while we are recovering a subscription to avoid out of order
  121                   // messages.
  122                   dispatchValve.turnOff();
  123                   try {
  124   
  125                       synchronized (consumers) {
  126                           sub.add(context, this);
  127                           consumers.add(sub);
  128                       }
  129                       subscriptionRecoveryPolicy.recover(context, this, sub);
  130   
  131                   } finally {
  132                       dispatchValve.turnOn();
  133                   }
  134   
  135               } else {
  136                   synchronized (consumers) {
  137                       sub.add(context, this);
  138                       consumers.add(sub);
  139                   }
  140               }
  141           } else {
  142               sub.add(context, this);
  143               DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
  144               durableSubcribers.put(dsub.getSubscriptionKey(), dsub);
  145           }
  146       }
  147   
  148       public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
  149           if (!sub.getConsumerInfo().isDurable()) {
  150               destinationStatistics.getConsumers().decrement();
  151               synchronized (consumers) {
  152                   consumers.remove(sub);
  153               }
  154           }
  155           sub.remove(context, this);
  156       }
  157   
  158       public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
  159           if (topicStore != null) {
  160               topicStore.deleteSubscription(key.clientId, key.subscriptionName);
  161               Object removed = durableSubcribers.remove(key);
  162               if (removed != null) {
  163                   destinationStatistics.getConsumers().decrement();
  164               }
  165           }
  166       }
  167   
  168       public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
  169           // synchronize with dispatch method so that no new messages are sent
  170           // while
  171           // we are recovering a subscription to avoid out of order messages.
  172           dispatchValve.turnOff();
  173           try {
  174   
  175               if (topicStore == null) {
  176                   return;
  177               }
  178   
  179               // Recover the durable subscription.
  180               String clientId = subscription.getSubscriptionKey().getClientId();
  181               String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
  182               String selector = subscription.getConsumerInfo().getSelector();
  183               SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
  184               if (info != null) {
  185                   // Check to see if selector changed.
  186                   String s1 = info.getSelector();
  187                   if (s1 == null ^ selector == null || (s1 != null && !s1.equals(selector))) {
  188                       // Need to delete the subscription
  189                       topicStore.deleteSubscription(clientId, subscriptionName);
  190                       info = null;
  191                   } else {
  192                       synchronized (consumers) {
  193                           consumers.add(subscription);
  194                       }
  195                   }
  196               }
  197               // Do we need to create the subscription?
  198               if (info == null) {
  199                   info = new SubscriptionInfo();
  200                   info.setClientId(clientId);
  201                   info.setSelector(selector);
  202                   info.setSubscriptionName(subscriptionName);
  203                   info.setDestination(getActiveMQDestination());
  204                   // This destination is an actual destination id.
  205                   info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
  206                   // This destination might be a pattern
  207                   synchronized (consumers) {
  208                       consumers.add(subscription);
  209                       topicStore.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
  210                   }
  211               }
  212   
  213               final MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
  214               msgContext.setDestination(destination);
  215               if (subscription.isRecoveryRequired()) {
  216                   topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
  217                       public boolean recoverMessage(Message message) throws Exception {
  218                           message.setRegionDestination(Topic.this);
  219                           try {
  220                               msgContext.setMessageReference(message);
  221                               if (subscription.matches(message, msgContext)) {
  222                                   subscription.add(message);
  223                               }
  224                           } catch (IOException e) {
  225                               LOG.error("Failed to recover this message " + message);
  226                           }
  227                           return true;
  228                       }
  229   
  230                       public boolean recoverMessageReference(MessageId messageReference) throws Exception {
  231                           throw new RuntimeException("Should not be called.");
  232                       }
  233   
  234                       public boolean hasSpace() {
  235                           return true;
  236                       }
  237   
  238                       public boolean isDuplicate(MessageId id) {
  239                           return false;
  240                       }
  241                   });
  242               }
  243           } finally {
  244               dispatchValve.turnOn();
  245           }
  246       }
  247   
  248       public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
  249           synchronized (consumers) {
  250               consumers.remove(sub);
  251           }
  252           sub.remove(context, this);
  253       }
  254   
  255       protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
  256           if (subscription.getConsumerInfo().isRetroactive()) {
  257               subscriptionRecoveryPolicy.recover(context, this, subscription);
  258           }
  259       }
  260   
  261       public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
  262           final ConnectionContext context = producerExchange.getConnectionContext();
  263   
  264           final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
  265           final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
  266   
  267           // There is delay between the client sending it and it arriving at the
  268           // destination.. it may have expired.
  269           if (message.isExpired()) {
  270               broker.messageExpired(context, message);
  271               getDestinationStatistics().getExpired().increment();
  272               if (sendProducerAck) {
  273                   ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
  274                   context.getConnection().dispatchAsync(ack);
  275               }
  276               return;
  277           }
  278   
  279           if (memoryUsage.isFull()) {
  280               isFull(context, memoryUsage);
  281               fastProducer(context, producerInfo);
  282   
  283               if (isProducerFlowControl() && context.isProducerFlowControl()) {
  284   
  285                   if (warnOnProducerFlowControl) {
  286                       warnOnProducerFlowControl = false;
  287                       LOG.info("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached for " + getActiveMQDestination().getQualifiedName()
  288                               + ". Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it."
  289                               + " See http://activemq.apache.org/producer-flow-control.html for more info");
  290                   }
  291   
  292                   if (systemUsage.isSendFailIfNoSpace()) {
  293                       throw new javax.jms.ResourceAllocationException("Usage Manager memory limit ("+ memoryUsage.getLimit() + ") reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
  294                               + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
  295                   }
  296   
  297                   // We can avoid blocking due to low usage if the producer is sending
  298                   // a sync message or
  299                   // if it is using a producer window
  300                   if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
  301                       synchronized (messagesWaitingForSpace) {
  302                           messagesWaitingForSpace.add(new Runnable() {
  303                               public void run() {
  304   
  305                                   try {
  306   
  307                                       // While waiting for space to free up... the
  308                                       // message may have expired.
  309                                       if (message.isExpired()) {
  310                                           broker.messageExpired(context, message);
  311                                           getDestinationStatistics().getExpired().increment();
  312                                       } else {
  313                                           doMessageSend(producerExchange, message);
  314                                       }
  315   
  316                                       if (sendProducerAck) {
  317                                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
  318                                           context.getConnection().dispatchAsync(ack);
  319                                       } else {
  320                                           Response response = new Response();
  321                                           response.setCorrelationId(message.getCommandId());
  322                                           context.getConnection().dispatchAsync(response);
  323                                       }
  324   
  325                                   } catch (Exception e) {
  326                                       if (!sendProducerAck && !context.isInRecoveryMode()) {
  327                                           ExceptionResponse response = new ExceptionResponse(e);
  328                                           response.setCorrelationId(message.getCommandId());
  329                                           context.getConnection().dispatchAsync(response);
  330                                       }
  331                                   }
  332   
  333                               }
  334                           });
  335   
  336                           // If the user manager is not full, then the task will not
  337                           // get called..
  338                           if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
  339                               // so call it directly here.
  340                               sendMessagesWaitingForSpaceTask.run();
  341                           }
  342                           context.setDontSendReponse(true);
  343                           return;
  344                       }
  345   
  346                   } else {
  347                       // Producer flow control cannot be used, so we have do the flow
  348                       // control at the broker
  349                       // by blocking this thread until there is space available.
  350                       
  351                       if (memoryUsage.isFull()) {
  352                           if (context.isInTransaction()) {
  353   
  354                               int count = 0;
  355                               while (!memoryUsage.waitForSpace(1000)) {
  356                                   if (context.getStopping().get()) {
  357                                       throw new IOException("Connection closed, send aborted.");
  358                                   }
  359                                   if (count > 2 && context.isInTransaction()) {
  360                                       count = 0;
  361                                       int size = context.getTransaction().size();
  362                                       LOG.warn("Waiting for space to send  transacted message - transaction elements = " + size + " need more space to commit. Message = " + message);
  363                                   }
  364                               }
  365                           } else {
  366                               waitForSpace(context, memoryUsage, "Usage Manager memory limit reached. Stopping producer (" + message.getProducerId() + ") to prevent flooding "
  367                                       + getActiveMQDestination().getQualifiedName() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
  368                           }
  369                       }
  370   
  371                       // The usage manager could have delayed us by the time
  372                       // we unblock the message could have expired..
  373                       if (message.isExpired()) {
  374                           getDestinationStatistics().getExpired().increment();
  375                           if (LOG.isDebugEnabled()) {
  376                               LOG.debug("Expired message: " + message);
  377                           }
  378                           return;
  379                       }
  380                   }
  381               }
  382           }
  383   
  384           doMessageSend(producerExchange, message);
  385           messageDelivered(context, message);
  386           if (sendProducerAck) {
  387               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
  388               context.getConnection().dispatchAsync(ack);
  389           }
  390       }
  391   
  392       /**
  393        * do send the message - this needs to be synchronized to ensure messages
  394        * are stored AND dispatched in the right order
  395        * 
  396        * @param producerExchange
  397        * @param message
  398        * @throws IOException
  399        * @throws Exception
  400        */
  401       synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
  402           final ConnectionContext context = producerExchange.getConnectionContext();
  403           message.setRegionDestination(this);
  404           message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
  405   
  406           if (topicStore != null && message.isPersistent() && !canOptimizeOutPersistence()) {
  407               if (systemUsage.getStoreUsage().isFull()) {
  408                   final String logMessage = "Usage Manager Store is Full. Stopping producer (" + message.getProducerId() + ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
  409                           + " See http://activemq.apache.org/producer-flow-control.html for more info";
  410                   if (systemUsage.isSendFailIfNoSpace()) {
  411                       throw new javax.jms.ResourceAllocationException(logMessage);
  412                   }
  413   
  414                   waitForSpace(context, systemUsage.getStoreUsage(), logMessage);
  415               }
  416               topicStore.addMessage(context, message);
  417           }
  418   
  419           message.incrementReferenceCount();
  420   
  421           if (context.isInTransaction()) {
  422               context.getTransaction().addSynchronization(new Synchronization() {
  423                   public void afterCommit() throws Exception {
  424                       // It could take while before we receive the commit
  425                       // operration.. by that time the message could have
  426                       // expired..
  427                       if (broker.isExpired(message)) {
  428                           getDestinationStatistics().getExpired().increment();
  429                           broker.messageExpired(context, message);
  430                           message.decrementReferenceCount();
  431                           return;
  432                       }
  433                       try {
  434                           dispatch(context, message);
  435                       } finally {
  436                           message.decrementReferenceCount();
  437                       }
  438                   }
  439               });
  440   
  441           } else {
  442               try {
  443                   dispatch(context, message);
  444               } finally {
  445                   message.decrementReferenceCount();
  446               }
  447           }
  448   
  449       }
  450   
  451       private boolean canOptimizeOutPersistence() {
  452           return durableSubcribers.size() == 0;
  453       }
  454   
  455       public String toString() {
  456           return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
  457       }
  458   
  459       public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
  460           if (topicStore != null && node.isPersistent()) {
  461               DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
  462               SubscriptionKey key = dsub.getSubscriptionKey();
  463               topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
  464           }
  465           messageConsumed(context, node);
  466       }
  467   
  468       public void gc() {
  469       }
  470   
  471       public Message loadMessage(MessageId messageId) throws IOException {
  472           return topicStore != null ? topicStore.getMessage(messageId) : null;
  473       }
  474   
  475       public void start() throws Exception {
  476           this.subscriptionRecoveryPolicy.start();
  477           if (memoryUsage != null) {
  478               memoryUsage.start();
  479           }
  480   
  481       }
  482   
  483       public void stop() throws Exception {
  484           if (taskRunner != null) {
  485               taskRunner.shutdown();
  486           }
  487           this.subscriptionRecoveryPolicy.stop();
  488           if (memoryUsage != null) {
  489               memoryUsage.stop();
  490           }
  491           if (this.topicStore != null) {
  492               this.topicStore.stop();
  493           }
  494       }
  495   
  496       public Message[] browse() {
  497           final Set<Message> result = new CopyOnWriteArraySet<Message>();
  498           try {
  499               if (topicStore != null) {
  500                   topicStore.recover(new MessageRecoveryListener() {
  501                       public boolean recoverMessage(Message message) throws Exception {
  502                           result.add(message);
  503                           return true;
  504                       }
  505   
  506                       public boolean recoverMessageReference(MessageId messageReference) throws Exception {
  507                           return true;
  508                       }
  509   
  510                       public boolean hasSpace() {
  511                           return true;
  512                       }
  513   
  514                       public boolean isDuplicate(MessageId id) {
  515                           return false;
  516                       }
  517                   });
  518                   Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
  519                   if (msgs != null) {
  520                       for (int i = 0; i < msgs.length; i++) {
  521                           result.add(msgs[i]);
  522                       }
  523                   }
  524               }
  525           } catch (Throwable e) {
  526               LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
  527           }
  528           return result.toArray(new Message[result.size()]);
  529       }
  530   
  531       public boolean iterate() {
  532           synchronized (messagesWaitingForSpace) {
  533               while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
  534                   Runnable op = messagesWaitingForSpace.removeFirst();
  535                   op.run();
  536               }
  537           }
  538           return false;
  539       }
  540   
  541       // Properties
  542       // -------------------------------------------------------------------------
  543   
  544       public DispatchPolicy getDispatchPolicy() {
  545           return dispatchPolicy;
  546       }
  547   
  548       public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
  549           this.dispatchPolicy = dispatchPolicy;
  550       }
  551   
  552       public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
  553           return subscriptionRecoveryPolicy;
  554       }
  555   
  556       public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
  557           this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
  558       }
  559   
  560       // Implementation methods
  561       // -------------------------------------------------------------------------
  562   
  563       public final void wakeup() {
  564       }
  565   
  566       protected void dispatch(final ConnectionContext context, Message message) throws Exception {
  567           // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
  568           // destinationStatistics.getMessages().increment();
  569           destinationStatistics.getEnqueues().increment();
  570           dispatchValve.increment();
  571           MessageEvaluationContext msgContext = null;
  572           try {
  573               if (!subscriptionRecoveryPolicy.add(context, message)) {
  574                   return;
  575               }
  576               synchronized (consumers) {
  577                   if (consumers.isEmpty()) {
  578                       onMessageWithNoConsumers(context, message);
  579                       return;
  580                   }
  581               }
  582               msgContext = context.getMessageEvaluationContext();
  583               msgContext.setDestination(destination);
  584               msgContext.setMessageReference(message);
  585               if (!dispatchPolicy.dispatch(message, msgContext, consumers)) {
  586                   onMessageWithNoConsumers(context, message);
  587               }
  588   
  589           } finally {
  590               dispatchValve.decrement();
  591               if (msgContext != null) {
  592                   msgContext.clear();
  593               }
  594           }
  595       }
  596   
  597       public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
  598           broker.messageExpired(context, reference);
  599           // AMQ-2586: Better to leave this stat at zero than to give the user misleading metrics.
  600           // destinationStatistics.getMessages().decrement();
  601           destinationStatistics.getEnqueues().decrement();
  602           destinationStatistics.getExpired().increment();
  603           MessageAck ack = new MessageAck();
  604           ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
  605           ack.setDestination(destination);
  606           ack.setMessageID(reference.getMessageId());
  607           try {
  608               acknowledge(context, subs, ack, reference);
  609           } catch (IOException e) {
  610               LOG.error("Failed to remove expired Message from the store ", e);
  611           }
  612       }
  613   
  614       private final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException {
  615           long start = System.currentTimeMillis();
  616           long nextWarn = start + blockedProducerWarningInterval;
  617           while (!usage.waitForSpace(1000)) {
  618               if (context.getStopping().get()) {
  619                   throw new IOException("Connection closed, send aborted.");
  620               }
  621   
  622               long now = System.currentTimeMillis();
  623               if (now >= nextWarn) {
  624                   LOG.info(warning + " (blocking for: " + (now - start) / 1000 + "s)");
  625                   nextWarn = now + blockedProducerWarningInterval;
  626               }
  627           }
  628       }
  629   
  630   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » broker » region » [javadoc | source]