Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.jpa;
   18   
   19   import java.io.IOException;
   20   import java.util.ArrayList;
   21   import java.util.List;
   22   import java.util.Map;
   23   import java.util.concurrent.ConcurrentHashMap;
   24   import java.util.concurrent.atomic.AtomicLong;
   25   
   26   import javax.persistence.EntityManager;
   27   import javax.persistence.Query;
   28   
   29   import org.apache.activemq.broker.ConnectionContext;
   30   import org.apache.activemq.command.ActiveMQDestination;
   31   import org.apache.activemq.command.MessageId;
   32   import org.apache.activemq.command.SubscriptionInfo;
   33   import org.apache.activemq.store.MessageRecoveryListener;
   34   import org.apache.activemq.store.TopicReferenceStore;
   35   import org.apache.activemq.store.jpa.model.StoredMessageReference;
   36   import org.apache.activemq.store.jpa.model.StoredSubscription;
   37   import org.apache.activemq.store.jpa.model.StoredSubscription.SubscriptionId;
   38   import org.apache.activemq.util.IOExceptionSupport;
   39   
   40   public class JPATopicReferenceStore extends JPAReferenceStore implements TopicReferenceStore {
   41       private Map<SubscriptionId, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<SubscriptionId, AtomicLong>();
   42   
   43       public JPATopicReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
   44           super(adapter, destination);
   45       }
   46   
   47       public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
   48           EntityManager manager = adapter.beginEntityManager(context);
   49           try {
   50               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
   51               ss.setLastAckedId(messageId.getBrokerSequenceId());
   52           } catch (Throwable e) {
   53               adapter.rollbackEntityManager(context, manager);
   54               throw IOExceptionSupport.create(e);
   55           }
   56           adapter.commitEntityManager(context, manager);
   57       }
   58   
   59       public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
   60           EntityManager manager = adapter.beginEntityManager(null);
   61           try {
   62               StoredSubscription ss = new StoredSubscription();
   63               ss.setClientId(info.getClientId());
   64               ss.setSubscriptionName(info.getSubcriptionName());
   65               ss.setDestination(destinationName);
   66               ss.setSelector(info.getSelector());
   67               ss.setSubscribedDestination(info.getSubscribedDestination().getQualifiedName());
   68               ss.setLastAckedId(-1);
   69   
   70               if (!retroactive) {
   71                   Query query = manager.createQuery("select max(m.id) from StoredMessageReference m");
   72                   Long rc = (Long)query.getSingleResult();
   73                   if (rc != null) {
   74                       ss.setLastAckedId(rc);
   75                   }
   76               }
   77   
   78               manager.persist(ss);
   79           } catch (Throwable e) {
   80               adapter.rollbackEntityManager(null, manager);
   81               throw IOExceptionSupport.create(e);
   82           }
   83           adapter.commitEntityManager(null, manager);
   84       }
   85   
   86       public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
   87           EntityManager manager = adapter.beginEntityManager(null);
   88           try {
   89               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
   90               manager.remove(ss);
   91           } catch (Throwable e) {
   92               adapter.rollbackEntityManager(null, manager);
   93               throw IOExceptionSupport.create(e);
   94           }
   95           adapter.commitEntityManager(null, manager);
   96       }
   97   
   98       private StoredSubscription findStoredSubscription(EntityManager manager, String clientId, String subscriptionName) {
   99           Query query = manager.createQuery("select ss from StoredSubscription ss " + "where ss.clientId=?1 " + "and ss.subscriptionName=?2 " + "and ss.destination=?3");
  100           query.setParameter(1, clientId);
  101           query.setParameter(2, subscriptionName);
  102           query.setParameter(3, destinationName);
  103           List<StoredSubscription> resultList = query.getResultList();
  104           if (resultList.isEmpty()) {
  105               return null;
  106           }
  107           return resultList.get(0);
  108       }
  109   
  110       public SubscriptionInfo[] getAllSubscriptions() throws IOException {
  111           SubscriptionInfo rc[];
  112           EntityManager manager = adapter.beginEntityManager(null);
  113           try {
  114               ArrayList<SubscriptionInfo> l = new ArrayList<SubscriptionInfo>();
  115   
  116               Query query = manager.createQuery("select ss from StoredSubscription ss where ss.destination=?1");
  117               query.setParameter(1, destinationName);
  118               for (StoredSubscription ss : (List<StoredSubscription>)query.getResultList()) {
  119                   SubscriptionInfo info = new SubscriptionInfo();
  120                   info.setClientId(ss.getClientId());
  121                   info.setDestination(destination);
  122                   info.setSelector(ss.getSelector());
  123                   info.setSubscriptionName(ss.getSubscriptionName());
  124                   info.setSubscribedDestination(toSubscribedDestination(ss));
  125                   l.add(info);
  126               }
  127   
  128               rc = new SubscriptionInfo[l.size()];
  129               l.toArray(rc);
  130           } catch (Throwable e) {
  131               adapter.rollbackEntityManager(null, manager);
  132               throw IOExceptionSupport.create(e);
  133           }
  134           adapter.commitEntityManager(null, manager);
  135           return rc;
  136       }
  137   
  138       private ActiveMQDestination toSubscribedDestination(StoredSubscription ss) {
  139           if (ss.getSubscribedDestination() == null) {
  140               return null;
  141           }
  142           return ActiveMQDestination.createDestination(ss.getSubscribedDestination(), ActiveMQDestination.QUEUE_TYPE);
  143       }
  144   
  145       public int getMessageCount(String clientId, String subscriptionName) throws IOException {
  146           Long rc;
  147           EntityManager manager = adapter.beginEntityManager(null);
  148           try {
  149               Query query = manager.createQuery("select count(m) FROM StoredMessageReference m, StoredSubscription ss " + "where ss.clientId=?1 " + "and   ss.subscriptionName=?2 "
  150                                                 + "and   ss.destination=?3 " + "and   m.destination=ss.destination and m.id > ss.lastAckedId");
  151               query.setParameter(1, clientId);
  152               query.setParameter(2, subscriptionName);
  153               query.setParameter(3, destinationName);
  154               rc = (Long)query.getSingleResult();
  155           } catch (Throwable e) {
  156               adapter.rollbackEntityManager(null, manager);
  157               throw IOExceptionSupport.create(e);
  158           }
  159           adapter.commitEntityManager(null, manager);
  160           return rc.intValue();
  161       }
  162   
  163       public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
  164           SubscriptionInfo rc = null;
  165           EntityManager manager = adapter.beginEntityManager(null);
  166           try {
  167               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
  168               if (ss != null) {
  169                   rc = new SubscriptionInfo();
  170                   rc.setClientId(ss.getClientId());
  171                   rc.setDestination(destination);
  172                   rc.setSelector(ss.getSelector());
  173                   rc.setSubscriptionName(ss.getSubscriptionName());
  174                   rc.setSubscribedDestination(toSubscribedDestination(ss));
  175               }
  176           } catch (Throwable e) {
  177               adapter.rollbackEntityManager(null, manager);
  178               throw IOExceptionSupport.create(e);
  179           }
  180           adapter.commitEntityManager(null, manager);
  181           return rc;
  182       }
  183   
  184       public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
  185           EntityManager manager = adapter.beginEntityManager(null);
  186           try {
  187               SubscriptionId id = new SubscriptionId();
  188               id.setClientId(clientId);
  189               id.setSubscriptionName(subscriptionName);
  190               id.setDestination(destinationName);
  191   
  192               AtomicLong last = subscriberLastMessageMap.get(id);
  193               if (last == null) {
  194                   StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
  195                   last = new AtomicLong(ss.getLastAckedId());
  196                   subscriberLastMessageMap.put(id, last);
  197               }
  198               final AtomicLong lastMessageId = last;
  199   
  200               Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
  201               query.setParameter(1, destinationName);
  202               query.setParameter(2, lastMessageId.get());
  203               query.setMaxResults(maxReturned);
  204               int count = 0;
  205               for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
  206                   MessageId mid = new MessageId(m.getMessageId());
  207                   mid.setBrokerSequenceId(m.getId());
  208                   listener.recoverMessageReference(mid);
  209   
  210                   lastMessageId.set(m.getId());
  211                   count++;
  212                   if (count >= maxReturned) {
  213                       return;
  214                   }
  215               }
  216           } catch (Throwable e) {
  217               adapter.rollbackEntityManager(null, manager);
  218               throw IOExceptionSupport.create(e);
  219           }
  220           adapter.commitEntityManager(null, manager);
  221       }
  222   
  223       public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
  224           EntityManager manager = adapter.beginEntityManager(null);
  225           try {
  226   
  227               StoredSubscription ss = findStoredSubscription(manager, clientId, subscriptionName);
  228   
  229               Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
  230               query.setParameter(1, destinationName);
  231               query.setParameter(2, ss.getLastAckedId());
  232               for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
  233                   MessageId mid = new MessageId(m.getMessageId());
  234                   mid.setBrokerSequenceId(m.getId());
  235                   listener.recoverMessageReference(mid);
  236               }
  237           } catch (Throwable e) {
  238               adapter.rollbackEntityManager(null, manager);
  239               throw IOExceptionSupport.create(e);
  240           }
  241           adapter.commitEntityManager(null, manager);
  242       }
  243   
  244       public void resetBatching(String clientId, String subscriptionName) {
  245           SubscriptionId id = new SubscriptionId();
  246           id.setClientId(clientId);
  247           id.setSubscriptionName(subscriptionName);
  248           id.setDestination(destinationName);
  249           subscriberLastMessageMap.remove(id);
  250       }
  251   
  252       public boolean acknowledgeReference(ConnectionContext context,
  253               String clientId, String subscriptionName, MessageId messageId)
  254               throws IOException {
  255           acknowledge(context, clientId, subscriptionName, messageId);
  256           return true;
  257       }
  258   
  259   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]