Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » xmpp » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.xmpp;
   18   
   19   import java.io.IOException;
   20   import java.io.InputStream;
   21   import java.io.OutputStream;
   22   import java.net.Socket;
   23   import java.net.URI;
   24   
   25   import javax.net.SocketFactory;
   26   import javax.xml.bind.JAXBContext;
   27   import javax.xml.bind.JAXBException;
   28   import javax.xml.bind.Marshaller;
   29   import javax.xml.bind.Unmarshaller;
   30   import javax.xml.namespace.QName;
   31   import javax.xml.stream.Location;
   32   import javax.xml.stream.XMLEventReader;
   33   import javax.xml.stream.XMLInputFactory;
   34   import javax.xml.stream.XMLOutputFactory;
   35   import javax.xml.stream.XMLReporter;
   36   import javax.xml.stream.XMLStreamException;
   37   import javax.xml.stream.XMLStreamWriter;
   38   import javax.xml.stream.events.Attribute;
   39   import javax.xml.stream.events.StartElement;
   40   import javax.xml.stream.events.XMLEvent;
   41   
   42   import ietf.params.xml.ns.xmpp_sasl.Mechanisms;
   43   
   44   import org.apache.activemq.command.BrokerInfo;
   45   import org.apache.activemq.command.Command;
   46   import org.apache.activemq.transport.tcp.TcpBufferedInputStream;
   47   import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
   48   import org.apache.activemq.transport.tcp.TcpTransport;
   49   import org.apache.activemq.util.IOExceptionSupport;
   50   import org.apache.activemq.util.ServiceStopper;
   51   import org.apache.activemq.wireformat.WireFormat;
   52   import org.apache.commons.logging.Log;
   53   import org.apache.commons.logging.LogFactory;
   54   import org.jabber.etherx.streams.Features;
   55   
   56   /**
   57    * @version $Revision: 565003 $
   58    */
   59   public class XmppTransport extends TcpTransport {
   60       protected static final QName ATTRIBUTE_TO = new QName("to");
   61   
   62       private static final transient Log LOG = LogFactory.getLog(XmppTransport.class);
   63   
   64       protected OutputStream outputStream;
   65       protected InputStream inputStream;
   66   
   67       private JAXBContext context;
   68       private XMLEventReader xmlReader;
   69       private Unmarshaller unmarshaller;
   70       private Marshaller marshaller;
   71       private XMLStreamWriter xmlWriter;
   72       private String to = "client";
   73       private ProtocolConverter converter;
   74       private String from = "localhost";
   75       private String brokerId = "broker-id-1";
   76   
   77       public XmppTransport(WireFormat wireFormat, Socket socket) throws IOException {
   78           super(wireFormat, socket);
   79           init();
   80       }
   81   
   82       public XmppTransport(WireFormat wireFormat, SocketFactory socketFactory, URI uri, URI uri1) throws IOException {
   83           super(wireFormat, socketFactory, uri, uri1);
   84           init();
   85       }
   86   
   87       private void init() {
   88           converter = new ProtocolConverter(this);
   89       }
   90   
   91       @Override
   92       public void oneway(Object object) throws IOException {
   93           if (object instanceof Command) {
   94               Command command = (Command)object;
   95   
   96               if (command instanceof BrokerInfo) {
   97                   BrokerInfo brokerInfo = (BrokerInfo)command;
   98   
   99                   brokerId = brokerInfo.getBrokerId().toString();
  100                   from = brokerInfo.getBrokerName();
  101                   try {
  102                       writeOpenStream(brokerId, from);
  103                   } catch (XMLStreamException e) {
  104                       throw IOExceptionSupport.create(e);
  105                   }
  106               } else {
  107                   try {
  108                       converter.onActiveMQCommad(command);
  109                   } catch (IOException e) {
  110                       throw e;
  111                   } catch (Exception e) {
  112                       throw IOExceptionSupport.create(e);
  113                   }
  114               }
  115           } else {
  116               LOG.warn("Unkown command: " + object);
  117           }
  118       }
  119   
  120       /**
  121        * Marshalls the given POJO to the client
  122        */
  123       public void marshall(Object command) throws IOException {
  124           if (isStopped() || isStopping()) {
  125               LOG.warn("Not marshalling command as shutting down: " + command);
  126               return;
  127           }
  128           try {
  129               marshaller.marshal(command, xmlWriter);
  130               xmlWriter.flush();
  131               outputStream.flush();
  132           } catch (JAXBException e) {
  133               throw IOExceptionSupport.create(e);
  134           } catch (XMLStreamException e) {
  135               throw IOExceptionSupport.create(e);
  136           }
  137       }
  138   
  139       @Override
  140       public void doRun() throws IOException {
  141           LOG.debug("XMPP consumer thread starting");
  142           try {
  143               XMLInputFactory xif = XMLInputFactory.newInstance();
  144               xif.setXMLReporter(new XMLReporter() {
  145                   public void report(String message, String errorType, Object relatedInformation, Location location) throws XMLStreamException {
  146                       LOG.warn(message + " errorType: " + errorType + " relatedInfo: " + relatedInformation);
  147                   }
  148               });
  149   
  150               xmlReader = xif.createXMLEventReader(inputStream);
  151   
  152               XMLEvent docStart = xmlReader.nextEvent();
  153   
  154               XMLEvent rootElement = xmlReader.nextTag();
  155   
  156               if (rootElement instanceof StartElement) {
  157                   StartElement startElement = (StartElement)rootElement;
  158                   Attribute toAttribute = startElement.getAttributeByName(ATTRIBUTE_TO);
  159                   if (toAttribute != null) {
  160                       to = toAttribute.getValue();
  161                   }
  162               }
  163               while (true) {
  164                   if (isStopped()) {
  165                       break;
  166                   }
  167   
  168                   XMLEvent event = xmlReader.peek();
  169                   if (event.isStartElement()) {
  170                       // unmarshal a new object
  171                       Object object = unmarshaller.unmarshal(xmlReader);
  172                       if (object != null) {
  173                           converter.onXmppCommand(object);
  174                       }
  175                   } else {
  176                       if (event.getEventType() == XMLEvent.END_ELEMENT) {
  177                           break;
  178                       } else if (event.getEventType() == XMLEvent.END_ELEMENT || event.getEventType() == XMLEvent.END_DOCUMENT) {
  179                           break;
  180                       } else {
  181                           xmlReader.nextEvent();
  182                       }
  183   
  184                   }
  185               }
  186           } catch (Exception e) {
  187               throw IOExceptionSupport.create(e);
  188           }
  189       }
  190   
  191       public String getFrom() {
  192           return from;
  193       }
  194   
  195       @Override
  196       protected void doStop(ServiceStopper stopper) throws Exception {
  197           if (xmlWriter != null) {
  198               try {
  199                   xmlWriter.writeEndElement();
  200                   xmlWriter.writeEndDocument();
  201                   xmlWriter.close();
  202               } catch (XMLStreamException e) {
  203                   // the client may have closed first so ignore this
  204                   LOG.info("Caught trying to close transport: " + e, e);
  205               }
  206           }
  207           if (xmlReader != null) {
  208               try {
  209                   xmlReader.close();
  210               } catch (XMLStreamException e) {
  211                   // the client may have closed first so ignore this
  212                   LOG.info("Caught trying to close transport: " + e, e);
  213               }
  214           }
  215           super.doStop(stopper);
  216       }
  217   
  218       @Override
  219       protected void initializeStreams() throws Exception {
  220           // TODO it would be preferable to use class discovery here!
  221           context = JAXBContext.newInstance("jabber.client"
  222           /*
  223            * + ":jabber.server" + ":jabber.iq.gateway" + ":jabber.iq.last" +
  224            * ":jabber.iq.oob" + ":jabber.iq.pass" + ":jabber.iq.time" +
  225            * ":jabber.iq.version" + ":org.jabber.protocol.activity" +
  226            * ":org.jabber.protocol.address" + ":org.jabber.protocol.amp" +
  227            * ":org.jabber.protocol.amp_errors" + ":org.jabber.protocol.muc_admin" +
  228            * ":org.jabber.protocol.muc_unique"
  229            */
  230           + ":jabber.iq._private" + ":jabber.iq.auth" + ":jabber.iq.roster" + ":org.jabber.etherx.streams" + ":org.jabber.protocol.disco_info" + ":org.jabber.protocol.disco_items"
  231                                             + ":org.jabber.protocol.muc" + ":org.jabber.protocol.muc_user" + ":ietf.params.xml.ns.xmpp_sasl" + ":ietf.params.xml.ns.xmpp_stanzas"
  232                                             + ":ietf.params.xml.ns.xmpp_streams" + ":ietf.params.xml.ns.xmpp_tls");
  233   
  234           inputStream = new TcpBufferedInputStream(socket.getInputStream(), 8 * 1024);
  235           outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), 16 * 1024);
  236   
  237           unmarshaller = context.createUnmarshaller();
  238           marshaller = context.createMarshaller();
  239           marshaller.setProperty(Marshaller.JAXB_FRAGMENT, true);
  240       }
  241   
  242       protected void writeOpenStream(String id, String from) throws IOException, XMLStreamException {
  243           LOG.debug("Sending initial stream element");
  244           XMLOutputFactory factory = XMLOutputFactory.newInstance();
  245           // factory.setProperty(XMLOutputFactory.IS_REPAIRING_NAMESPACES, true);
  246           xmlWriter = factory.createXMLStreamWriter(outputStream);
  247   
  248           // write the dummy start tag
  249           xmlWriter.writeStartDocument();
  250           xmlWriter.writeStartElement("stream", "stream", "http://etherx.jabber.org/streams");
  251           xmlWriter.writeDefaultNamespace("jabber:client");
  252           xmlWriter.writeNamespace("stream", "http://etherx.jabber.org/streams");
  253           xmlWriter.writeAttribute("version", "1.0");
  254           xmlWriter.writeAttribute("id", id);
  255           if (to == null) {
  256               to = "client";
  257           }
  258           xmlWriter.writeAttribute("to", to);
  259           xmlWriter.writeAttribute("from", from);
  260   
  261           // now lets write the features
  262           Features features = new Features();
  263   
  264           // TODO support TLS
  265           // features.getAny().add(new Starttls());
  266   
  267           Mechanisms mechanisms = new Mechanisms();
  268   
  269           // TODO support SASL
  270           // mechanisms.getMechanism().add("DIGEST-MD5");
  271           // mechanisms.getMechanism().add("PLAIN");
  272           features.getAny().add(mechanisms);
  273           marshall(features);
  274   
  275           LOG.debug("Initial stream element sent!");
  276       }
  277   
  278   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » xmpp » [javadoc | source]