Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.jpa;
   18   
   19   import java.io.IOException;
   20   import java.util.List;
   21   import java.util.concurrent.atomic.AtomicLong;
   22   import java.util.concurrent.locks.Lock;
   23   import java.util.concurrent.locks.ReentrantLock;
   24   
   25   import javax.persistence.EntityManager;
   26   import javax.persistence.Query;
   27   
   28   import org.apache.activemq.broker.ConnectionContext;
   29   import org.apache.activemq.command.ActiveMQDestination;
   30   import org.apache.activemq.command.Message;
   31   import org.apache.activemq.command.MessageAck;
   32   import org.apache.activemq.command.MessageId;
   33   import org.apache.activemq.store.MessageRecoveryListener;
   34   import org.apache.activemq.store.ReferenceStore;
   35   import org.apache.activemq.store.jpa.model.StoredMessageReference;
   36   import org.apache.activemq.usage.MemoryUsage;
   37   import org.apache.activemq.usage.SystemUsage;
   38   import org.apache.activemq.util.IOExceptionSupport;
   39   import org.apache.activemq.wireformat.WireFormat;
   40   
   41   public class JPAReferenceStore implements ReferenceStore {
   42   
   43       protected final JPAPersistenceAdapter adapter;
   44       protected final WireFormat wireFormat;
   45       protected final ActiveMQDestination destination;
   46       protected final String destinationName;
   47       protected AtomicLong lastMessageId = new AtomicLong(-1);
   48       protected final Lock lock = new ReentrantLock();
   49       
   50       public JPAReferenceStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
   51           this.adapter = adapter;
   52           this.destination = destination;
   53           this.destinationName = destination.getQualifiedName();
   54           this.wireFormat = this.adapter.getWireFormat();
   55       }
   56       
   57       public Lock getStoreLock() {
   58           return lock;
   59       }
   60   
   61       public ActiveMQDestination getDestination() {
   62           return destination;
   63       }
   64   
   65       public void addMessage(ConnectionContext context, Message message) throws IOException {
   66           throw new RuntimeException("Use addMessageReference instead");
   67       }
   68   
   69       public Message getMessage(MessageId identity) throws IOException {
   70           throw new RuntimeException("Use addMessageReference instead");
   71       }
   72   
   73       public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
   74           EntityManager manager = adapter.beginEntityManager(context);
   75           try {
   76   
   77               StoredMessageReference sm = new StoredMessageReference();
   78               sm.setDestination(destinationName);
   79               sm.setId(messageId.getBrokerSequenceId());
   80               sm.setMessageId(messageId.toString());
   81               sm.setExiration(data.getExpiration());
   82               sm.setFileId(data.getFileId());
   83               sm.setOffset(data.getOffset());
   84   
   85               manager.persist(sm);
   86   
   87           } catch (Throwable e) {
   88               adapter.rollbackEntityManager(context, manager);
   89               throw IOExceptionSupport.create(e);
   90           }
   91           adapter.commitEntityManager(context, manager);
   92       }
   93   
   94       public ReferenceData getMessageReference(MessageId identity) throws IOException {
   95           ReferenceData rc = null;
   96           EntityManager manager = adapter.beginEntityManager(null);
   97           try {
   98               StoredMessageReference message = null;
   99               if (identity.getBrokerSequenceId() != 0) {
  100                   message = manager.find(StoredMessageReference.class, identity.getBrokerSequenceId());
  101               } else {
  102                   Query query = manager.createQuery("select m from StoredMessageReference m where m.messageId=?1");
  103                   query.setParameter(1, identity.toString());
  104                   message = (StoredMessageReference)query.getSingleResult();
  105               }
  106               if (message != null) {
  107                   rc = new ReferenceData();
  108                   rc.setExpiration(message.getExiration());
  109                   rc.setFileId(message.getFileId());
  110                   rc.setOffset(message.getOffset());
  111               }
  112           } catch (Throwable e) {
  113               adapter.rollbackEntityManager(null, manager);
  114               throw IOExceptionSupport.create(e);
  115           }
  116           adapter.commitEntityManager(null, manager);
  117           return rc;
  118       }
  119   
  120       public int getMessageCount() throws IOException {
  121           Long rc;
  122           EntityManager manager = adapter.beginEntityManager(null);
  123           try {
  124               Query query = manager.createQuery("select count(m) from StoredMessageReference m");
  125               rc = (Long)query.getSingleResult();
  126           } catch (Throwable e) {
  127               adapter.rollbackEntityManager(null, manager);
  128               throw IOExceptionSupport.create(e);
  129           }
  130           adapter.commitEntityManager(null, manager);
  131           return rc.intValue();
  132       }
  133   
  134       public void recover(MessageRecoveryListener container) throws Exception {
  135           EntityManager manager = adapter.beginEntityManager(null);
  136           try {
  137               Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 order by m.id asc");
  138               query.setParameter(1, destinationName);
  139               for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
  140                   MessageId id = new MessageId(m.getMessageId());
  141                   id.setBrokerSequenceId(m.getId());
  142                   container.recoverMessageReference(id);
  143               }
  144           } catch (Throwable e) {
  145               adapter.rollbackEntityManager(null, manager);
  146               throw IOExceptionSupport.create(e);
  147           }
  148           adapter.commitEntityManager(null, manager);
  149       }
  150   
  151       public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
  152   
  153           EntityManager manager = adapter.beginEntityManager(null);
  154           try {
  155   
  156               Query query = manager.createQuery("select m from StoredMessageReference m where m.destination=?1 and m.id>?2 order by m.id asc");
  157               query.setParameter(1, destinationName);
  158               query.setParameter(2, lastMessageId.get());
  159               query.setMaxResults(maxReturned);
  160               int count = 0;
  161               for (StoredMessageReference m : (List<StoredMessageReference>)query.getResultList()) {
  162                   MessageId id = new MessageId(m.getMessageId());
  163                   id.setBrokerSequenceId(m.getId());
  164                   listener.recoverMessageReference(id);
  165                   lastMessageId.set(m.getId());
  166                   count++;
  167                   if (count >= maxReturned) {
  168                       return;
  169                   }
  170               }
  171   
  172           } catch (Throwable e) {
  173               adapter.rollbackEntityManager(null, manager);
  174               throw IOExceptionSupport.create(e);
  175           }
  176           adapter.commitEntityManager(null, manager);
  177       }
  178   
  179       public void removeAllMessages(ConnectionContext context) throws IOException {
  180           EntityManager manager = adapter.beginEntityManager(context);
  181           try {
  182               Query query = manager.createQuery("delete from StoredMessageReference m where m.destination=?1");
  183               query.setParameter(1, destinationName);
  184               query.executeUpdate();
  185           } catch (Throwable e) {
  186               adapter.rollbackEntityManager(context, manager);
  187               throw IOExceptionSupport.create(e);
  188           }
  189           adapter.commitEntityManager(context, manager);
  190       }
  191   
  192       public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
  193           EntityManager manager = adapter.beginEntityManager(context);
  194           try {
  195               Query query = manager.createQuery("delete from StoredMessageReference m where m.id=?1");
  196               query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
  197               query.executeUpdate();
  198           } catch (Throwable e) {
  199               adapter.rollbackEntityManager(context, manager);
  200               throw IOExceptionSupport.create(e);
  201           }
  202           adapter.commitEntityManager(context, manager);
  203       }
  204   
  205       public void resetBatching() {
  206           lastMessageId.set(-1);
  207       }
  208   
  209       public void setMemoryUsage(MemoryUsage memoeyUSage){
  210       }
  211   
  212       public void start() throws Exception {
  213       }
  214   
  215       public void stop() throws Exception {
  216       }
  217   
  218       public void setBatch(MessageId startAfter) {
  219       }
  220   
  221       public boolean supportsExternalBatchControl() {
  222           return false;
  223       }
  224   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]