Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.jpa;
   18   
   19   import java.io.IOException;
   20   import java.util.List;
   21   import java.util.concurrent.atomic.AtomicLong;
   22   
   23   import javax.persistence.EntityManager;
   24   import javax.persistence.Query;
   25   
   26   import org.apache.activemq.broker.ConnectionContext;
   27   import org.apache.activemq.command.ActiveMQDestination;
   28   import org.apache.activemq.command.Message;
   29   import org.apache.activemq.command.MessageAck;
   30   import org.apache.activemq.command.MessageId;
   31   import org.apache.activemq.store.MessageRecoveryListener;
   32   import org.apache.activemq.store.MessageStore;
   33   import org.apache.activemq.store.jpa.model.StoredMessage;
   34   import org.apache.activemq.usage.MemoryUsage;
   35   import org.apache.activemq.usage.SystemUsage;
   36   import org.apache.activemq.util.ByteSequence;
   37   import org.apache.activemq.util.IOExceptionSupport;
   38   import org.apache.activemq.wireformat.WireFormat;
   39   
   40   public class JPAMessageStore implements MessageStore {
   41   
   42       protected final JPAPersistenceAdapter adapter;
   43       protected final WireFormat wireFormat;
   44       protected final ActiveMQDestination destination;
   45       protected final String destinationName;
   46       protected AtomicLong lastMessageId = new AtomicLong(-1);
   47   
   48       public JPAMessageStore(JPAPersistenceAdapter adapter, ActiveMQDestination destination) {
   49           this.adapter = adapter;
   50           this.destination = destination;
   51           this.destinationName = destination.getQualifiedName();
   52           this.wireFormat = this.adapter.getWireFormat();
   53       }
   54   
   55       public void addMessage(ConnectionContext context, Message message) throws IOException {
   56   
   57           EntityManager manager = adapter.beginEntityManager(context);
   58           try {
   59   
   60               ByteSequence sequence = wireFormat.marshal(message);
   61               sequence.compact();
   62   
   63               StoredMessage sm = new StoredMessage();
   64               sm.setDestination(destinationName);
   65               sm.setId(message.getMessageId().getBrokerSequenceId());
   66               sm.setMessageId(message.getMessageId().toString());
   67               sm.setExiration(message.getExpiration());
   68               sm.setData(sequence.data);
   69   
   70               manager.persist(sm);
   71   
   72           } catch (Throwable e) {
   73               adapter.rollbackEntityManager(context, manager);
   74               throw IOExceptionSupport.create(e);
   75           }
   76           adapter.commitEntityManager(context, manager);
   77       }
   78   
   79       public ActiveMQDestination getDestination() {
   80           return destination;
   81       }
   82   
   83       public Message getMessage(MessageId identity) throws IOException {
   84           Message rc;
   85           EntityManager manager = adapter.beginEntityManager(null);
   86           try {
   87               StoredMessage message = null;
   88               if (identity.getBrokerSequenceId() != 0) {
   89                   message = manager.find(StoredMessage.class, identity.getBrokerSequenceId());
   90               } else {
   91                   Query query = manager.createQuery("select m from StoredMessage m where m.messageId=?1");
   92                   query.setParameter(1, identity.toString());
   93                   message = (StoredMessage)query.getSingleResult();
   94               }
   95   
   96               rc = (Message)wireFormat.unmarshal(new ByteSequence(message.getData()));
   97           } catch (Throwable e) {
   98               adapter.rollbackEntityManager(null, manager);
   99               throw IOExceptionSupport.create(e);
  100           }
  101           adapter.commitEntityManager(null, manager);
  102           return rc;
  103       }
  104   
  105       public int getMessageCount() throws IOException {
  106           Long rc;
  107           EntityManager manager = adapter.beginEntityManager(null);
  108           try {
  109               Query query = manager.createQuery("select count(m) from StoredMessage m");
  110               rc = (Long)query.getSingleResult();
  111           } catch (Throwable e) {
  112               adapter.rollbackEntityManager(null, manager);
  113               throw IOExceptionSupport.create(e);
  114           }
  115           adapter.commitEntityManager(null, manager);
  116           return rc.intValue();
  117       }
  118   
  119       @SuppressWarnings("unchecked")
  120       public void recover(MessageRecoveryListener container) throws Exception {
  121           EntityManager manager = adapter.beginEntityManager(null);
  122           try {
  123               Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 order by m.id asc");
  124               query.setParameter(1, destinationName);
  125               for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
  126                   Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData()));
  127                   container.recoverMessage(message);
  128               }
  129           } catch (Throwable e) {
  130               adapter.rollbackEntityManager(null, manager);
  131               throw IOExceptionSupport.create(e);
  132           }
  133           adapter.commitEntityManager(null, manager);
  134       }
  135   
  136       @SuppressWarnings("unchecked")
  137       public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
  138   
  139           EntityManager manager = adapter.beginEntityManager(null);
  140           try {
  141   
  142               Query query = manager.createQuery("select m from StoredMessage m where m.destination=?1 and m.id>?2 order by m.id asc");
  143               query.setParameter(1, destinationName);
  144               query.setParameter(2, lastMessageId.get());
  145               query.setMaxResults(maxReturned);
  146               int count = 0;
  147               for (StoredMessage m : (List<StoredMessage>)query.getResultList()) {
  148                   Message message = (Message)wireFormat.unmarshal(new ByteSequence(m.getData()));
  149                   listener.recoverMessage(message);
  150                   lastMessageId.set(m.getId());
  151                   count++;
  152                   if (count >= maxReturned) {
  153                       return;
  154                   }
  155               }
  156   
  157           } catch (Throwable e) {
  158               adapter.rollbackEntityManager(null, manager);
  159               throw IOExceptionSupport.create(e);
  160           }
  161           adapter.commitEntityManager(null, manager);
  162       }
  163   
  164       public void removeAllMessages(ConnectionContext context) throws IOException {
  165           EntityManager manager = adapter.beginEntityManager(context);
  166           try {
  167               Query query = manager.createQuery("delete from StoredMessage m where m.destination=?1");
  168               query.setParameter(1, destinationName);
  169               query.executeUpdate();
  170           } catch (Throwable e) {
  171               adapter.rollbackEntityManager(context, manager);
  172               throw IOExceptionSupport.create(e);
  173           }
  174           adapter.commitEntityManager(context, manager);
  175       }
  176   
  177       public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
  178           EntityManager manager = adapter.beginEntityManager(context);
  179           try {
  180               Query query = manager.createQuery("delete from StoredMessage m where m.id=?1");
  181               query.setParameter(1, ack.getLastMessageId().getBrokerSequenceId());
  182               query.executeUpdate();
  183           } catch (Throwable e) {
  184               adapter.rollbackEntityManager(context, manager);
  185               throw IOExceptionSupport.create(e);
  186           }
  187           adapter.commitEntityManager(context, manager);
  188       }
  189   
  190       public void resetBatching() {
  191           lastMessageId.set(-1);
  192       }
  193   
  194       public void setMemoryUsage(MemoryUsage memoeyUSage){
  195       }
  196   
  197       public void start() throws Exception {
  198       }
  199   
  200       public void stop() throws Exception {
  201       }
  202   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]