Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » xmpp » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.transport.xmpp;
   18   
   19   import java.io.IOException;
   20   import java.io.PrintWriter;
   21   import java.io.StringWriter;
   22   import java.util.HashMap;
   23   import java.util.List;
   24   import java.util.Map;
   25   import java.util.concurrent.ConcurrentHashMap;
   26   import java.util.concurrent.atomic.AtomicBoolean;
   27   
   28   import javax.jms.JMSException;
   29   import org.w3c.dom.Element;
   30   
   31   import ietf.params.xml.ns.xmpp_sasl.Auth;
   32   import ietf.params.xml.ns.xmpp_sasl.Challenge;
   33   import ietf.params.xml.ns.xmpp_sasl.Success;
   34   import ietf.params.xml.ns.xmpp_tls.Proceed;
   35   import ietf.params.xml.ns.xmpp_tls.Starttls;
   36   import jabber.client.Body;
   37   import jabber.client.Error;
   38   import jabber.client.Iq;
   39   import jabber.client.Message;
   40   import jabber.client.Presence;
   41   import jabber.iq.auth.Query;
   42   
   43   import org.apache.activemq.advisory.AdvisorySupport;
   44   import org.apache.activemq.command.ActiveMQDestination;
   45   import org.apache.activemq.command.ActiveMQMessage;
   46   import org.apache.activemq.command.ActiveMQTempQueue;
   47   import org.apache.activemq.command.ActiveMQTextMessage;
   48   import org.apache.activemq.command.ActiveMQTopic;
   49   import org.apache.activemq.command.Command;
   50   import org.apache.activemq.command.ConnectionId;
   51   import org.apache.activemq.command.ConnectionInfo;
   52   import org.apache.activemq.command.ConsumerId;
   53   import org.apache.activemq.command.ConsumerInfo;
   54   import org.apache.activemq.command.DestinationInfo;
   55   import org.apache.activemq.command.ExceptionResponse;
   56   import org.apache.activemq.command.MessageAck;
   57   import org.apache.activemq.command.MessageDispatch;
   58   import org.apache.activemq.command.MessageId;
   59   import org.apache.activemq.command.ProducerId;
   60   import org.apache.activemq.command.ProducerInfo;
   61   import org.apache.activemq.command.Response;
   62   import org.apache.activemq.command.SessionId;
   63   import org.apache.activemq.command.SessionInfo;
   64   import org.apache.activemq.transport.xmpp.command.Handler;
   65   import org.apache.activemq.transport.xmpp.command.HandlerRegistry;
   66   import org.apache.activemq.util.IdGenerator;
   67   import org.apache.activemq.util.IntSequenceGenerator;
   68   import org.apache.activemq.util.LongSequenceGenerator;
   69   import org.apache.commons.logging.Log;
   70   import org.apache.commons.logging.LogFactory;
   71   import org.jabber.protocol.disco_info.Feature;
   72   import org.jabber.protocol.disco_info.Identity;
   73   import org.jabber.protocol.disco_items.Item;
   74   import org.jabber.protocol.muc_user.X;
   75   
   76   /**
   77    * TODO lots of this code could be shared with Stomp
   78    */
   79   public class ProtocolConverter {
   80       private static final transient Log LOG = LogFactory.getLog(ProtocolConverter.class);
   81       private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
   82       private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator("xmpp");
   83   
   84       private HandlerRegistry registry = new HandlerRegistry();
   85       private XmppTransport transport;
   86   
   87       private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
   88       private final SessionId sessionId = new SessionId(connectionId, -1);
   89       private final ProducerId producerId = new ProducerId(sessionId, 1);
   90   
   91       private final ConnectionInfo connectionInfo = new ConnectionInfo(connectionId);
   92       private final SessionInfo sessionInfo = new SessionInfo(sessionId);
   93       private final ProducerInfo producerInfo = new ProducerInfo(producerId);
   94   
   95       private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
   96       private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
   97       private final IntSequenceGenerator tempDestinationIdGenerator = new IntSequenceGenerator();
   98   
   99       private final Map<Integer, Handler<Response>> resposeHandlers = new ConcurrentHashMap<Integer, Handler<Response>>();
  100       private final Map<ConsumerId, Handler<MessageDispatch>> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, Handler<MessageDispatch>>();
  101       private final Map<String, ConsumerInfo> jidToConsumerMap = new HashMap<String, ConsumerInfo>();
  102       private final Map<String, ConsumerInfo> jidToInboxConsumerMap = new HashMap<String, ConsumerInfo>();
  103   
  104       private final Object commnadIdMutex = new Object();
  105       private int lastCommandId;
  106       private final AtomicBoolean connected = new AtomicBoolean(false);
  107       private ActiveMQTempQueue inboxDestination;
  108   
  109       public ProtocolConverter(XmppTransport transport) {
  110           this.transport = transport;
  111           initialiseRegistry();
  112       }
  113   
  114       protected int generateCommandId() {
  115           synchronized (commnadIdMutex) {
  116               return lastCommandId++;
  117           }
  118       }
  119   
  120       protected void initialiseRegistry() {
  121           // this kinda wiring muck is soooo much cleaner in C# :(
  122           registry.registerHandler(Message.class, new Handler<Message>() {
  123               public void handle(Message event) throws Exception {
  124                   onMessage(event);
  125               }
  126           });
  127           registry.registerHandler(Auth.class, new Handler<Auth>() {
  128               public void handle(Auth event) throws Exception {
  129                   onAuth(event);
  130               }
  131           });
  132           registry.registerHandler(Starttls.class, new Handler<Starttls>() {
  133               public void handle(Starttls event) throws Exception {
  134                   onStarttls(event);
  135               }
  136           });
  137           registry.registerHandler(Iq.class, new Handler<Iq>() {
  138               public void handle(Iq event) throws Exception {
  139                   onIq(event);
  140               }
  141           });
  142           registry.registerHandler(Presence.class, new Handler<Presence>() {
  143               public void handle(Presence event) throws Exception {
  144                   onPresence(event);
  145               }
  146           });
  147       }
  148   
  149       public void onXmppCommand(Object command) throws Exception {
  150           // TODO we could do some nice code generation to boost performance
  151           // by autogenerating the bytecode to statically lookup a handler from a
  152           // registry maybe?
  153   
  154           Handler handler = registry.getHandler(command.getClass());
  155           if (handler == null) {
  156               unknownCommand(command);
  157           } else {
  158               handler.handle(command);
  159           }
  160       }
  161   
  162       public void onActiveMQCommad(Command command) throws Exception {
  163           if (command.isResponse()) {
  164               Response response = (Response)command;
  165               Handler<Response> handler = resposeHandlers.remove(new Integer(response.getCorrelationId()));
  166               if (handler != null) {
  167                   handler.handle(response);
  168               } else {
  169                   LOG.warn("No handler for response: " + response);
  170               }
  171           } else if (command.isMessageDispatch()) {
  172               MessageDispatch md = (MessageDispatch)command;
  173               Handler<MessageDispatch> handler = subscriptionsByConsumerId.get(md.getConsumerId());
  174               if (handler != null) {
  175                   handler.handle(md);
  176               } else {
  177                   LOG.warn("No handler for message: " + md);
  178               }
  179           }
  180       }
  181   
  182       protected void unknownCommand(Object command) throws Exception {
  183           LOG.warn("Unkown command: " + command + " of type: " + command.getClass().getName());
  184       }
  185   
  186       protected void onIq(final Iq iq) throws Exception {
  187           Object any = iq.getAny();
  188   
  189           if (any instanceof Query) {
  190               onAuthQuery(any, iq);
  191   
  192           } else if (any instanceof jabber.iq._private.Query) {
  193               jabber.iq._private.Query query = (jabber.iq._private.Query)any;
  194   
  195               if (LOG.isDebugEnabled()) {
  196                   LOG.debug("Iq Private " + debugString(iq) + " any: " + query.getAny());
  197               }
  198   
  199               Iq result = createResult(iq);
  200               jabber.iq._private.Query answer = new jabber.iq._private.Query();
  201               result.setAny(answer);
  202               transport.marshall(result);
  203           } else if (any instanceof jabber.iq.roster.Query) {
  204               jabber.iq.roster.Query query = (jabber.iq.roster.Query)any;
  205   
  206               if (LOG.isDebugEnabled()) {
  207                   LOG.debug("Iq Roster " + debugString(iq) + " item: " + query.getItem());
  208               }
  209   
  210               Iq result = createResult(iq);
  211               jabber.iq.roster.Query roster = new jabber.iq.roster.Query();
  212               result.setAny(roster);
  213               transport.marshall(result);
  214           } else if (any instanceof org.jabber.protocol.disco_items.Query) {
  215               onDiscoItems(iq, (org.jabber.protocol.disco_items.Query)any);
  216           } else if (any instanceof org.jabber.protocol.disco_info.Query) {
  217               onDiscoInfo(iq, (org.jabber.protocol.disco_info.Query)any);
  218           } else {
  219               if (any instanceof Element) {
  220                   Element element = (Element)any;
  221                   LOG.warn("Iq Unknown " + debugString(iq) + " element namespace: " + element.getNamespaceURI() + " localName: " + element.getLocalName());
  222               } else {
  223                   LOG.warn("Iq Unknown " + debugString(iq) + " any: " + any + " of type: " + any.getClass().getName());
  224               }
  225               Iq result = createResult(iq);
  226               jabber.client.Error error = new Error();
  227               error.setUnexpectedRequest("Don't understand: " + any.toString());
  228               result.setAny(error);
  229               transport.marshall(result);
  230           }
  231       }
  232   
  233       protected void onAuthQuery(Object any, final Iq iq) throws IOException {
  234           Query query = (Query)any;
  235           if (LOG.isDebugEnabled()) {
  236               LOG.debug("Iq Auth Query " + debugString(iq) + " resource: " + query.getResource() + " username: " + query.getUsername());
  237           }
  238           if (query.getPassword() == null) {
  239               Iq result = createResult(iq);
  240               Query required = new Query();
  241               required.setPassword("");
  242               required.setUsername("");
  243               result.setAny(required);
  244               transport.marshall(result);
  245               return;
  246           }
  247   
  248           // connectionInfo.setClientId(query.getResource());
  249           connectionInfo.setUserName(query.getUsername());
  250           connectionInfo.setPassword(query.getPassword());
  251   
  252           // TODO support digest?
  253   
  254           if (connectionInfo.getClientId() == null) {
  255               connectionInfo.setClientId(CLIENT_ID_GENERATOR.generateId());
  256           }
  257   
  258           sendToActiveMQ(connectionInfo, new Handler<Response>() {
  259               public void handle(Response response) throws Exception {
  260   
  261                   Iq result = createResult(iq);
  262   
  263                   if (response instanceof ExceptionResponse) {
  264                       ExceptionResponse exceptionResponse = (ExceptionResponse)response;
  265                       Throwable exception = exceptionResponse.getException();
  266   
  267                       LOG.warn("Failed to create connection: " + exception, exception);
  268   
  269                       Error error = new Error();
  270                       result.setError(error);
  271   
  272                       StringWriter buffer = new StringWriter();
  273                       exception.printStackTrace(new PrintWriter(buffer));
  274                       error.setInternalServerError(buffer.toString());
  275                   } else {
  276                       connected.set(true);
  277                   }
  278                   transport.marshall(result);
  279   
  280                   sendToActiveMQ(sessionInfo, createErrorHandler("create sesssion"));
  281                   sendToActiveMQ(producerInfo, createErrorHandler("create producer"));
  282               }
  283           });
  284       }
  285   
  286       protected String debugString(Iq iq) {
  287           return " to: " + iq.getTo() + " type: " + iq.getType() + " from: " + iq.getFrom() + " id: " + iq.getId();
  288       }
  289   
  290       protected void onDiscoItems(Iq iq, org.jabber.protocol.disco_items.Query query) throws IOException {
  291           String to = iq.getTo();
  292   
  293           if (LOG.isDebugEnabled()) {
  294               LOG.debug("Iq Disco Items query " + debugString(iq) + " node: " + query.getNode() + " item: " + query.getItem());
  295           }
  296   
  297           Iq result = createResult(iq);
  298           org.jabber.protocol.disco_items.Query answer = new org.jabber.protocol.disco_items.Query();
  299           if (to == null || to.length() == 0) {
  300               answer.getItem().add(createItem("queues", "Queues", "queues"));
  301               answer.getItem().add(createItem("topics", "Topics", "topics"));
  302           } else {
  303               // lets not add anything?
  304           }
  305   
  306           result.setAny(answer);
  307           transport.marshall(result);
  308       }
  309   
  310       protected void onDiscoInfo(Iq iq, org.jabber.protocol.disco_info.Query query) throws IOException {
  311           String to = iq.getTo();
  312   
  313           // TODO lets create the topic 'to'
  314   
  315           if (LOG.isDebugEnabled()) {
  316               LOG.debug("Iq Disco Info query " + debugString(iq) + " node: " + query.getNode() + " features: " + query.getFeature() + " identity: " + query.getIdentity());
  317           }
  318   
  319           Iq result = createResult(iq);
  320           org.jabber.protocol.disco_info.Query answer = new org.jabber.protocol.disco_info.Query();
  321           answer.setNode(to);
  322           answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#info"));
  323           answer.getFeature().add(createFeature("http://jabber.org/protocol/disco#items"));
  324           if (to == null || to.length() == 0) {
  325               answer.getIdentity().add(createIdentity("directory", "chatroom", "queues"));
  326               answer.getIdentity().add(createIdentity("directory", "chatroom", "topics"));
  327               /*
  328                * answer.getIdentity().add(createIdentity("hierarchy", "queues",
  329                * "branch")); answer.getIdentity().add(createIdentity("hierarchy",
  330                * "topics", "branch"));
  331                */
  332           } else {
  333               // for queues/topics
  334               if (to.equals("queues")) {
  335                   answer.getIdentity().add(createIdentity("conference", "queue.a", "text"));
  336                   answer.getIdentity().add(createIdentity("conference", "queue.b", "text"));
  337               } else if (to.equals("topics")) {
  338                   answer.getIdentity().add(createIdentity("conference", "topic.x", "text"));
  339                   answer.getIdentity().add(createIdentity("conference", "topic.y", "text"));
  340                   answer.getIdentity().add(createIdentity("conference", "topic.z", "text"));
  341               } else {
  342                   // lets reply to an actual room
  343                   answer.getIdentity().add(createIdentity("conference", to, "text"));
  344                   answer.getFeature().add(createFeature("http://jabber.org/protocol/muc"));
  345                   answer.getFeature().add(createFeature("muc-open"));
  346               }
  347           }
  348   
  349           result.setAny(answer);
  350           transport.marshall(result);
  351       }
  352   
  353       protected void onPresence(Presence presence) throws IOException, JMSException {
  354           if (LOG.isDebugEnabled()) {
  355               LOG.debug("Presence: " + presence.getFrom() + " id: " + presence.getId() + " to: " + presence.getTo() + " type: " + presence.getType() + " showOrStatusOrPriority: "
  356                         + presence.getShowOrStatusOrPriority() + " any: " + presence.getAny());
  357           }
  358           org.jabber.protocol.muc_user.Item item = new org.jabber.protocol.muc_user.Item();
  359           item.setAffiliation("owner");
  360           item.setRole("moderator");
  361           item.setNick("broker");
  362           sendPresence(presence, item);
  363   
  364           /*
  365            * item = new org.jabber.protocol.muc_user.Item();
  366            * item.setAffiliation("admin"); item.setRole("moderator");
  367            * sendPresence(presence, item);
  368            */
  369   
  370           // lets create a subscription
  371           final String to = presence.getTo();
  372   
  373           ActiveMQDestination destination = createActiveMQDestination(to);
  374           if (destination == null) {
  375               LOG.debug("No 'to' attribute specified for presence so not creating a JMS subscription");
  376               return;
  377           }
  378           subscribe(to, destination, jidToConsumerMap);
  379   
  380           // lets subscribe to a personal inbox for replies
  381   
  382           // Check if Destination info is of temporary type.
  383           if (inboxDestination == null) {
  384               inboxDestination = new ActiveMQTempQueue(connectionInfo.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
  385   
  386               DestinationInfo info = new DestinationInfo();
  387               info.setConnectionId(connectionInfo.getConnectionId());
  388               info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
  389               info.setDestination(inboxDestination);
  390               sendToActiveMQ(info, null);
  391   
  392               subscribe(to, inboxDestination, jidToInboxConsumerMap);
  393           }
  394       }
  395   
  396       protected void subscribe(final String to, ActiveMQDestination destination, Map<String, ConsumerInfo> consumerMap) {
  397           boolean createConsumer = false;
  398           ConsumerInfo consumerInfo = null;
  399           synchronized (consumerMap) {
  400               consumerInfo = consumerMap.get(to);
  401               if (consumerInfo == null) {
  402                   consumerInfo = new ConsumerInfo();
  403                   consumerMap.put(to, consumerInfo);
  404   
  405                   ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
  406                   consumerInfo.setConsumerId(consumerId);
  407                   consumerInfo.setPrefetchSize(10);
  408                   consumerInfo.setNoLocal(true);
  409                   createConsumer = true;
  410               }
  411           }
  412           if (!createConsumer) {
  413               return;
  414           }
  415   
  416           consumerInfo.setDestination(destination);
  417   
  418           subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), new Handler<MessageDispatch>() {
  419               public void handle(MessageDispatch messageDispatch) throws Exception {
  420                   // processing the inbound message
  421                   if (LOG.isDebugEnabled()) {
  422                       LOG.debug("Receiving inbound: " + messageDispatch.getMessage());
  423                   }
  424   
  425                   // lets send back an ACK
  426                   MessageAck ack = new MessageAck(messageDispatch, MessageAck.STANDARD_ACK_TYPE, 1);
  427                   sendToActiveMQ(ack, createErrorHandler("Ack of message: " + messageDispatch.getMessage().getMessageId()));
  428   
  429                   Message message = createXmppMessage(to, messageDispatch);
  430                   if (message != null) {
  431                       if (LOG.isDebugEnabled()) {
  432                           LOG.debug("Sending message to XMPP client from: " + message.getFrom() + " to: " + message.getTo() + " type: " + message.getType() + " with body: " + message.getAny());
  433                       }
  434                       transport.marshall(message);
  435                   }
  436               }
  437           });
  438           sendToActiveMQ(consumerInfo, createErrorHandler("subscribe to destination: " + destination));
  439       }
  440   
  441       protected Message createXmppMessage(String to, MessageDispatch messageDispatch) throws JMSException {
  442           Message answer = new Message();
  443           answer.setType("groupchat");
  444           String from = to;
  445           int idx = from.indexOf('/');
  446           if (idx > 0) {
  447               from = from.substring(0, idx) + "/broker";
  448           }
  449           answer.setFrom(from);
  450           answer.setTo(to);
  451   
  452           org.apache.activemq.command.Message message = messageDispatch.getMessage();
  453           // answer.setType(message.getType());
  454           if (message instanceof ActiveMQTextMessage) {
  455               ActiveMQTextMessage activeMQTextMessage = (ActiveMQTextMessage)message;
  456               Body body = new Body();
  457               String text = activeMQTextMessage.getText();
  458               LOG.info("Setting the body text to be: " + text);
  459               body.setValue(text);
  460               answer.getAny().add(body);
  461           } else {
  462               // TODO support other message types
  463               LOG.warn("Could not convert the message to a complete Jabber message: " + message);
  464           }
  465           return answer;
  466       }
  467   
  468       protected void sendPresence(Presence presence, org.jabber.protocol.muc_user.Item item) throws IOException {
  469           Presence answer = new Presence();
  470           answer.setFrom(presence.getTo());
  471           answer.setType(presence.getType());
  472           answer.setTo(presence.getFrom());
  473           X x = new X();
  474           x.getDeclineOrDestroyOrInvite().add(item);
  475           answer.getShowOrStatusOrPriority().add(x);
  476           transport.marshall(answer);
  477       }
  478   
  479       protected Item createItem(String jid, String name, String node) {
  480           Item answer = new Item();
  481           answer.setJid(jid);
  482           answer.setName(name);
  483           answer.setNode(node);
  484           return answer;
  485       }
  486   
  487       protected Identity createIdentity(String category, String type, String name) {
  488           Identity answer = new Identity();
  489           answer.setCategory(category);
  490           answer.setName(name);
  491           answer.setType(type);
  492           return answer;
  493       }
  494   
  495       protected Feature createFeature(String var) {
  496           Feature feature = new Feature();
  497           feature.setVar(var);
  498           return feature;
  499       }
  500   
  501       /**
  502        * Creates a result command from the input
  503        */
  504       protected Iq createResult(Iq iq) {
  505           Iq result = new Iq();
  506           result.setId(iq.getId());
  507           result.setFrom(transport.getFrom());
  508           result.setTo(iq.getFrom());
  509           result.setLang(iq.getLang());
  510           result.setType("result");
  511           return result;
  512       }
  513   
  514       protected void sendToActiveMQ(Command command, Handler<Response> handler) {
  515           command.setCommandId(generateCommandId());
  516           if (handler != null) {
  517               command.setResponseRequired(true);
  518               resposeHandlers.put(command.getCommandId(), handler);
  519           }
  520           transport.getTransportListener().onCommand(command);
  521       }
  522   
  523       protected void onStarttls(Starttls starttls) throws Exception {
  524           LOG.debug("Starttls");
  525           transport.marshall(new Proceed());
  526       }
  527   
  528       protected void onMessage(Message message) throws Exception {
  529           if (LOG.isDebugEnabled()) {
  530               LOG.debug("Message from: " + message.getFrom() + " to: " + message.getTo() + " subjectOrBodyOrThread: " + message.getSubjectOrBodyOrThread());
  531           }
  532   
  533           final ActiveMQMessage activeMQMessage = createActiveMQMessage(message);
  534   
  535           ActiveMQDestination destination = createActiveMQDestination(message.getTo());
  536   
  537           activeMQMessage.setMessageId(new MessageId(producerInfo, messageIdGenerator.getNextSequenceId()));
  538           activeMQMessage.setDestination(destination);
  539           activeMQMessage.setProducerId(producerId);
  540           activeMQMessage.setTimestamp(System.currentTimeMillis());
  541           addActiveMQMessageHeaders(activeMQMessage, message);
  542   
  543           /*
  544            * MessageDispatch dispatch = new MessageDispatch();
  545            * dispatch.setDestination(destination);
  546            * dispatch.setMessage(activeMQMessage);
  547            */
  548   
  549           if (LOG.isDebugEnabled()) {
  550               LOG.debug("Sending ActiveMQ message: " + activeMQMessage);
  551           }
  552           sendToActiveMQ(activeMQMessage, createErrorHandler("send message"));
  553       }
  554   
  555       protected Handler<Response> createErrorHandler(final String text) {
  556           return new Handler<Response>() {
  557               public void handle(Response event) throws Exception {
  558                   if (event instanceof ExceptionResponse) {
  559                       ExceptionResponse exceptionResponse = (ExceptionResponse)event;
  560                       Throwable exception = exceptionResponse.getException();
  561                       LOG.error("Failed to " + text + ". Reason: " + exception, exception);
  562                   } else if (LOG.isDebugEnabled()) {
  563                       LOG.debug("Completed " + text);
  564                   }
  565               }
  566           };
  567       }
  568   
  569       /**
  570        * Converts the Jabber destination name into a destination in ActiveMQ
  571        */
  572       protected ActiveMQDestination createActiveMQDestination(String jabberDestination) throws JMSException {
  573           if (jabberDestination == null) {
  574               return null;
  575           }
  576           String name = jabberDestination;
  577           int idx = jabberDestination.indexOf('@');
  578           if (idx > 0) {
  579               name = name.substring(0, idx);
  580           }
  581   
  582           System.out.println("#### Creating ActiveMQ destination for: " + name);
  583   
  584           // lets support lower-case versions of the agent topic
  585           if (name.equalsIgnoreCase(AdvisorySupport.AGENT_TOPIC)) {
  586               name = AdvisorySupport.AGENT_TOPIC;
  587           }
  588           return new ActiveMQTopic(name);
  589       }
  590   
  591       protected ActiveMQMessage createActiveMQMessage(Message message) throws JMSException {
  592           ActiveMQTextMessage answer = new ActiveMQTextMessage();
  593           String text = "";
  594           List<Object> list = message.getSubjectOrBodyOrThread();
  595           for (Object object : list) {
  596               if (object instanceof Body) {
  597                   Body body = (Body)object;
  598                   text = body.getValue();
  599                   break;
  600               }
  601           }
  602           answer.setText(text);
  603           return answer;
  604       }
  605   
  606       protected void addActiveMQMessageHeaders(ActiveMQMessage answer, Message message) throws JMSException {
  607           answer.setStringProperty("XMPPFrom", message.getFrom());
  608           answer.setStringProperty("XMPPID", message.getId());
  609           answer.setStringProperty("XMPPLang", message.getLang());
  610           answer.setStringProperty("XMPPTo", message.getTo());
  611           answer.setJMSType(message.getType());
  612           ActiveMQDestination replyTo = createActiveMQDestination(message.getFrom());
  613           if (replyTo == null) {
  614               replyTo = inboxDestination;
  615           }
  616           System.out.println("Setting reply to destination to: " + replyTo);
  617           answer.setJMSReplyTo(replyTo);
  618       }
  619   
  620       protected void onAuth(Auth auth) throws Exception {
  621           if (LOG.isDebugEnabled()) {
  622               LOG.debug("Auth mechanism: " + auth.getMechanism() + " value: " + auth.getValue());
  623           }
  624           String value = createChallengeValue(auth);
  625           if (value != null) {
  626               Challenge challenge = new Challenge();
  627               challenge.setValue(value);
  628               transport.marshall(challenge);
  629           } else {
  630               transport.marshall(new Success());
  631           }
  632       }
  633   
  634       protected String createChallengeValue(Auth auth) {
  635           // TODO implement the challenge
  636           return null;
  637       }
  638   
  639   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » transport » xmpp » [javadoc | source]