Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq;
   18   
   19   import java.io.IOException;
   20   import java.io.InputStream;
   21   import java.io.OutputStream;
   22   import java.net.URI;
   23   import java.net.URISyntaxException;
   24   import java.util.HashMap;
   25   import java.util.Iterator;
   26   import java.util.Map;
   27   import java.util.concurrent.ConcurrentHashMap;
   28   import java.util.concurrent.CopyOnWriteArrayList;
   29   import java.util.concurrent.CountDownLatch;
   30   import java.util.concurrent.LinkedBlockingQueue;
   31   import java.util.concurrent.ThreadFactory;
   32   import java.util.concurrent.ThreadPoolExecutor;
   33   import java.util.concurrent.TimeUnit;
   34   import java.util.concurrent.atomic.AtomicBoolean;
   35   import java.util.concurrent.atomic.AtomicInteger;
   36   
   37   import javax.jms.Connection;
   38   import javax.jms.ConnectionConsumer;
   39   import javax.jms.ConnectionMetaData;
   40   import javax.jms.DeliveryMode;
   41   import javax.jms.Destination;
   42   import javax.jms.ExceptionListener;
   43   import javax.jms.IllegalStateException;
   44   import javax.jms.JMSException;
   45   import javax.jms.Queue;
   46   import javax.jms.QueueConnection;
   47   import javax.jms.QueueSession;
   48   import javax.jms.ServerSessionPool;
   49   import javax.jms.Session;
   50   import javax.jms.Topic;
   51   import javax.jms.TopicConnection;
   52   import javax.jms.TopicSession;
   53   import javax.jms.XAConnection;
   54   import javax.jms.InvalidDestinationException;
   55   
   56   import org.apache.activemq.blob.BlobTransferPolicy;
   57   import org.apache.activemq.command.ActiveMQDestination;
   58   import org.apache.activemq.command.ActiveMQMessage;
   59   import org.apache.activemq.command.ActiveMQTempDestination;
   60   import org.apache.activemq.command.ActiveMQTempQueue;
   61   import org.apache.activemq.command.ActiveMQTempTopic;
   62   import org.apache.activemq.command.BrokerInfo;
   63   import org.apache.activemq.command.Command;
   64   import org.apache.activemq.command.CommandTypes;
   65   import org.apache.activemq.command.ConnectionControl;
   66   import org.apache.activemq.command.ConnectionError;
   67   import org.apache.activemq.command.ConnectionId;
   68   import org.apache.activemq.command.ConnectionInfo;
   69   import org.apache.activemq.command.ConsumerControl;
   70   import org.apache.activemq.command.ConsumerId;
   71   import org.apache.activemq.command.ConsumerInfo;
   72   import org.apache.activemq.command.ControlCommand;
   73   import org.apache.activemq.command.DestinationInfo;
   74   import org.apache.activemq.command.ExceptionResponse;
   75   import org.apache.activemq.command.Message;
   76   import org.apache.activemq.command.MessageDispatch;
   77   import org.apache.activemq.command.MessageId;
   78   import org.apache.activemq.command.ProducerAck;
   79   import org.apache.activemq.command.ProducerId;
   80   import org.apache.activemq.command.RemoveInfo;
   81   import org.apache.activemq.command.RemoveSubscriptionInfo;
   82   import org.apache.activemq.command.Response;
   83   import org.apache.activemq.command.SessionId;
   84   import org.apache.activemq.command.ShutdownInfo;
   85   import org.apache.activemq.command.WireFormatInfo;
   86   import org.apache.activemq.management.JMSConnectionStatsImpl;
   87   import org.apache.activemq.management.JMSStatsImpl;
   88   import org.apache.activemq.management.StatsCapable;
   89   import org.apache.activemq.management.StatsImpl;
   90   import org.apache.activemq.state.CommandVisitorAdapter;
   91   import org.apache.activemq.thread.TaskRunnerFactory;
   92   import org.apache.activemq.transport.Transport;
   93   import org.apache.activemq.transport.TransportListener;
   94   import org.apache.activemq.util.IdGenerator;
   95   import org.apache.activemq.util.IntrospectionSupport;
   96   import org.apache.activemq.util.JMSExceptionSupport;
   97   import org.apache.activemq.util.LongSequenceGenerator;
   98   import org.apache.activemq.util.ServiceSupport;
   99   import org.apache.activemq.advisory.DestinationSource;
  100   import org.apache.commons.logging.Log;
  101   import org.apache.commons.logging.LogFactory;
  102   
  103   public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
  104   
  105       public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
  106       public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
  107       public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
  108   
  109       private static final Log LOG = LogFactory.getLog(ActiveMQConnection.class);
  110       private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
  111   
  112       public final ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
  113   
  114       protected boolean dispatchAsync=true;
  115       protected boolean alwaysSessionAsync = true;
  116   
  117       private TaskRunnerFactory sessionTaskRunner;
  118       private final ThreadPoolExecutor asyncConnectionThread;
  119   
  120       // Connection state variables
  121       private final ConnectionInfo info;
  122       private ExceptionListener exceptionListener;
  123       private ClientInternalExceptionListener clientInternalExceptionListener;
  124       private boolean clientIDSet;
  125       private boolean isConnectionInfoSentToBroker;
  126       private boolean userSpecifiedClientID;
  127   
  128       // Configuration options variables
  129       private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
  130       private BlobTransferPolicy blobTransferPolicy;
  131       private RedeliveryPolicy redeliveryPolicy;
  132       private MessageTransformer transformer;
  133   
  134       private boolean disableTimeStampsByDefault;
  135       private boolean optimizedMessageDispatch = true;
  136       private boolean copyMessageOnSend = true;
  137       private boolean useCompression;
  138       private boolean objectMessageSerializationDefered;
  139       private boolean useAsyncSend;
  140       private boolean optimizeAcknowledge;
  141       private boolean nestedMapAndListEnabled = true;
  142       private boolean useRetroactiveConsumer;
  143       private boolean exclusiveConsumer;
  144       private boolean alwaysSyncSend;
  145       private int closeTimeout = 15000;
  146       private boolean watchTopicAdvisories = true;
  147       private long warnAboutUnstartedConnectionTimeout = 500L;
  148       private int sendTimeout =0;
  149       private boolean sendAcksAsync=true;
  150   
  151       private final Transport transport;
  152       private final IdGenerator clientIdGenerator;
  153       private final JMSStatsImpl factoryStats;
  154       private final JMSConnectionStatsImpl stats;
  155   
  156       private final AtomicBoolean started = new AtomicBoolean(false);
  157       private final AtomicBoolean closing = new AtomicBoolean(false);
  158       private final AtomicBoolean closed = new AtomicBoolean(false);
  159       private final AtomicBoolean transportFailed = new AtomicBoolean(false);
  160       private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
  161       private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
  162       private final CopyOnWriteArrayList<ActiveMQInputStream> inputStreams = new CopyOnWriteArrayList<ActiveMQInputStream>();
  163       private final CopyOnWriteArrayList<ActiveMQOutputStream> outputStreams = new CopyOnWriteArrayList<ActiveMQOutputStream>();
  164       private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
  165   
  166       // Maps ConsumerIds to ActiveMQConsumer objects
  167       private final ConcurrentHashMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
  168       private final ConcurrentHashMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
  169       private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
  170       private final SessionId connectionSessionId;
  171       private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
  172       private final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
  173       private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
  174       private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
  175   
  176       private AdvisoryConsumer advisoryConsumer;
  177       private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
  178       private BrokerInfo brokerInfo;
  179       private IOException firstFailureError;
  180       private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
  181   
  182       // Assume that protocol is the latest. Change to the actual protocol
  183       // version when a WireFormatInfo is received.
  184       private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
  185       private long timeCreated;
  186       private ConnectionAudit connectionAudit = new ConnectionAudit();
  187       private DestinationSource destinationSource;
  188       private final Object ensureConnectionInfoSentMutex = new Object();
  189       private boolean useDedicatedTaskRunner;
  190       protected CountDownLatch transportInterruptionProcessingComplete;
  191       private long consumerFailoverRedeliveryWaitPeriod;
  192   
  193       /**
  194        * Construct an <code>ActiveMQConnection</code>
  195        * 
  196        * @param transport
  197        * @param factoryStats
  198        * @throws Exception
  199        */
  200       protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
  201   
  202           this.transport = transport;
  203           this.clientIdGenerator = clientIdGenerator;
  204           this.factoryStats = factoryStats;
  205   
  206           // Configure a single threaded executor who's core thread can timeout if
  207           // idle
  208           asyncConnectionThread = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
  209               public Thread newThread(Runnable r) {
  210                   Thread thread = new Thread(r, "ActiveMQ Connection Worker: " + transport);
  211                   thread.setDaemon(true);
  212                   return thread;
  213               }
  214           });
  215           // asyncConnectionThread.allowCoreThreadTimeOut(true);
  216   
  217           this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
  218           this.info.setManageable(true);
  219           this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
  220   
  221           this.transport.setTransportListener(this);
  222   
  223           this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
  224           this.factoryStats.addConnection(this);
  225           this.timeCreated = System.currentTimeMillis();
  226           this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
  227       }
  228   
  229       protected void setUserName(String userName) {
  230           this.info.setUserName(userName);
  231       }
  232   
  233       protected void setPassword(String password) {
  234           this.info.setPassword(password);
  235       }
  236   
  237       /**
  238        * A static helper method to create a new connection
  239        * 
  240        * @return an ActiveMQConnection
  241        * @throws JMSException
  242        */
  243       public static ActiveMQConnection makeConnection() throws JMSException {
  244           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
  245           return (ActiveMQConnection)factory.createConnection();
  246       }
  247   
  248       /**
  249        * A static helper method to create a new connection
  250        * 
  251        * @param uri
  252        * @return and ActiveMQConnection
  253        * @throws JMSException
  254        */
  255       public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
  256           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
  257           return (ActiveMQConnection)factory.createConnection();
  258       }
  259   
  260       /**
  261        * A static helper method to create a new connection
  262        * 
  263        * @param user
  264        * @param password
  265        * @param uri
  266        * @return an ActiveMQConnection
  267        * @throws JMSException
  268        */
  269       public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
  270           ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
  271           return (ActiveMQConnection)factory.createConnection();
  272       }
  273   
  274       /**
  275        * @return a number unique for this connection
  276        */
  277       public JMSConnectionStatsImpl getConnectionStats() {
  278           return stats;
  279       }
  280   
  281       /**
  282        * Creates a <CODE>Session</CODE> object.
  283        * 
  284        * @param transacted indicates whether the session is transacted
  285        * @param acknowledgeMode indicates whether the consumer or the client will
  286        *                acknowledge any messages it receives; ignored if the
  287        *                session is transacted. Legal values are
  288        *                <code>Session.AUTO_ACKNOWLEDGE</code>,
  289        *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
  290        *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
  291        * @return a newly created session
  292        * @throws JMSException if the <CODE>Connection</CODE> object fails to
  293        *                 create a session due to some internal error or lack of
  294        *                 support for the specific transaction and acknowledgement
  295        *                 mode.
  296        * @see Session#AUTO_ACKNOWLEDGE
  297        * @see Session#CLIENT_ACKNOWLEDGE
  298        * @see Session#DUPS_OK_ACKNOWLEDGE
  299        * @since 1.1
  300        */
  301       public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
  302           checkClosedOrFailed();
  303           ensureConnectionInfoSent();
  304           if(!transacted) {
  305               if (acknowledgeMode==Session.SESSION_TRANSACTED) {
  306                   throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
  307               } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
  308                   throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
  309                           "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
  310               }
  311           }
  312           return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
  313               ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
  314       }
  315   
  316       /**
  317        * @return sessionId
  318        */
  319       protected SessionId getNextSessionId() {
  320           return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
  321       }
  322   
  323       /**
  324        * Gets the client identifier for this connection.
  325        * <P>
  326        * This value is specific to the JMS provider. It is either preconfigured by
  327        * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
  328        * dynamically by the application by calling the <code>setClientID</code>
  329        * method.
  330        * 
  331        * @return the unique client identifier
  332        * @throws JMSException if the JMS provider fails to return the client ID
  333        *                 for this connection due to some internal error.
  334        */
  335       public String getClientID() throws JMSException {
  336           checkClosedOrFailed();
  337           return this.info.getClientId();
  338       }
  339   
  340       /**
  341        * Sets the client identifier for this connection.
  342        * <P>
  343        * The preferred way to assign a JMS client's client identifier is for it to
  344        * be configured in a client-specific <CODE>ConnectionFactory</CODE>
  345        * object and transparently assigned to the <CODE>Connection</CODE> object
  346        * it creates.
  347        * <P>
  348        * Alternatively, a client can set a connection's client identifier using a
  349        * provider-specific value. The facility to set a connection's client
  350        * identifier explicitly is not a mechanism for overriding the identifier
  351        * that has been administratively configured. It is provided for the case
  352        * where no administratively specified identifier exists. If one does exist,
  353        * an attempt to change it by setting it must throw an
  354        * <CODE>IllegalStateException</CODE>. If a client sets the client
  355        * identifier explicitly, it must do so immediately after it creates the
  356        * connection and before any other action on the connection is taken. After
  357        * this point, setting the client identifier is a programming error that
  358        * should throw an <CODE>IllegalStateException</CODE>.
  359        * <P>
  360        * The purpose of the client identifier is to associate a connection and its
  361        * objects with a state maintained on behalf of the client by a provider.
  362        * The only such state identified by the JMS API is that required to support
  363        * durable subscriptions.
  364        * <P>
  365        * If another connection with the same <code>clientID</code> is already
  366        * running when this method is called, the JMS provider should detect the
  367        * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
  368        * 
  369        * @param newClientID the unique client identifier
  370        * @throws JMSException if the JMS provider fails to set the client ID for
  371        *                 this connection due to some internal error.
  372        * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
  373        *                 invalid or duplicate client ID.
  374        * @throws javax.jms.IllegalStateException if the JMS client attempts to set
  375        *                 a connection's client ID at the wrong time or when it has
  376        *                 been administratively configured.
  377        */
  378       public void setClientID(String newClientID) throws JMSException {
  379           checkClosedOrFailed();
  380   
  381           if (this.clientIDSet) {
  382               throw new IllegalStateException("The clientID has already been set");
  383           }
  384   
  385           if (this.isConnectionInfoSentToBroker) {
  386               throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
  387           }
  388   
  389           this.info.setClientId(newClientID);
  390           this.userSpecifiedClientID = true;
  391           ensureConnectionInfoSent();
  392       }
  393   
  394       /**
  395        * Sets the default client id that the connection will use if explicitly not
  396        * set with the setClientId() call.
  397        */
  398       public void setDefaultClientID(String clientID) throws JMSException {
  399           this.info.setClientId(clientID);
  400           this.userSpecifiedClientID = true;
  401       }
  402   
  403       /**
  404        * Gets the metadata for this connection.
  405        * 
  406        * @return the connection metadata
  407        * @throws JMSException if the JMS provider fails to get the connection
  408        *                 metadata for this connection.
  409        * @see javax.jms.ConnectionMetaData
  410        */
  411       public ConnectionMetaData getMetaData() throws JMSException {
  412           checkClosedOrFailed();
  413           return ActiveMQConnectionMetaData.INSTANCE;
  414       }
  415   
  416       /**
  417        * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
  418        * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
  419        * associated with it.
  420        * 
  421        * @return the <CODE>ExceptionListener</CODE> for this connection, or
  422        *         null, if no <CODE>ExceptionListener</CODE> is associated with
  423        *         this connection.
  424        * @throws JMSException if the JMS provider fails to get the
  425        *                 <CODE>ExceptionListener</CODE> for this connection.
  426        * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
  427        */
  428       public ExceptionListener getExceptionListener() throws JMSException {
  429           checkClosedOrFailed();
  430           return this.exceptionListener;
  431       }
  432   
  433       /**
  434        * Sets an exception listener for this connection.
  435        * <P>
  436        * If a JMS provider detects a serious problem with a connection, it informs
  437        * the connection's <CODE> ExceptionListener</CODE>, if one has been
  438        * registered. It does this by calling the listener's <CODE>onException
  439        * </CODE>
  440        * method, passing it a <CODE>JMSException</CODE> object describing the
  441        * problem.
  442        * <P>
  443        * An exception listener allows a client to be notified of a problem
  444        * asynchronously. Some connections only consume messages, so they would
  445        * have no other way to learn their connection has failed.
  446        * <P>
  447        * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
  448        * <P>
  449        * A JMS provider should attempt to resolve connection problems itself
  450        * before it notifies the client of them.
  451        * 
  452        * @param listener the exception listener
  453        * @throws JMSException if the JMS provider fails to set the exception
  454        *                 listener for this connection.
  455        */
  456       public void setExceptionListener(ExceptionListener listener) throws JMSException {
  457           checkClosedOrFailed();
  458           this.exceptionListener = listener;
  459       }
  460   
  461       /**
  462        * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
  463        * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
  464        * associated with it.
  465        * 
  466        * @return the listener or <code>null</code> if no listener is registered with the connection.
  467        */
  468       public ClientInternalExceptionListener getClientInternalExceptionListener()
  469       {
  470           return clientInternalExceptionListener;
  471       }
  472   
  473       /**
  474        * Sets a client internal exception listener for this connection.
  475        * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
  476        * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
  477        * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
  478        * describing the problem.
  479        * 
  480        * @param listener the exception listener
  481        */
  482       public void setClientInternalExceptionListener(ClientInternalExceptionListener listener)
  483       {
  484           this.clientInternalExceptionListener = listener;
  485       }
  486       
  487       /**
  488        * Starts (or restarts) a connection's delivery of incoming messages. A call
  489        * to <CODE>start</CODE> on a connection that has already been started is
  490        * ignored.
  491        * 
  492        * @throws JMSException if the JMS provider fails to start message delivery
  493        *                 due to some internal error.
  494        * @see javax.jms.Connection#stop()
  495        */
  496       public void start() throws JMSException {
  497           checkClosedOrFailed();
  498           ensureConnectionInfoSent();
  499           if (started.compareAndSet(false, true)) {
  500               for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
  501                   ActiveMQSession session = i.next();
  502                   session.start();
  503               }
  504           }
  505       }
  506   
  507       /**
  508        * Temporarily stops a connection's delivery of incoming messages. Delivery
  509        * can be restarted using the connection's <CODE>start</CODE> method. When
  510        * the connection is stopped, delivery to all the connection's message
  511        * consumers is inhibited: synchronous receives block, and messages are not
  512        * delivered to message listeners.
  513        * <P>
  514        * This call blocks until receives and/or message listeners in progress have
  515        * completed.
  516        * <P>
  517        * Stopping a connection has no effect on its ability to send messages. A
  518        * call to <CODE>stop</CODE> on a connection that has already been stopped
  519        * is ignored.
  520        * <P>
  521        * A call to <CODE>stop</CODE> must not return until delivery of messages
  522        * has paused. This means that a client can rely on the fact that none of
  523        * its message listeners will be called and that all threads of control
  524        * waiting for <CODE>receive</CODE> calls to return will not return with a
  525        * message until the connection is restarted. The receive timers for a
  526        * stopped connection continue to advance, so receives may time out while
  527        * the connection is stopped.
  528        * <P>
  529        * If message listeners are running when <CODE>stop</CODE> is invoked, the
  530        * <CODE>stop</CODE> call must wait until all of them have returned before
  531        * it may return. While these message listeners are completing, they must
  532        * have the full services of the connection available to them.
  533        * 
  534        * @throws JMSException if the JMS provider fails to stop message delivery
  535        *                 due to some internal error.
  536        * @see javax.jms.Connection#start()
  537        */
  538       public void stop() throws JMSException {
  539           checkClosedOrFailed();
  540           if (started.compareAndSet(true, false)) {
  541               synchronized(sessions) {
  542                   for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
  543                       ActiveMQSession s = i.next();
  544                       s.stop();
  545                   }
  546               }
  547           }
  548       }
  549   
  550       /**
  551        * Closes the connection.
  552        * <P>
  553        * Since a provider typically allocates significant resources outside the
  554        * JVM on behalf of a connection, clients should close these resources when
  555        * they are not needed. Relying on garbage collection to eventually reclaim
  556        * these resources may not be timely enough.
  557        * <P>
  558        * There is no need to close the sessions, producers, and consumers of a
  559        * closed connection.
  560        * <P>
  561        * Closing a connection causes all temporary destinations to be deleted.
  562        * <P>
  563        * When this method is invoked, it should not return until message
  564        * processing has been shut down in an orderly fashion. This means that all
  565        * message listeners that may have been running have returned, and that all
  566        * pending receives have returned. A close terminates all pending message
  567        * receives on the connection's sessions' consumers. The receives may return
  568        * with a message or with null, depending on whether there was a message
  569        * available at the time of the close. If one or more of the connection's
  570        * sessions' message listeners is processing a message at the time when
  571        * connection <CODE>close</CODE> is invoked, all the facilities of the
  572        * connection and its sessions must remain available to those listeners
  573        * until they return control to the JMS provider.
  574        * <P>
  575        * Closing a connection causes any of its sessions' transactions in progress
  576        * to be rolled back. In the case where a session's work is coordinated by
  577        * an external transaction manager, a session's <CODE>commit</CODE> and
  578        * <CODE> rollback</CODE> methods are not used and the result of a closed
  579        * session's work is determined later by the transaction manager. Closing a
  580        * connection does NOT force an acknowledgment of client-acknowledged
  581        * sessions.
  582        * <P>
  583        * Invoking the <CODE>acknowledge</CODE> method of a received message from
  584        * a closed connection's session must throw an
  585        * <CODE>IllegalStateException</CODE>. Closing a closed connection must
  586        * NOT throw an exception.
  587        * 
  588        * @throws JMSException if the JMS provider fails to close the connection
  589        *                 due to some internal error. For example, a failure to
  590        *                 release resources or to close a socket connection can
  591        *                 cause this exception to be thrown.
  592        */
  593       public void close() throws JMSException {
  594           try {
  595               // If we were running, lets stop first.
  596               if (!closed.get() && !transportFailed.get()) {
  597                   stop();
  598               }
  599   
  600               synchronized (this) {
  601                   if (!closed.get()) {
  602                       closing.set(true);
  603   
  604                       if (destinationSource != null) {
  605                           destinationSource.stop();
  606                           destinationSource = null;
  607                       }
  608                       if (advisoryConsumer != null) {
  609                           advisoryConsumer.dispose();
  610                           advisoryConsumer = null;
  611                       }
  612   
  613                       long lastDeliveredSequenceId = 0;
  614                       for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
  615                           ActiveMQSession s = i.next();
  616                           s.dispose();
  617                           lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
  618                       }
  619                       for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
  620                           ActiveMQConnectionConsumer c = i.next();
  621                           c.dispose();
  622                       }
  623                       for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
  624                           ActiveMQInputStream c = i.next();
  625                           c.dispose();
  626                       }
  627                       for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
  628                           ActiveMQOutputStream c = i.next();
  629                           c.dispose();
  630                       }
  631   
  632                       if (isConnectionInfoSentToBroker) {
  633                           // If we announced ourselfs to the broker.. Try to let
  634                           // the broker
  635                           // know that the connection is being shutdown.
  636                           RemoveInfo removeCommand = info.createRemoveCommand();
  637                           removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
  638                           doSyncSendPacket(info.createRemoveCommand(), closeTimeout);
  639                           doAsyncSendPacket(new ShutdownInfo());
  640                       }
  641   
  642                       ServiceSupport.dispose(this.transport);
  643   
  644                       started.set(false);
  645   
  646                       // TODO if we move the TaskRunnerFactory to the connection
  647                       // factory
  648                       // then we may need to call
  649                       // factory.onConnectionClose(this);
  650                       if (sessionTaskRunner != null) {
  651                           sessionTaskRunner.shutdown();
  652                       }
  653                       closed.set(true);
  654                       closing.set(false);
  655                   }
  656               }
  657           } finally {
  658               try {
  659                   if (asyncConnectionThread != null){
  660                       asyncConnectionThread.shutdown();
  661                   }
  662               }catch(Throwable e) {
  663                   LOG.error("Error shutting down thread pool " + e,e);
  664               }
  665               factoryStats.removeConnection(this);
  666           }
  667       }
  668   
  669       /**
  670        * Tells the broker to terminate its VM. This can be used to cleanly
  671        * terminate a broker running in a standalone java process. Server must have
  672        * property enable.vm.shutdown=true defined to allow this to work.
  673        */
  674       // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
  675       // implemented.
  676       /*
  677        * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
  678        * command = new BrokerAdminCommand();
  679        * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
  680        * asyncSendPacket(command); }
  681        */
  682   
  683       /**
  684        * Create a durable connection consumer for this connection (optional
  685        * operation). This is an expert facility not used by regular JMS clients.
  686        * 
  687        * @param topic topic to access
  688        * @param subscriptionName durable subscription name
  689        * @param messageSelector only messages with properties matching the message
  690        *                selector expression are delivered. A value of null or an
  691        *                empty string indicates that there is no message selector
  692        *                for the message consumer.
  693        * @param sessionPool the server session pool to associate with this durable
  694        *                connection consumer
  695        * @param maxMessages the maximum number of messages that can be assigned to
  696        *                a server session at one time
  697        * @return the durable connection consumer
  698        * @throws JMSException if the <CODE>Connection</CODE> object fails to
  699        *                 create a connection consumer due to some internal error
  700        *                 or invalid arguments for <CODE>sessionPool</CODE> and
  701        *                 <CODE>messageSelector</CODE>.
  702        * @throws javax.jms.InvalidDestinationException if an invalid destination
  703        *                 is specified.
  704        * @throws javax.jms.InvalidSelectorException if the message selector is
  705        *                 invalid.
  706        * @see javax.jms.ConnectionConsumer
  707        * @since 1.1
  708        */
  709       public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
  710           throws JMSException {
  711           return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
  712       }
  713   
  714       /**
  715        * Create a durable connection consumer for this connection (optional
  716        * operation). This is an expert facility not used by regular JMS clients.
  717        * 
  718        * @param topic topic to access
  719        * @param subscriptionName durable subscription name
  720        * @param messageSelector only messages with properties matching the message
  721        *                selector expression are delivered. A value of null or an
  722        *                empty string indicates that there is no message selector
  723        *                for the message consumer.
  724        * @param sessionPool the server session pool to associate with this durable
  725        *                connection consumer
  726        * @param maxMessages the maximum number of messages that can be assigned to
  727        *                a server session at one time
  728        * @param noLocal set true if you want to filter out messages published
  729        *                locally
  730        * @return the durable connection consumer
  731        * @throws JMSException if the <CODE>Connection</CODE> object fails to
  732        *                 create a connection consumer due to some internal error
  733        *                 or invalid arguments for <CODE>sessionPool</CODE> and
  734        *                 <CODE>messageSelector</CODE>.
  735        * @throws javax.jms.InvalidDestinationException if an invalid destination
  736        *                 is specified.
  737        * @throws javax.jms.InvalidSelectorException if the message selector is
  738        *                 invalid.
  739        * @see javax.jms.ConnectionConsumer
  740        * @since 1.1
  741        */
  742       public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
  743                                                                 boolean noLocal) throws JMSException {
  744           checkClosedOrFailed();
  745           ensureConnectionInfoSent();
  746           SessionId sessionId = new SessionId(info.getConnectionId(), -1);
  747           ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
  748           info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
  749           info.setSubscriptionName(subscriptionName);
  750           info.setSelector(messageSelector);
  751           info.setPrefetchSize(maxMessages);
  752           info.setDispatchAsync(isDispatchAsync());
  753   
  754           // Allows the options on the destination to configure the consumerInfo
  755           if (info.getDestination().getOptions() != null) {
  756               Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
  757               IntrospectionSupport.setProperties(this.info, options, "consumer.");
  758           }
  759   
  760           return new ActiveMQConnectionConsumer(this, sessionPool, info);
  761       }
  762   
  763       // Properties
  764       // -------------------------------------------------------------------------
  765   
  766       /**
  767        * Returns true if this connection has been started
  768        * 
  769        * @return true if this Connection is started
  770        */
  771       public boolean isStarted() {
  772           return started.get();
  773       }
  774   
  775       /**
  776        * Returns true if the connection is closed
  777        */
  778       public boolean isClosed() {
  779           return closed.get();
  780       }
  781   
  782       /**
  783        * Returns true if the connection is in the process of being closed
  784        */
  785       public boolean isClosing() {
  786           return closing.get();
  787       }
  788   
  789       /**
  790        * Returns true if the underlying transport has failed
  791        */
  792       public boolean isTransportFailed() {
  793           return transportFailed.get();
  794       }
  795   
  796       /**
  797        * @return Returns the prefetchPolicy.
  798        */
  799       public ActiveMQPrefetchPolicy getPrefetchPolicy() {
  800           return prefetchPolicy;
  801       }
  802   
  803       /**
  804        * Sets the <a
  805        * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
  806        * policy</a> for consumers created by this connection.
  807        */
  808       public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
  809           this.prefetchPolicy = prefetchPolicy;
  810       }
  811   
  812       /**
  813        */
  814       public Transport getTransportChannel() {
  815           return transport;
  816       }
  817   
  818       /**
  819        * @return Returns the clientID of the connection, forcing one to be
  820        *         generated if one has not yet been configured.
  821        */
  822       public String getInitializedClientID() throws JMSException {
  823           ensureConnectionInfoSent();
  824           return info.getClientId();
  825       }
  826   
  827       /**
  828        * @return Returns the timeStampsDisableByDefault.
  829        */
  830       public boolean isDisableTimeStampsByDefault() {
  831           return disableTimeStampsByDefault;
  832       }
  833   
  834       /**
  835        * Sets whether or not timestamps on messages should be disabled or not. If
  836        * you disable them it adds a small performance boost.
  837        */
  838       public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
  839           this.disableTimeStampsByDefault = timeStampsDisableByDefault;
  840       }
  841   
  842       /**
  843        * @return Returns the dispatchOptimizedMessage.
  844        */
  845       public boolean isOptimizedMessageDispatch() {
  846           return optimizedMessageDispatch;
  847       }
  848   
  849       /**
  850        * If this flag is set then an larger prefetch limit is used - only
  851        * applicable for durable topic subscribers.
  852        */
  853       public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
  854           this.optimizedMessageDispatch = dispatchOptimizedMessage;
  855       }
  856   
  857       /**
  858        * @return Returns the closeTimeout.
  859        */
  860       public int getCloseTimeout() {
  861           return closeTimeout;
  862       }
  863   
  864       /**
  865        * Sets the timeout before a close is considered complete. Normally a
  866        * close() on a connection waits for confirmation from the broker; this
  867        * allows that operation to timeout to save the client hanging if there is
  868        * no broker
  869        */
  870       public void setCloseTimeout(int closeTimeout) {
  871           this.closeTimeout = closeTimeout;
  872       }
  873   
  874       /**
  875        * @return ConnectionInfo
  876        */
  877       public ConnectionInfo getConnectionInfo() {
  878           return this.info;
  879       }
  880   
  881       public boolean isUseRetroactiveConsumer() {
  882           return useRetroactiveConsumer;
  883       }
  884   
  885       /**
  886        * Sets whether or not retroactive consumers are enabled. Retroactive
  887        * consumers allow non-durable topic subscribers to receive old messages
  888        * that were published before the non-durable subscriber started.
  889        */
  890       public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
  891           this.useRetroactiveConsumer = useRetroactiveConsumer;
  892       }
  893   
  894       public boolean isNestedMapAndListEnabled() {
  895           return nestedMapAndListEnabled;
  896       }
  897   
  898       /**
  899        * Enables/disables whether or not Message properties and MapMessage entries
  900        * support <a
  901        * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
  902        * Structures</a> of Map and List objects
  903        */
  904       public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
  905           this.nestedMapAndListEnabled = structuredMapsEnabled;
  906       }
  907   
  908       public boolean isExclusiveConsumer() {
  909           return exclusiveConsumer;
  910       }
  911   
  912       /**
  913        * Enables or disables whether or not queue consumers should be exclusive or
  914        * not for example to preserve ordering when not using <a
  915        * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
  916        * 
  917        * @param exclusiveConsumer
  918        */
  919       public void setExclusiveConsumer(boolean exclusiveConsumer) {
  920           this.exclusiveConsumer = exclusiveConsumer;
  921       }
  922   
  923       /**
  924        * Adds a transport listener so that a client can be notified of events in
  925        * the underlying transport
  926        */
  927       public void addTransportListener(TransportListener transportListener) {
  928           transportListeners.add(transportListener);
  929       }
  930   
  931       public void removeTransportListener(TransportListener transportListener) {
  932           transportListeners.remove(transportListener);
  933       }
  934   
  935       public boolean isUseDedicatedTaskRunner() {
  936           return useDedicatedTaskRunner;
  937       }
  938       
  939       public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
  940           this.useDedicatedTaskRunner = useDedicatedTaskRunner;
  941       }
  942   
  943       public TaskRunnerFactory getSessionTaskRunner() {
  944           synchronized (this) {
  945               if (sessionTaskRunner == null) {
  946                   sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner());
  947               }
  948           }
  949           return sessionTaskRunner;
  950       }
  951   
  952       public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
  953           this.sessionTaskRunner = sessionTaskRunner;
  954       }
  955   
  956       public MessageTransformer getTransformer() {
  957           return transformer;
  958       }
  959   
  960       /**
  961        * Sets the transformer used to transform messages before they are sent on
  962        * to the JMS bus or when they are received from the bus but before they are
  963        * delivered to the JMS client
  964        */
  965       public void setTransformer(MessageTransformer transformer) {
  966           this.transformer = transformer;
  967       }
  968   
  969       /**
  970        * @return the statsEnabled
  971        */
  972       public boolean isStatsEnabled() {
  973           return this.stats.isEnabled();
  974       }
  975   
  976       /**
  977        * @param statsEnabled the statsEnabled to set
  978        */
  979       public void setStatsEnabled(boolean statsEnabled) {
  980           this.stats.setEnabled(statsEnabled);
  981       }
  982   
  983       /**
  984        * Returns the {@link DestinationSource} object which can be used to listen to destinations
  985        * being created or destroyed or to enquire about the current destinations available on the broker
  986        *
  987        * @return a lazily created destination source
  988        * @throws JMSException
  989        */
  990       public DestinationSource getDestinationSource() throws JMSException {
  991           if (destinationSource == null) {
  992               destinationSource = new DestinationSource(this);
  993               destinationSource.start();
  994           }
  995           return destinationSource;
  996       }
  997   
  998       // Implementation methods
  999       // -------------------------------------------------------------------------
 1000   
 1001       /**
 1002        * Used internally for adding Sessions to the Connection
 1003        * 
 1004        * @param session
 1005        * @throws JMSException
 1006        * @throws JMSException
 1007        */
 1008       protected void addSession(ActiveMQSession session) throws JMSException {
 1009           this.sessions.add(session);
 1010           if (sessions.size() > 1 || session.isTransacted()) {
 1011               optimizedMessageDispatch = false;
 1012           }
 1013       }
 1014   
 1015       /**
 1016        * Used interanlly for removing Sessions from a Connection
 1017        * 
 1018        * @param session
 1019        */
 1020       protected void removeSession(ActiveMQSession session) {
 1021           this.sessions.remove(session);
 1022           this.removeDispatcher(session);
 1023       }
 1024   
 1025       /**
 1026        * Add a ConnectionConsumer
 1027        * 
 1028        * @param connectionConsumer
 1029        * @throws JMSException
 1030        */
 1031       protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
 1032           this.connectionConsumers.add(connectionConsumer);
 1033       }
 1034   
 1035       /**
 1036        * Remove a ConnectionConsumer
 1037        * 
 1038        * @param connectionConsumer
 1039        */
 1040       protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
 1041           this.connectionConsumers.remove(connectionConsumer);
 1042           this.removeDispatcher(connectionConsumer);
 1043       }
 1044   
 1045       /**
 1046        * Creates a <CODE>TopicSession</CODE> object.
 1047        * 
 1048        * @param transacted indicates whether the session is transacted
 1049        * @param acknowledgeMode indicates whether the consumer or the client will
 1050        *                acknowledge any messages it receives; ignored if the
 1051        *                session is transacted. Legal values are
 1052        *                <code>Session.AUTO_ACKNOWLEDGE</code>,
 1053        *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
 1054        *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
 1055        * @return a newly created topic session
 1056        * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
 1057        *                 to create a session due to some internal error or lack of
 1058        *                 support for the specific transaction and acknowledgement
 1059        *                 mode.
 1060        * @see Session#AUTO_ACKNOWLEDGE
 1061        * @see Session#CLIENT_ACKNOWLEDGE
 1062        * @see Session#DUPS_OK_ACKNOWLEDGE
 1063        */
 1064       public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
 1065           return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
 1066       }
 1067   
 1068       /**
 1069        * Creates a connection consumer for this connection (optional operation).
 1070        * This is an expert facility not used by regular JMS clients.
 1071        * 
 1072        * @param topic the topic to access
 1073        * @param messageSelector only messages with properties matching the message
 1074        *                selector expression are delivered. A value of null or an
 1075        *                empty string indicates that there is no message selector
 1076        *                for the message consumer.
 1077        * @param sessionPool the server session pool to associate with this
 1078        *                connection consumer
 1079        * @param maxMessages the maximum number of messages that can be assigned to
 1080        *                a server session at one time
 1081        * @return the connection consumer
 1082        * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
 1083        *                 to create a connection consumer due to some internal
 1084        *                 error or invalid arguments for <CODE>sessionPool</CODE>
 1085        *                 and <CODE>messageSelector</CODE>.
 1086        * @throws javax.jms.InvalidDestinationException if an invalid topic is
 1087        *                 specified.
 1088        * @throws javax.jms.InvalidSelectorException if the message selector is
 1089        *                 invalid.
 1090        * @see javax.jms.ConnectionConsumer
 1091        */
 1092       public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
 1093           return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
 1094       }
 1095   
 1096       /**
 1097        * Creates a connection consumer for this connection (optional operation).
 1098        * This is an expert facility not used by regular JMS clients.
 1099        * 
 1100        * @param queue the queue to access
 1101        * @param messageSelector only messages with properties matching the message
 1102        *                selector expression are delivered. A value of null or an
 1103        *                empty string indicates that there is no message selector
 1104        *                for the message consumer.
 1105        * @param sessionPool the server session pool to associate with this
 1106        *                connection consumer
 1107        * @param maxMessages the maximum number of messages that can be assigned to
 1108        *                a server session at one time
 1109        * @return the connection consumer
 1110        * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
 1111        *                 to create a connection consumer due to some internal
 1112        *                 error or invalid arguments for <CODE>sessionPool</CODE>
 1113        *                 and <CODE>messageSelector</CODE>.
 1114        * @throws javax.jms.InvalidDestinationException if an invalid queue is
 1115        *                 specified.
 1116        * @throws javax.jms.InvalidSelectorException if the message selector is
 1117        *                 invalid.
 1118        * @see javax.jms.ConnectionConsumer
 1119        */
 1120       public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
 1121           return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
 1122       }
 1123   
 1124       /**
 1125        * Creates a connection consumer for this connection (optional operation).
 1126        * This is an expert facility not used by regular JMS clients.
 1127        * 
 1128        * @param destination the destination to access
 1129        * @param messageSelector only messages with properties matching the message
 1130        *                selector expression are delivered. A value of null or an
 1131        *                empty string indicates that there is no message selector
 1132        *                for the message consumer.
 1133        * @param sessionPool the server session pool to associate with this
 1134        *                connection consumer
 1135        * @param maxMessages the maximum number of messages that can be assigned to
 1136        *                a server session at one time
 1137        * @return the connection consumer
 1138        * @throws JMSException if the <CODE>Connection</CODE> object fails to
 1139        *                 create a connection consumer due to some internal error
 1140        *                 or invalid arguments for <CODE>sessionPool</CODE> and
 1141        *                 <CODE>messageSelector</CODE>.
 1142        * @throws javax.jms.InvalidDestinationException if an invalid destination
 1143        *                 is specified.
 1144        * @throws javax.jms.InvalidSelectorException if the message selector is
 1145        *                 invalid.
 1146        * @see javax.jms.ConnectionConsumer
 1147        * @since 1.1
 1148        */
 1149       public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
 1150           return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
 1151       }
 1152   
 1153       public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
 1154           throws JMSException {
 1155   
 1156           checkClosedOrFailed();
 1157           ensureConnectionInfoSent();
 1158   
 1159           ConsumerId consumerId = createConsumerId();
 1160           ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
 1161           consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
 1162           consumerInfo.setSelector(messageSelector);
 1163           consumerInfo.setPrefetchSize(maxMessages);
 1164           consumerInfo.setNoLocal(noLocal);
 1165           consumerInfo.setDispatchAsync(isDispatchAsync());
 1166   
 1167           // Allows the options on the destination to configure the consumerInfo
 1168           if (consumerInfo.getDestination().getOptions() != null) {
 1169               Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
 1170               IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
 1171           }
 1172   
 1173           return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
 1174       }
 1175   
 1176       /**
 1177        * @return
 1178        */
 1179       private ConsumerId createConsumerId() {
 1180           return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
 1181       }
 1182   
 1183       /**
 1184        * @return
 1185        */
 1186       private ProducerId createProducerId() {
 1187           return new ProducerId(connectionSessionId, producerIdGenerator.getNextSequenceId());
 1188       }
 1189   
 1190       /**
 1191        * Creates a <CODE>QueueSession</CODE> object.
 1192        * 
 1193        * @param transacted indicates whether the session is transacted
 1194        * @param acknowledgeMode indicates whether the consumer or the client will
 1195        *                acknowledge any messages it receives; ignored if the
 1196        *                session is transacted. Legal values are
 1197        *                <code>Session.AUTO_ACKNOWLEDGE</code>,
 1198        *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
 1199        *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
 1200        * @return a newly created queue session
 1201        * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
 1202        *                 to create a session due to some internal error or lack of
 1203        *                 support for the specific transaction and acknowledgement
 1204        *                 mode.
 1205        * @see Session#AUTO_ACKNOWLEDGE
 1206        * @see Session#CLIENT_ACKNOWLEDGE
 1207        * @see Session#DUPS_OK_ACKNOWLEDGE
 1208        */
 1209       public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
 1210           return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
 1211       }
 1212   
 1213       /**
 1214        * Ensures that the clientID was manually specified and not auto-generated.
 1215        * If the clientID was not specified this method will throw an exception.
 1216        * This method is used to ensure that the clientID + durableSubscriber name
 1217        * are used correctly.
 1218        * 
 1219        * @throws JMSException
 1220        */
 1221       public void checkClientIDWasManuallySpecified() throws JMSException {
 1222           if (!userSpecifiedClientID) {
 1223               throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
 1224           }
 1225       }
 1226   
 1227       /**
 1228        * send a Packet through the Connection - for internal use only
 1229        * 
 1230        * @param command
 1231        * @throws JMSException
 1232        */
 1233       public void asyncSendPacket(Command command) throws JMSException {
 1234           if (isClosed()) {
 1235               throw new ConnectionClosedException();
 1236           } else {
 1237               doAsyncSendPacket(command);
 1238           }
 1239       }
 1240   
 1241   	private void doAsyncSendPacket(Command command) throws JMSException {
 1242   		try {
 1243   		    this.transport.oneway(command);
 1244   		} catch (IOException e) {
 1245   		    throw JMSExceptionSupport.create(e);
 1246   		}
 1247   	}
 1248   
 1249       /**
 1250        * Send a packet through a Connection - for internal use only
 1251        * 
 1252        * @param command
 1253        * @return
 1254        * @throws JMSException
 1255        */
 1256       public Response syncSendPacket(Command command) throws JMSException {
 1257           if (isClosed()) {
 1258               throw new ConnectionClosedException();
 1259           } else {
 1260   
 1261               try {
 1262                   Response response = (Response)this.transport.request(command);
 1263                   if (response.isException()) {
 1264                       ExceptionResponse er = (ExceptionResponse)response;
 1265                       if (er.getException() instanceof JMSException) {
 1266                           throw (JMSException)er.getException();
 1267                       } else {
 1268                           if (isClosed()||closing.get()) {
 1269                               LOG.debug("Received an exception but connection is closing");
 1270                           }
 1271                           JMSException jmsEx = null;
 1272                           try {
 1273                            jmsEx = JMSExceptionSupport.create(er.getException());
 1274                           }catch(Throwable e) {
 1275                               LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
 1276                           }
 1277                           if(jmsEx !=null) {
 1278                               throw jmsEx;
 1279                           }
 1280                       }
 1281                   }
 1282                   return response;
 1283               } catch (IOException e) {
 1284                   throw JMSExceptionSupport.create(e);
 1285               }
 1286           }
 1287       }
 1288   
 1289       /**
 1290        * Send a packet through a Connection - for internal use only
 1291        * 
 1292        * @param command
 1293        * @return
 1294        * @throws JMSException
 1295        */
 1296       public Response syncSendPacket(Command command, int timeout) throws JMSException {
 1297           if (isClosed() || closing.get()) {
 1298               throw new ConnectionClosedException();
 1299           } else {
 1300               return doSyncSendPacket(command, timeout);
 1301           }
 1302       }
 1303   
 1304   	private Response doSyncSendPacket(Command command, int timeout)
 1305   			throws JMSException {
 1306   		try {
 1307   		    Response response = (Response)this.transport.request(command, timeout);
 1308   		    if (response != null && response.isException()) {
 1309   		        ExceptionResponse er = (ExceptionResponse)response;
 1310   		        if (er.getException() instanceof JMSException) {
 1311   		            throw (JMSException)er.getException();
 1312   		        } else {
 1313   		            throw JMSExceptionSupport.create(er.getException());
 1314   		        }
 1315   		    }
 1316   		    return response;
 1317   		} catch (IOException e) {
 1318   		    throw JMSExceptionSupport.create(e);
 1319   		}
 1320   	}
 1321   
 1322       /**
 1323        * @return statistics for this Connection
 1324        */
 1325       public StatsImpl getStats() {
 1326           return stats;
 1327       }
 1328   
 1329       /**
 1330        * simply throws an exception if the Connection is already closed or the
 1331        * Transport has failed
 1332        * 
 1333        * @throws JMSException
 1334        */
 1335       protected synchronized void checkClosedOrFailed() throws JMSException {
 1336           checkClosed();
 1337           if (transportFailed.get()) {
 1338               throw new ConnectionFailedException(firstFailureError);
 1339           }
 1340       }
 1341   
 1342       /**
 1343        * simply throws an exception if the Connection is already closed
 1344        * 
 1345        * @throws JMSException
 1346        */
 1347       protected synchronized void checkClosed() throws JMSException {
 1348           if (closed.get()) {
 1349               throw new ConnectionClosedException();
 1350           }
 1351       }
 1352   
 1353       /**
 1354        * Send the ConnectionInfo to the Broker
 1355        * 
 1356        * @throws JMSException
 1357        */
 1358       protected void ensureConnectionInfoSent() throws JMSException {
 1359           synchronized(this.ensureConnectionInfoSentMutex) {
 1360               // Can we skip sending the ConnectionInfo packet??
 1361               if (isConnectionInfoSentToBroker || closed.get()) {
 1362                   return;
 1363               }
 1364               //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
 1365               if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
 1366                   info.setClientId(clientIdGenerator.generateId());
 1367               }
 1368               syncSendPacket(info.copy());
 1369       
 1370               this.isConnectionInfoSentToBroker = true;
 1371               // Add a temp destination advisory consumer so that
 1372               // We know what the valid temporary destinations are on the
 1373               // broker without having to do an RPC to the broker.
 1374       
 1375               ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
 1376               if (watchTopicAdvisories) {
 1377                   advisoryConsumer = new AdvisoryConsumer(this, consumerId);
 1378               }
 1379           }
 1380       }
 1381   
 1382       public synchronized boolean isWatchTopicAdvisories() {
 1383           return watchTopicAdvisories;
 1384       }
 1385   
 1386       public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
 1387           this.watchTopicAdvisories = watchTopicAdvisories;
 1388       }
 1389   
 1390       /**
 1391        * @return Returns the useAsyncSend.
 1392        */
 1393       public boolean isUseAsyncSend() {
 1394           return useAsyncSend;
 1395       }
 1396   
 1397       /**
 1398        * Forces the use of <a
 1399        * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
 1400        * adds a massive performance boost; but means that the send() method will
 1401        * return immediately whether the message has been sent or not which could
 1402        * lead to message loss.
 1403        */
 1404       public void setUseAsyncSend(boolean useAsyncSend) {
 1405           this.useAsyncSend = useAsyncSend;
 1406       }
 1407   
 1408       /**
 1409        * @return true if always sync send messages
 1410        */
 1411       public boolean isAlwaysSyncSend() {
 1412           return this.alwaysSyncSend;
 1413       }
 1414   
 1415       /**
 1416        * Set true if always require messages to be sync sent
 1417        * 
 1418        * @param alwaysSyncSend
 1419        */
 1420       public void setAlwaysSyncSend(boolean alwaysSyncSend) {
 1421           this.alwaysSyncSend = alwaysSyncSend;
 1422       }
 1423   
 1424       /**
 1425        * Cleans up this connection so that it's state is as if the connection was
 1426        * just created. This allows the Resource Adapter to clean up a connection
 1427        * so that it can be reused without having to close and recreate the
 1428        * connection.
 1429        */
 1430       public void cleanup() throws JMSException {
 1431   
 1432           if (advisoryConsumer != null && !isTransportFailed()) {
 1433               advisoryConsumer.dispose();
 1434               advisoryConsumer = null;
 1435           }
 1436   
 1437           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1438               ActiveMQSession s = i.next();
 1439               s.dispose();
 1440           }
 1441           for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
 1442               ActiveMQConnectionConsumer c = i.next();
 1443               c.dispose();
 1444           }
 1445           for (Iterator<ActiveMQInputStream> i = this.inputStreams.iterator(); i.hasNext();) {
 1446               ActiveMQInputStream c = i.next();
 1447               c.dispose();
 1448           }
 1449           for (Iterator<ActiveMQOutputStream> i = this.outputStreams.iterator(); i.hasNext();) {
 1450               ActiveMQOutputStream c = i.next();
 1451               c.dispose();
 1452           }
 1453   
 1454           if (isConnectionInfoSentToBroker) {
 1455               if (!transportFailed.get() && !closing.get()) {
 1456                   syncSendPacket(info.createRemoveCommand());
 1457               }
 1458               isConnectionInfoSentToBroker = false;
 1459           }
 1460           if (userSpecifiedClientID) {
 1461               info.setClientId(null);
 1462               userSpecifiedClientID = false;
 1463           }
 1464           clientIDSet = false;
 1465   
 1466           started.set(false);
 1467       }
 1468   
 1469       /**
 1470        * Changes the associated username/password that is associated with this
 1471        * connection. If the connection has been used, you must called cleanup()
 1472        * before calling this method.
 1473        * 
 1474        * @throws IllegalStateException if the connection is in used.
 1475        */
 1476       public void changeUserInfo(String userName, String password) throws JMSException {
 1477           if (isConnectionInfoSentToBroker) {
 1478               throw new IllegalStateException("changeUserInfo used Connection is not allowed");
 1479           }
 1480           this.info.setUserName(userName);
 1481           this.info.setPassword(password);
 1482       }
 1483   
 1484       /**
 1485        * @return Returns the resourceManagerId.
 1486        * @throws JMSException
 1487        */
 1488       public String getResourceManagerId() throws JMSException {
 1489           waitForBrokerInfo();
 1490           if (brokerInfo == null) {
 1491               throw new JMSException("Connection failed before Broker info was received.");
 1492           }
 1493           return brokerInfo.getBrokerId().getValue();
 1494       }
 1495   
 1496       /**
 1497        * Returns the broker name if one is available or null if one is not
 1498        * available yet.
 1499        */
 1500       public String getBrokerName() {
 1501           try {
 1502               brokerInfoReceived.await(5, TimeUnit.SECONDS);
 1503               if (brokerInfo == null) {
 1504                   return null;
 1505               }
 1506               return brokerInfo.getBrokerName();
 1507           } catch (InterruptedException e) {
 1508               Thread.currentThread().interrupt();
 1509               return null;
 1510           }
 1511       }
 1512   
 1513       /**
 1514        * Returns the broker information if it is available or null if it is not
 1515        * available yet.
 1516        */
 1517       public BrokerInfo getBrokerInfo() {
 1518           return brokerInfo;
 1519       }
 1520   
 1521       /**
 1522        * @return Returns the RedeliveryPolicy.
 1523        * @throws JMSException
 1524        */
 1525       public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
 1526           return redeliveryPolicy;
 1527       }
 1528   
 1529       /**
 1530        * Sets the redelivery policy to be used when messages are rolled back
 1531        */
 1532       public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
 1533           this.redeliveryPolicy = redeliveryPolicy;
 1534       }
 1535   
 1536       public BlobTransferPolicy getBlobTransferPolicy() {
 1537           if (blobTransferPolicy == null) {
 1538               blobTransferPolicy = createBlobTransferPolicy();
 1539           }
 1540           return blobTransferPolicy;
 1541       }
 1542   
 1543       /**
 1544        * Sets the policy used to describe how out-of-band BLOBs (Binary Large
 1545        * OBjects) are transferred from producers to brokers to consumers
 1546        */
 1547       public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
 1548           this.blobTransferPolicy = blobTransferPolicy;
 1549       }
 1550   
 1551       /**
 1552        * @return Returns the alwaysSessionAsync.
 1553        */
 1554       public boolean isAlwaysSessionAsync() {
 1555           return alwaysSessionAsync;
 1556       }
 1557   
 1558       /**
 1559        * If this flag is set then a separate thread is not used for dispatching
 1560        * messages for each Session in the Connection. However, a separate thread
 1561        * is always used if there is more than one session, or the session isn't in
 1562        * auto acknowledge or duplicates ok mode
 1563        */
 1564       public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
 1565           this.alwaysSessionAsync = alwaysSessionAsync;
 1566       }
 1567   
 1568       /**
 1569        * @return Returns the optimizeAcknowledge.
 1570        */
 1571       public boolean isOptimizeAcknowledge() {
 1572           return optimizeAcknowledge;
 1573       }
 1574   
 1575       /**
 1576        * Enables an optimised acknowledgement mode where messages are acknowledged
 1577        * in batches rather than individually
 1578        * 
 1579        * @param optimizeAcknowledge The optimizeAcknowledge to set.
 1580        */
 1581       public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
 1582           this.optimizeAcknowledge = optimizeAcknowledge;
 1583       }
 1584   
 1585       public long getWarnAboutUnstartedConnectionTimeout() {
 1586           return warnAboutUnstartedConnectionTimeout;
 1587       }
 1588   
 1589       /**
 1590        * Enables the timeout from a connection creation to when a warning is
 1591        * generated if the connection is not properly started via {@link #start()}
 1592        * and a message is received by a consumer. It is a very common gotcha to
 1593        * forget to <a
 1594        * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
 1595        * the connection</a> so this option makes the default case to create a
 1596        * warning if the user forgets. To disable the warning just set the value to <
 1597        * 0 (say -1).
 1598        */
 1599       public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
 1600           this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
 1601       }
 1602       
 1603       /**
 1604        * @return the sendTimeout
 1605        */
 1606       public int getSendTimeout() {
 1607           return sendTimeout;
 1608       }
 1609   
 1610       /**
 1611        * @param sendTimeout the sendTimeout to set
 1612        */
 1613       public void setSendTimeout(int sendTimeout) {
 1614           this.sendTimeout = sendTimeout;
 1615       }
 1616       
 1617       /**
 1618        * @return the sendAcksAsync
 1619        */
 1620       public boolean isSendAcksAsync() {
 1621           return sendAcksAsync;
 1622       }
 1623   
 1624       /**
 1625        * @param sendAcksAsync the sendAcksAsync to set
 1626        */
 1627       public void setSendAcksAsync(boolean sendAcksAsync) {
 1628           this.sendAcksAsync = sendAcksAsync;
 1629       }
 1630   
 1631   
 1632       /**
 1633        * Returns the time this connection was created
 1634        */
 1635       public long getTimeCreated() {
 1636           return timeCreated;
 1637       }
 1638   
 1639       private void waitForBrokerInfo() throws JMSException {
 1640           try {
 1641               brokerInfoReceived.await();
 1642           } catch (InterruptedException e) {
 1643               Thread.currentThread().interrupt();
 1644               throw JMSExceptionSupport.create(e);
 1645           }
 1646       }
 1647   
 1648       // Package protected so that it can be used in unit tests
 1649       public Transport getTransport() {
 1650           return transport;
 1651       }
 1652   
 1653       public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
 1654           producers.put(producerId, producer);
 1655       }
 1656   
 1657       public void removeProducer(ProducerId producerId) {
 1658           producers.remove(producerId);
 1659       }
 1660   
 1661       public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
 1662           dispatchers.put(consumerId, dispatcher);
 1663       }
 1664   
 1665       public void removeDispatcher(ConsumerId consumerId) {
 1666           dispatchers.remove(consumerId);
 1667       }
 1668   
 1669       /**
 1670        * @param o - the command to consume
 1671        */
 1672       public void onCommand(final Object o) {
 1673           final Command command = (Command)o;
 1674           if (!closed.get() && command != null) {
 1675               try {
 1676                   command.visit(new CommandVisitorAdapter() {
 1677                       @Override
 1678                       public Response processMessageDispatch(MessageDispatch md) throws Exception {
 1679                           waitForTransportInterruptionProcessing();
 1680                           ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
 1681                           if (dispatcher != null) {
 1682                               // Copy in case a embedded broker is dispatching via
 1683                               // vm://
 1684                               // md.getMessage() == null to signal end of queue
 1685                               // browse.
 1686                               Message msg = md.getMessage();
 1687                               if (msg != null) {
 1688                                   msg = msg.copy();
 1689                                   msg.setReadOnlyBody(true);
 1690                                   msg.setReadOnlyProperties(true);
 1691                                   msg.setRedeliveryCounter(md.getRedeliveryCounter());
 1692                                   msg.setConnection(ActiveMQConnection.this);
 1693                                   md.setMessage(msg);
 1694                               }
 1695                               dispatcher.dispatch(md);
 1696                           }
 1697                           return null;
 1698                       }
 1699   
 1700                       @Override
 1701                       public Response processProducerAck(ProducerAck pa) throws Exception {
 1702                           if (pa != null && pa.getProducerId() != null) {
 1703                               ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
 1704                               if (producer != null) {
 1705                                   producer.onProducerAck(pa);
 1706                               }
 1707                           }
 1708                           return null;
 1709                       }
 1710   
 1711                       @Override
 1712                       public Response processBrokerInfo(BrokerInfo info) throws Exception {
 1713                           brokerInfo = info;
 1714                           brokerInfoReceived.countDown();
 1715                           optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
 1716                           getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
 1717                           return null;
 1718                       }
 1719   
 1720                       @Override
 1721                       public Response processConnectionError(final ConnectionError error) throws Exception {
 1722                           asyncConnectionThread.execute(new Runnable() {
 1723                               public void run() {
 1724                                   onAsyncException(error.getException());
 1725                               }
 1726                           });
 1727                           return null;
 1728                       }
 1729   
 1730                       @Override
 1731                       public Response processControlCommand(ControlCommand command) throws Exception {
 1732                           onControlCommand(command);
 1733                           return null;
 1734                       }
 1735   
 1736                       @Override
 1737                       public Response processConnectionControl(ConnectionControl control) throws Exception {
 1738                           onConnectionControl((ConnectionControl)command);
 1739                           return null;
 1740                       }
 1741   
 1742                       @Override
 1743                       public Response processConsumerControl(ConsumerControl control) throws Exception {
 1744                           onConsumerControl((ConsumerControl)command);
 1745                           return null;
 1746                       }
 1747   
 1748                       @Override
 1749                       public Response processWireFormat(WireFormatInfo info) throws Exception {
 1750                           onWireFormatInfo((WireFormatInfo)command);
 1751                           return null;
 1752                       }
 1753                   });
 1754               } catch (Exception e) {
 1755                   onClientInternalException(e);
 1756               }
 1757   
 1758           }
 1759           for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
 1760               TransportListener listener = iter.next();
 1761               listener.onCommand(command);
 1762           }
 1763       }
 1764   
 1765       protected void onWireFormatInfo(WireFormatInfo info) {
 1766           protocolVersion.set(info.getVersion());
 1767       }
 1768   
 1769       /**
 1770        * Handles async client internal exceptions.
 1771        * A client internal exception is usually one that has been thrown
 1772        * by a container runtime component during asynchronous processing of a
 1773        * message that does not affect the connection itself.
 1774        * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
 1775        * its <code>onException</code> method, if one has been registered with this connection.
 1776        * 
 1777        * @param error the exception that the problem
 1778        */
 1779       public void onClientInternalException(final Throwable error) {
 1780           if ( !closed.get() && !closing.get() ) {
 1781               if ( this.clientInternalExceptionListener != null ) {
 1782                   asyncConnectionThread.execute(new Runnable() {
 1783                       public void run() {
 1784                           ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
 1785                       }
 1786                   });
 1787               } else {
 1788                   LOG.debug("Async client internal exception occurred with no exception listener registered: " 
 1789                           + error, error);
 1790               }
 1791           }
 1792       }
 1793       /**
 1794        * Used for handling async exceptions
 1795        * 
 1796        * @param error
 1797        */
 1798       public void onAsyncException(Throwable error) {
 1799           if (!closed.get() && !closing.get()) {
 1800               if (this.exceptionListener != null) {
 1801   
 1802                   if (!(error instanceof JMSException)) {
 1803                       error = JMSExceptionSupport.create(error);
 1804                   }
 1805                   final JMSException e = (JMSException)error;
 1806   
 1807                   asyncConnectionThread.execute(new Runnable() {
 1808                       public void run() {
 1809                           ActiveMQConnection.this.exceptionListener.onException(e);
 1810                       }
 1811                   });
 1812   
 1813               } else {
 1814                   LOG.debug("Async exception with no exception listener: " + error, error);
 1815               }
 1816           }
 1817       }
 1818   
 1819       public void onException(final IOException error) {
 1820   		onAsyncException(error);
 1821   		if (!closing.get() && !closed.get()) {
 1822   			asyncConnectionThread.execute(new Runnable() {
 1823   				public void run() {
 1824   					transportFailed(error);
 1825   					ServiceSupport.dispose(ActiveMQConnection.this.transport);
 1826   					brokerInfoReceived.countDown();
 1827   					try {
 1828   						cleanup();
 1829   					} catch (JMSException e) {
 1830   						LOG.warn("Exception during connection cleanup, " + e, e);
 1831   					}
 1832   					for (Iterator<TransportListener> iter = transportListeners
 1833   							.iterator(); iter.hasNext();) {
 1834   						TransportListener listener = iter.next();
 1835   						listener.onException(error);
 1836   					}
 1837   				}
 1838   			});
 1839   		}
 1840   	}
 1841   
 1842       public void transportInterupted() {
 1843           transportInterruptionProcessingComplete = new CountDownLatch(dispatchers.size() - (advisoryConsumer != null ? 1:0));
 1844           if (LOG.isDebugEnabled()) {
 1845               LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
 1846           }
 1847           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1848               ActiveMQSession s = i.next();
 1849               s.clearMessagesInProgress();
 1850           }
 1851           for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
 1852               TransportListener listener = iter.next();
 1853               listener.transportInterupted();
 1854           }
 1855       }
 1856   
 1857       public void transportResumed() {
 1858           for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
 1859               TransportListener listener = iter.next();
 1860               listener.transportResumed();
 1861           }
 1862       }
 1863   
 1864       /**
 1865        * Create the DestinationInfo object for the temporary destination.
 1866        * 
 1867        * @param topic - if its true topic, else queue.
 1868        * @return DestinationInfo
 1869        * @throws JMSException
 1870        */
 1871       protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
 1872   
 1873           // Check if Destination info is of temporary type.
 1874           ActiveMQTempDestination dest;
 1875           if (topic) {
 1876               dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
 1877           } else {
 1878               dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
 1879           }
 1880   
 1881           DestinationInfo info = new DestinationInfo();
 1882           info.setConnectionId(this.info.getConnectionId());
 1883           info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
 1884           info.setDestination(dest);
 1885           syncSendPacket(info);
 1886   
 1887           dest.setConnection(this);
 1888           activeTempDestinations.put(dest, dest);
 1889           return dest;
 1890       }
 1891   
 1892       /**
 1893        * @param destination
 1894        * @throws JMSException
 1895        */
 1896       public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
 1897   
 1898           checkClosedOrFailed();
 1899   
 1900           for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 1901               ActiveMQSession s = i.next();
 1902               if (s.isInUse(destination)) {
 1903                   throw new JMSException("A consumer is consuming from the temporary destination");
 1904               }
 1905           }
 1906   
 1907           activeTempDestinations.remove(destination);
 1908   
 1909           DestinationInfo info = new DestinationInfo();
 1910           info.setConnectionId(this.info.getConnectionId());
 1911           info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
 1912           info.setDestination(destination);
 1913           info.setTimeout(0);
 1914           syncSendPacket(info);
 1915       }
 1916   
 1917       public boolean isDeleted(ActiveMQDestination dest) {
 1918   
 1919           // If we are not watching the advisories.. then
 1920           // we will assume that the temp destination does exist.
 1921           if (advisoryConsumer == null) {
 1922               return false;
 1923           }
 1924   
 1925           return !activeTempDestinations.contains(dest);
 1926       }
 1927   
 1928       public boolean isCopyMessageOnSend() {
 1929           return copyMessageOnSend;
 1930       }
 1931   
 1932       public LongSequenceGenerator getLocalTransactionIdGenerator() {
 1933           return localTransactionIdGenerator;
 1934       }
 1935   
 1936       public boolean isUseCompression() {
 1937           return useCompression;
 1938       }
 1939   
 1940       /**
 1941        * Enables the use of compression of the message bodies
 1942        */
 1943       public void setUseCompression(boolean useCompression) {
 1944           this.useCompression = useCompression;
 1945       }
 1946   
 1947       public void destroyDestination(ActiveMQDestination destination) throws JMSException {
 1948   
 1949           checkClosedOrFailed();
 1950           ensureConnectionInfoSent();
 1951   
 1952           DestinationInfo info = new DestinationInfo();
 1953           info.setConnectionId(this.info.getConnectionId());
 1954           info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
 1955           info.setDestination(destination);
 1956           info.setTimeout(0);
 1957           syncSendPacket(info);
 1958   
 1959       }
 1960   
 1961       public boolean isDispatchAsync() {
 1962           return dispatchAsync;
 1963       }
 1964   
 1965       /**
 1966        * Enables or disables the default setting of whether or not consumers have
 1967        * their messages <a
 1968        * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
 1969        * synchronously or asynchronously by the broker</a>. For non-durable
 1970        * topics for example we typically dispatch synchronously by default to
 1971        * minimize context switches which boost performance. However sometimes its
 1972        * better to go slower to ensure that a single blocked consumer socket does
 1973        * not block delivery to other consumers.
 1974        * 
 1975        * @param asyncDispatch If true then consumers created on this connection
 1976        *                will default to having their messages dispatched
 1977        *                asynchronously. The default value is false.
 1978        */
 1979       public void setDispatchAsync(boolean asyncDispatch) {
 1980           this.dispatchAsync = asyncDispatch;
 1981       }
 1982   
 1983       public boolean isObjectMessageSerializationDefered() {
 1984           return objectMessageSerializationDefered;
 1985       }
 1986   
 1987       /**
 1988        * When an object is set on an ObjectMessage, the JMS spec requires the
 1989        * object to be serialized by that set method. Enabling this flag causes the
 1990        * object to not get serialized. The object may subsequently get serialized
 1991        * if the message needs to be sent over a socket or stored to disk.
 1992        */
 1993       public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
 1994           this.objectMessageSerializationDefered = objectMessageSerializationDefered;
 1995       }
 1996   
 1997       public InputStream createInputStream(Destination dest) throws JMSException {
 1998           return createInputStream(dest, null);
 1999       }
 2000   
 2001       public InputStream createInputStream(Destination dest, String messageSelector) throws JMSException {
 2002           return createInputStream(dest, messageSelector, false);
 2003       }
 2004   
 2005       public InputStream createInputStream(Destination dest, String messageSelector, boolean noLocal) throws JMSException {
 2006           return doCreateInputStream(dest, messageSelector, noLocal, null);
 2007       }
 2008   
 2009       public InputStream createDurableInputStream(Topic dest, String name) throws JMSException {
 2010           return createInputStream(dest, null, false);
 2011       }
 2012   
 2013       public InputStream createDurableInputStream(Topic dest, String name, String messageSelector) throws JMSException {
 2014           return createDurableInputStream(dest, name, messageSelector, false);
 2015       }
 2016   
 2017       public InputStream createDurableInputStream(Topic dest, String name, String messageSelector, boolean noLocal) throws JMSException {
 2018           return doCreateInputStream(dest, messageSelector, noLocal, name);
 2019       }
 2020   
 2021       private InputStream doCreateInputStream(Destination dest, String messageSelector, boolean noLocal, String subName) throws JMSException {
 2022           checkClosedOrFailed();
 2023           ensureConnectionInfoSent();
 2024           return new ActiveMQInputStream(this, createConsumerId(), ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, prefetchPolicy.getInputStreamPrefetch());
 2025       }
 2026   
 2027       /**
 2028        * Creates a persistent output stream; individual messages will be written
 2029        * to disk/database by the broker
 2030        */
 2031       public OutputStream createOutputStream(Destination dest) throws JMSException {
 2032           return createOutputStream(dest, null, ActiveMQMessage.DEFAULT_DELIVERY_MODE, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
 2033       }
 2034   
 2035       /**
 2036        * Creates a non persistent output stream; messages will not be written to
 2037        * disk
 2038        */
 2039       public OutputStream createNonPersistentOutputStream(Destination dest) throws JMSException {
 2040           return createOutputStream(dest, null, DeliveryMode.NON_PERSISTENT, ActiveMQMessage.DEFAULT_PRIORITY, ActiveMQMessage.DEFAULT_TIME_TO_LIVE);
 2041       }
 2042   
 2043       /**
 2044        * Creates an output stream allowing full control over the delivery mode,
 2045        * the priority and time to live of the messages and the properties added to
 2046        * messages on the stream.
 2047        * 
 2048        * @param streamProperties defines a map of key-value pairs where the keys
 2049        *                are strings and the values are primitive values (numbers
 2050        *                and strings) which are appended to the messages similarly
 2051        *                to using the
 2052        *                {@link javax.jms.Message#setObjectProperty(String, Object)}
 2053        *                method
 2054        */
 2055       public OutputStream createOutputStream(Destination dest, Map<String, Object> streamProperties, int deliveryMode, int priority, long timeToLive) throws JMSException {
 2056           checkClosedOrFailed();
 2057           ensureConnectionInfoSent();
 2058           return new ActiveMQOutputStream(this, createProducerId(), ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, timeToLive);
 2059       }
 2060   
 2061       /**
 2062        * Unsubscribes a durable subscription that has been created by a client.
 2063        * <P>
 2064        * This method deletes the state being maintained on behalf of the
 2065        * subscriber by its provider.
 2066        * <P>
 2067        * It is erroneous for a client to delete a durable subscription while there
 2068        * is an active <CODE>MessageConsumer </CODE> or
 2069        * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
 2070        * message is part of a pending transaction or has not been acknowledged in
 2071        * the session.
 2072        * 
 2073        * @param name the name used to identify this subscription
 2074        * @throws JMSException if the session fails to unsubscribe to the durable
 2075        *                 subscription due to some internal error.
 2076        * @throws InvalidDestinationException if an invalid subscription name is
 2077        *                 specified.
 2078        * @since 1.1
 2079        */
 2080       public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
 2081           checkClosedOrFailed();
 2082           RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
 2083           rsi.setConnectionId(getConnectionInfo().getConnectionId());
 2084           rsi.setSubscriptionName(name);
 2085           rsi.setClientId(getConnectionInfo().getClientId());
 2086           syncSendPacket(rsi);
 2087       }
 2088   
 2089       /**
 2090        * Internal send method optimized: - It does not copy the message - It can
 2091        * only handle ActiveMQ messages. - You can specify if the send is async or
 2092        * sync - Does not allow you to send /w a transaction.
 2093        */
 2094       void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
 2095           checkClosedOrFailed();
 2096   
 2097           if (destination.isTemporary() && isDeleted(destination)) {
 2098               throw new JMSException("Cannot publish to a deleted Destination: " + destination);
 2099           }
 2100   
 2101           msg.setJMSDestination(destination);
 2102           msg.setJMSDeliveryMode(deliveryMode);
 2103           long expiration = 0L;
 2104   
 2105           if (!isDisableTimeStampsByDefault()) {
 2106               long timeStamp = System.currentTimeMillis();
 2107               msg.setJMSTimestamp(timeStamp);
 2108               if (timeToLive > 0) {
 2109                   expiration = timeToLive + timeStamp;
 2110               }
 2111           }
 2112   
 2113           msg.setJMSExpiration(expiration);
 2114           msg.setJMSPriority(priority);
 2115   
 2116           msg.setJMSRedelivered(false);
 2117           msg.setMessageId(messageId);
 2118   
 2119           msg.onSend();
 2120   
 2121           msg.setProducerId(msg.getMessageId().getProducerId());
 2122   
 2123           if (LOG.isDebugEnabled()) {
 2124               LOG.debug("Sending message: " + msg);
 2125           }
 2126   
 2127           if (async) {
 2128               asyncSendPacket(msg);
 2129           } else {
 2130               syncSendPacket(msg);
 2131           }
 2132   
 2133       }
 2134   
 2135       public void addOutputStream(ActiveMQOutputStream stream) {
 2136           outputStreams.add(stream);
 2137       }
 2138   
 2139       public void removeOutputStream(ActiveMQOutputStream stream) {
 2140           outputStreams.remove(stream);
 2141       }
 2142   
 2143       public void addInputStream(ActiveMQInputStream stream) {
 2144           inputStreams.add(stream);
 2145       }
 2146   
 2147       public void removeInputStream(ActiveMQInputStream stream) {
 2148           inputStreams.remove(stream);
 2149       }
 2150   
 2151       protected void onControlCommand(ControlCommand command) {
 2152           String text = command.getCommand();
 2153           if (text != null) {
 2154               if (text.equals("shutdown")) {
 2155                   LOG.info("JVM told to shutdown");
 2156                   System.exit(0);
 2157               }
 2158           }
 2159       }
 2160   
 2161       protected void onConnectionControl(ConnectionControl command) {
 2162           if (command.isFaultTolerant()) {
 2163               this.optimizeAcknowledge = false;
 2164               for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 2165                   ActiveMQSession s = i.next();
 2166                   s.setOptimizeAcknowledge(false);
 2167               }
 2168           }
 2169       }
 2170   
 2171       protected void onConsumerControl(ConsumerControl command) {
 2172           if (command.isClose()) {
 2173               for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 2174                   ActiveMQSession s = i.next();
 2175                   s.close(command.getConsumerId());
 2176               }
 2177           } else {
 2178               for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
 2179                   ActiveMQSession s = i.next();
 2180                   s.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
 2181               }
 2182           }
 2183       }
 2184   
 2185       protected void transportFailed(IOException error) {
 2186           transportFailed.set(true);
 2187           if (firstFailureError == null) {
 2188               firstFailureError = error;
 2189           }
 2190       }
 2191   
 2192       /**
 2193        * Should a JMS message be copied to a new JMS Message object as part of the
 2194        * send() method in JMS. This is enabled by default to be compliant with the
 2195        * JMS specification. You can disable it if you do not mutate JMS messages
 2196        * after they are sent for a performance boost
 2197        */
 2198       public void setCopyMessageOnSend(boolean copyMessageOnSend) {
 2199           this.copyMessageOnSend = copyMessageOnSend;
 2200       }
 2201   
 2202       public String toString() {
 2203           return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
 2204       }
 2205   
 2206       protected BlobTransferPolicy createBlobTransferPolicy() {
 2207           return new BlobTransferPolicy();
 2208       }
 2209   
 2210       public int getProtocolVersion() {
 2211           return protocolVersion.get();
 2212       }
 2213   
 2214       public int getProducerWindowSize() {
 2215           return producerWindowSize;
 2216       }
 2217   
 2218       public void setProducerWindowSize(int producerWindowSize) {
 2219           this.producerWindowSize = producerWindowSize;
 2220       }
 2221   
 2222       public void setAuditDepth(int auditDepth) {
 2223           connectionAudit.setAuditDepth(auditDepth);
 2224   	}
 2225   
 2226       public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
 2227           connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
 2228   	}
 2229   
 2230       protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
 2231           connectionAudit.removeDispatcher(dispatcher);
 2232       }
 2233   
 2234       protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
 2235           return connectionAudit.isDuplicate(dispatcher, message);
 2236       }
 2237   
 2238       protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
 2239           connectionAudit.rollbackDuplicate(dispatcher, message);
 2240       }
 2241   
 2242       public IOException getFirstFailureError() {
 2243           return firstFailureError;
 2244       }
 2245   
 2246       protected void waitForTransportInterruptionProcessing() throws InterruptedException {
 2247           if (transportInterruptionProcessingComplete != null) {
 2248               while (!closed.get() && !transportFailed.get() && !transportInterruptionProcessingComplete.await(15, TimeUnit.SECONDS)) {
 2249                   LOG.warn("dispatch paused, waiting for outstanding dispatch interruption processing (" + transportInterruptionProcessingComplete.getCount() + ") to complete..");
 2250               }
 2251               synchronized (this) {
 2252                   transportInterruptionProcessingComplete = null;
 2253               }
 2254           }
 2255       }
 2256   
 2257       protected synchronized void transportInterruptionProcessingComplete() {
 2258           if (transportInterruptionProcessingComplete != null) {
 2259              transportInterruptionProcessingComplete.countDown();
 2260           }
 2261       }
 2262   
 2263       /*
 2264        * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
 2265        * will wait to receive re dispatched messages.
 2266        * default value is 0 so there is no wait by default.
 2267        */
 2268       public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
 2269           this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
 2270       }
 2271       
 2272       public long getConsumerFailoverRedeliveryWaitPeriod() {
 2273           return consumerFailoverRedeliveryWaitPeriod;
 2274       }
 2275   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » [javadoc | source]