Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.jpa;
   18   
   19   import java.io.IOException;
   20   import java.util.ArrayList;
   21   import java.util.List;
   22   import java.util.Map;
   23   import java.util.concurrent.ConcurrentHashMap;
   24   import java.util.concurrent.atomic.AtomicLong;
   25   
   26   import javax.persistence.EntityManager;
   27   import javax.persistence.Query;
   28   
   29   import org.apache.activemq.broker.ConnectionContext;
   30   import org.apache.activemq.command.ActiveMQDestination;
   31   import org.apache.activemq.command.Message;
   32   import org.apache.activemq.command.MessageId;
   33   import org.apache.activemq.command.SubscriptionInfo;
   34   import org.apache.activemq.store.MessageRecoveryListener;
   35   import org.apache.activemq.store.TopicMessageStore;
   36   import org.apache.activemq.store.jpa.model.StoredMessage;
   37   import org.apache.activemq.store.jpa.model.StoredSubscription;
   38   import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
   39   import org.apache.activemq.util.ByteSequence;
   40   import org.apache.activemq.util.IOExceptionSupport;
   41   
   42   public class JPATopicMessageStore extends JPAMessageStore implements TopicMessageStore {
   43       private Map<SubscriptionId, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<SubscriptionId, AtomicLong>();
   44   
   45       public JPATopicMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
   46           super(adapter, destination);
   47       }
   48   
   49       public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
   50           EntityManager manager = adapter.beginEntityManager(context);
   51           try {
   52               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
   53               ss.setLastAckedId(messageId.getBrokerSequenceId());
   54           } catch (Throwable e) {
   55               adapter.rollbackEntityManager(context, manager);
   56               throw IOExceptionSupport.create(e);
   57           }
   58           adapter.commitEntityManager(context, manager);
   59       }
   60   
   61       public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
   62           EntityManager manager = adapter.beginEntityManager(null);
   63           try {
   64               StoredSubscription ss = new StoredSubscription();
   65               ss.setClientId(info.getClientId());
   66               ss.setSubscriptionName(info.getSubscriptionName());
   67               ss.setDestination(destinationName);
   68               ss.setSelector(info.getSelector());
   69               ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
   70               ss.setLastAckedId(-1);
   71   
   72               if (!retroactive) {
   73                   Query query = manager.createQuery("select max(m.id) from StoredMessage m");
   74                   Long rc = (Long)query.getSingleResult();
   75                   if (rc != null) {
   76                       ss.setLastAckedId(rc);
   77                   }
   78               }
   79   
   80               manager.persist(ss);
   81           } catch (Throwable e) {
   82               adapter.rollbackEntityManager(null, manager);
   83               throw IOExceptionSupport.create(e);
   84           }
   85           adapter.commitEntityManager(null, manager);
   86       }
   87   
   88       public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
   89           EntityManager manager = adapter.beginEntityManager(null);
   90           try {
   91               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
   92               manager.remove(ss);
   93           } catch (Throwable e) {
   94               adapter.rollbackEntityManager(null, manager);
   95               throw IOExceptionSupport.create(e);
   96           }
   97           adapter.commitEntityManager(null, manager);
   98       }
   99   
  100       private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
  101           Query query = manager.createQuery("select ss from StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3");
  102           query.setParameter(1, clientId);
  103           query.setParameter(2, subscriptionName);
  104           query.setParameter(3, destinationName);
  105           List<StoredSubscription> resultList = query.getResultList();
  106           if (resultList.isEmpty()) {
  107               return null;
  108           }
  109           return resultList.get(0);
  110       }
  111   
  112       public SubscriptionInfo[] getAllSubscriptions() throws IOException {
  113           SubscriptionInfo rc[];
  114           EntityManager manager = adapter.beginEntityManager(null);
  115           try {
  116               ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
  117   
  118               Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
  119               query.setParameter(1, destinationName);
  120               for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
  121                   SubscriptionInfo info = new SubscriptionInfo();
  122                   info.setClientId(ss.getClientId());
  123                   info.setDestination(destination);
  124                   info.setSelector(ss.getSelector());
  125                   info.setSubscriptionName(ss.getSubscriptionName());
  126                   info.setSubscribedDestination(toSubscribedDestination(ss));
  127                   l.add(info);
  128               }
  129   
  130               rc = new SubscriptionInfo[l.size()];
  131               l.toArray(rc);
  132           } catch (Throwable e) {
  133               adapter.rollbackEntityManager(null, manager);
  134               throw IOExceptionSupport.create(e);
  135           }
  136           adapter.commitEntityManager(null, manager);
  137           return rc;
  138       }
  139   
  140       public int getMessageCount(String clientId, String subscriptionName) throws IOException {
  141           Long rc;
  142           EntityManager manager = adapter.beginEntityManager(null);
  143           try {
  144               Query query = manager.createQuery("select count(m) FROM StoredMessage m, StoredSubscription ss " + "where ss.clientId=?1 " + "and   ss.subscriptionName=?2 " + "and   ss.destination=?3 "
  145                                                 + "and   m.destination=ss.destination and m.id > ss.lastAckedId");
  146               query.setParameter(1, clientId);
  147               query.setParameter(2, subscriptionName);
  148               query.setParameter(3, destinationName);
  149               rc = (Long)query.getSingleResult();
  150           } catch (Throwable e) {
  151               adapter.rollbackEntityManager(null, manager);
  152               throw IOExceptionSupport.create(e);
  153           }
  154           adapter.commitEntityManager(null, manager);
  155           return rc.intValue();
  156       }
  157   
  158       public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
  159           SubscriptionInfo rc = null;
  160           EntityManager manager = adapter.beginEntityManager(null);
  161           try {
  162               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
  163               if (ss != null) {
  164                   rc = new SubscriptionInfo();
  165                   rc.setClientId(ss.getClientId());
  166                   rc.setDestination(destination);
  167                   rc.setSelector(ss.getSelector());
  168                   rc.setSubscriptionName(ss.getSubscriptionName());
  169                   rc.setSubscribedDestination(toSubscribedDestination(ss));
  170               }
  171           } catch (Throwable e) {
  172               adapter.rollbackEntityManager(null, manager);
  173               throw IOExceptionSupport.create(e);
  174           }
  175           adapter.commitEntityManager(null, manager);
  176           return rc;
  177       }
  178   
  179       private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
  180           if (ss.getSubscribedDestination() == null) {
  181               return null;
  182           }
  183           return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
  184       }
  185   
  186       public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
  187           EntityManager manager = adapter.beginEntityManager(null);
  188           try {
  189               SubscriptionId id = new SubscriptionId();
  190               id.setClientId(clientId);
  191               id.setSubscriptionName(subscriptionName);
  192               id.setDestination(destinationName);
  193   
  194               AtomicLong last = subscriberLastMessageMap.get(id);
  195               if (last == null) {
  196                   StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
  197                   last = new AtomicLong(ss.getLastAckedId());
  198                   subscriberLastMessageMap.put(id, last);
  199               }
  200               final AtomicLong lastMessageId = last;
  201   
  202               Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
  203               query.setParameter(1, destinationName);
  204               query.setParameter(2, lastMessageId.get());
  205               query.setMaxResults(maxReturned);
  206               int count = 0;
  207               for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
  208                   Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData()));
  209                   listener.recoverMessage(message);
  210                   lastMessageId.set(m.getId());
  211                   count++;
  212                   if (count >= maxReturned) {
  213                       return;
  214                   }
  215               }
  216           } catch (Throwable e) {
  217               adapter.rollbackEntityManager(null, manager);
  218               throw IOExceptionSupport.create(e);
  219           }
  220           adapter.commitEntityManager(null, manager);
  221       }
  222   
  223       public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
  224           EntityManager manager = adapter.beginEntityManager(null);
  225           try {
  226   
  227               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
  228   
  229               Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
  230               query.setParameter(1, destinationName);
  231               query.setParameter(2, ss.getLastAckedId());
  232               for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
  233                   Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData()));
  234                   listener.recoverMessage(message);
  235               }
  236           } catch (Throwable e) {
  237               adapter.rollbackEntityManager(null, manager);
  238               throw IOExceptionSupport.create(e);
  239           }
  240           adapter.commitEntityManager(null, manager);
  241       }
  242   
  243       public void resetBatching(String clientId, String subscriptionName) {
  244           SubscriptionId id = new SubscriptionId();
  245           id.setClientId(clientId);
  246           id.setSubscriptionName(subscriptionName);
  247           id.setDestination(destinationName);
  248   
  249           subscriberLastMessageMap.remove(id);
  250       }
  251   
  252   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]