Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.store.jpa;
   18   
   19   import java.io.File;
   20   import java.io.IOException;
   21   import java.util.HashSet;
   22   import java.util.List;
   23   import java.util.Properties;
   24   import java.util.Set;
   25   
   26   import javax.persistence.EntityManager;
   27   import javax.persistence.EntityManagerFactory;
   28   import javax.persistence.Persistence;
   29   import javax.persistence.Query;
   30   
   31   import org.apache.activemq.broker.ConnectionContext;
   32   import org.apache.activemq.command.ActiveMQDestination;
   33   import org.apache.activemq.command.ActiveMQQueue;
   34   import org.apache.activemq.command.ActiveMQTopic;
   35   import org.apache.activemq.openwire.OpenWireFormatFactory;
   36   import org.apache.activemq.store.MessageStore;
   37   import org.apache.activemq.store.PersistenceAdapter;
   38   import org.apache.activemq.store.TopicMessageStore;
   39   import org.apache.activemq.store.TransactionStore;
   40   import org.apache.activemq.store.memory.MemoryTransactionStore;
   41   import org.apache.activemq.usage.SystemUsage;
   42   import org.apache.activemq.util.IOExceptionSupport;
   43   import org.apache.activemq.wireformat.WireFormat;
   44   import org.apache.commons.logging.Log;
   45   import org.apache.commons.logging.LogFactory;
   46   
   47   /**
   48    * An implementation of {@link PersistenceAdapter} that uses JPA to store it's
   49    * messages.
   50    * 
   51    * @org.apache.xbean.XBean element="jpaPersistenceAdapter"
   52    * @version $Revision: 1.17 $
   53    */
   54   public class JPAPersistenceAdapter implements PersistenceAdapter {
   55   
   56       String entityManagerName = "activemq";
   57       Properties entityManagerProperties = System.getProperties();
   58       EntityManagerFactory entityManagerFactory;
   59       private WireFormat wireFormat;
   60       private MemoryTransactionStore transactionStore;
   61   
   62       public void beginTransaction(ConnectionContext context) throws IOException {
   63           if (context.getLongTermStoreContext() != null) {
   64               throw new IOException("Transation already started.");
   65           }
   66           EntityManager manager = getEntityManagerFactory().createEntityManager();
   67           manager.getTransaction().begin();
   68           context.setLongTermStoreContext(manager);
   69       }
   70   
   71       public void commitTransaction(ConnectionContext context) throws IOException {
   72           EntityManager manager = (EntityManager)context.getLongTermStoreContext();
   73           if (manager == null) {
   74               throw new IOException("Transation not started.");
   75           }
   76           context.setLongTermStoreContext(null);
   77           manager.getTransaction().commit();
   78           manager.close();
   79       }
   80   
   81       public void rollbackTransaction(ConnectionContext context) throws IOException {
   82           EntityManager manager = (EntityManager)context.getLongTermStoreContext();
   83           if (manager == null) {
   84               throw new IOException("Transation not started.");
   85           }
   86           context.setLongTermStoreContext(null);
   87           manager.getTransaction().rollback();
   88           manager.close();
   89       }
   90   
   91       public EntityManager beginEntityManager(ConnectionContext context) {
   92           if (context == null || context.getLongTermStoreContext() == null) {
   93               EntityManager manager = getEntityManagerFactory().createEntityManager();
   94               manager.getTransaction().begin();
   95               return manager;
   96           } else {
   97               return (EntityManager)context.getLongTermStoreContext();
   98           }
   99       }
  100   
  101       public void commitEntityManager(ConnectionContext context, EntityManager manager) {
  102           if (context == null || context.getLongTermStoreContext() == null) {
  103               manager.getTransaction().commit();
  104               manager.close();
  105           }
  106       }
  107   
  108       public void rollbackEntityManager(ConnectionContext context, EntityManager manager) {
  109           if (context == null || context.getLongTermStoreContext() == null) {
  110               manager.getTransaction().rollback();
  111               manager.close();
  112           }
  113       }
  114   
  115       public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
  116           MessageStore rc = new JPAMessageStore(this, destination);
  117           if (transactionStore != null) {
  118               rc = transactionStore.proxy(rc);
  119           }
  120           return rc;
  121       }
  122   
  123       public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
  124           TopicMessageStore rc = new JPATopicMessageStore(this, destination);
  125           if (transactionStore != null) {
  126               rc = transactionStore.proxy(rc);
  127           }
  128           return rc;
  129       }
  130   
  131       public TransactionStore createTransactionStore() throws IOException {
  132           if (transactionStore == null) {
  133               transactionStore = new MemoryTransactionStore();
  134           }
  135           return this.transactionStore;
  136       }
  137   
  138       public void deleteAllMessages() throws IOException {
  139           EntityManager manager = beginEntityManager(null);
  140           try {
  141               Query query = manager.createQuery("delete from StoredMessage m");
  142               query.executeUpdate();
  143               query = manager.createQuery("delete from StoredSubscription ss");
  144               query.executeUpdate();
  145           } catch (Throwable e) {
  146               rollbackEntityManager(null, manager);
  147               throw IOExceptionSupport.create(e);
  148           }
  149           commitEntityManager(null, manager);
  150       }
  151   
  152       public Set<ActiveMQDestination> getDestinations() {
  153           HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
  154   
  155           EntityManager manager = beginEntityManager(null);
  156           try {
  157               Query query = manager.createQuery("select distinct m.destination from StoredMessage m");
  158               for (String dest : (List<String>)query.getResultList()) {
  159                   rc.add(ActiveMQDestination.createDestination(dest, ActiveMQDestination.QUEUE_TYPE));
  160               }
  161           } catch (RuntimeException e) {
  162               rollbackEntityManager(null, manager);
  163               throw e;
  164           }
  165           commitEntityManager(null, manager);
  166           return rc;
  167       }
  168   
  169       public long getLastMessageBrokerSequenceId() throws IOException {
  170           long rc = 0;
  171           EntityManager manager = beginEntityManager(null);
  172           try {
  173               Query query = manager.createQuery("select max(m.id) from StoredMessage m");
  174               Long t = (Long)query.getSingleResult();
  175               if (t != null) {
  176                   rc = t;
  177               }
  178           } catch (Throwable e) {
  179               rollbackEntityManager(null, manager);
  180               throw IOExceptionSupport.create(e);
  181           }
  182           commitEntityManager(null, manager);
  183           return rc;
  184       }
  185   
  186       public boolean isUseExternalMessageReferences() {
  187           return false;
  188       }
  189   
  190       public void setUsageManager(SystemUsage usageManager) {
  191       }
  192   
  193       public void start() throws Exception {
  194       }
  195   
  196       public void stop() throws Exception {
  197           if (entityManagerFactory != null) {
  198               entityManagerFactory.close();
  199           }
  200       }
  201   
  202       public EntityManagerFactory getEntityManagerFactory() {
  203           if (entityManagerFactory == null) {
  204               entityManagerFactory = createEntityManagerFactory();
  205           }
  206           return entityManagerFactory;
  207       }
  208   
  209       protected EntityManagerFactory createEntityManagerFactory() {
  210           return Persistence.createEntityManagerFactory(getEntityManagerName(), getEntityManagerProperties());
  211       }
  212   
  213       public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
  214           this.entityManagerFactory = entityManagerFactory;
  215       }
  216   
  217       public Properties getEntityManagerProperties() {
  218           return entityManagerProperties;
  219       }
  220   
  221       public void setEntityManagerProperties(Properties entityManagerProperties) {
  222           this.entityManagerProperties = entityManagerProperties;
  223       }
  224   
  225       public String getEntityManagerName() {
  226           return entityManagerName;
  227       }
  228   
  229       public void setEntityManagerName(String entityManager) {
  230           this.entityManagerName = entityManager;
  231       }
  232   
  233       public WireFormat getWireFormat() {
  234           if (wireFormat == null) {
  235               wireFormat = createWireFormat();
  236           }
  237           return wireFormat;
  238       }
  239   
  240       private WireFormat createWireFormat() {
  241           OpenWireFormatFactory wff = new OpenWireFormatFactory();
  242           return wff.createWireFormat();
  243       }
  244   
  245       public void setWireFormat(WireFormat wireFormat) {
  246           this.wireFormat = wireFormat;
  247       }
  248   
  249       public void checkpoint(boolean sync) throws IOException {
  250       }
  251   
  252       public void setBrokerName(String brokerName) {
  253       }
  254   
  255       public void setDirectory(File dir) {
  256       }
  257       
  258       public long size(){
  259           return 0;
  260       }
  261   
  262   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » store » jpa » [javadoc | source]