Provides a TransactionStore implementation that can create transaction-aware
MessageStore objects from non-transaction-aware MessageStore objects.
Method from org.apache.activemq.store.kahadaptor.KahaTransactionStore Detail: |
/**
 * Adds a message to the given store, or records it on its transaction when the
 * message reports that it is in a transaction.
 *
 * @param destination the store the message is destined for; it is cast to
 *                    {@code KahaMessageStore} on the transactional path
 * @param message     the message to add
 * @throws IOException declared for the underlying store operation
 * @throws RuntimeStoreException rethrown unchanged after an {@code IOException}
 *                    cause, if any, has been reported to the broker service
 */
void addMessage(MessageStore destination, Message message) throws IOException {
    try {
        if (message.isInTransaction()) {
            // Transactional sends are handed to the KahaTransaction instead of
            // being written to the destination store here.
            KahaTransaction tx = getOrCreateTx(message.getTransactionId());
            tx.add((KahaMessageStore)destination, message);
        } else {
            destination.addMessage(null, message);
        }
    } catch (RuntimeStoreException rse) {
        Throwable cause = rse.getCause();
        // brokerService is supplied through setBrokerService(); if it has not been
        // set, skip the notification so a NullPointerException cannot replace the
        // original store failure.
        if (brokerService != null && cause instanceof IOException) {
            brokerService.handleIOException((IOException)cause);
        }
        throw rse;
    }
}
|
/**
 * Commits the transaction registered under the given id and then forgets it.
 * Does nothing when no transaction is found for the id.
 *
 * @param txid        id of the transaction to commit
 * @param wasPrepared not consulted by this implementation
 * @throws IOException declared for the underlying commit
 */
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
    KahaTransaction transaction = getTx(txid);
    if (transaction == null) {
        return;
    }
    transaction.commit(this);
    removeTx(txid);
}
|
/**
 * Empties both transaction maps: the in-flight {@code transactions} map first,
 * then the {@code prepared} map.
 */
public void delete() {
    transactions.clear();
    prepared.clear();
}
|
/**
 * Returns the transaction stored in {@code transactions} under the given key,
 * creating and registering a new one when none exists yet.
 *
 * @param key the transaction id
 * @return the existing or newly created transaction, never {@code null}
 */
protected synchronized KahaTransaction getOrCreateTx(TransactionId key) {
    KahaTransaction existing = (KahaTransaction)transactions.get(key);
    if (existing != null) {
        return existing;
    }
    KahaTransaction created = new KahaTransaction();
    transactions.put(key, created);
    return created;
}
|
/**
 * Looks up a message store by id by delegating to the adaptor.
 *
 * @param id the identifier passed to {@code adaptor.retrieveMessageStore}
 * @return whatever the adaptor returns for that id
 */
protected MessageStore getStoreById(Object id) {
    MessageStore store = adaptor.retrieveMessageStore(id);
    return store;
}
|
/**
 * Finds the transaction for the given key, checking the {@code transactions}
 * map first and falling back to the {@code prepared} map.
 *
 * @param key the transaction id
 * @return the matching transaction, or {@code null} when neither map holds one
 */
protected synchronized KahaTransaction getTx(TransactionId key) {
    KahaTransaction active = (KahaTransaction)transactions.get(key);
    if (active != null) {
        return active;
    }
    return (KahaTransaction)prepared.get(key);
}
|
/**
 * Prepares the transaction with the given id and records it in the
 * {@code prepared} map. Does nothing when the id is unknown.
 *
 * @param txid id of the transaction to prepare
 */
public void prepare(TransactionId txid) {
    KahaTransaction transaction = getTx(txid);
    if (transaction == null) {
        return;
    }
    transaction.prepare();
    prepared.put(txid, transaction);
}
|
/**
 * Wraps a message store so that adds and removes are routed through this
 * transaction store's {@code addMessage} / {@code removeMessage}.
 *
 * @param messageStore the store to wrap
 * @return a proxy around {@code messageStore}
 */
public MessageStore proxy(MessageStore messageStore) {
    return new ProxyMessageStore(messageStore) {
        // The ConnectionContext argument is not forwarded; only the delegate
        // store and the message are passed on.
        public void addMessage(ConnectionContext context, final Message send) throws IOException {
            MessageStore target = getDelegate();
            KahaTransactionStore.this.addMessage(target, send);
        }

        public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
            MessageStore target = getDelegate();
            KahaTransactionStore.this.removeMessage(target, ack);
        }
    };
}
|
/**
 * Wraps a topic message store so that adds and removes are routed through this
 * transaction store's {@code addMessage} / {@code removeMessage}.
 *
 * @param messageStore the topic store to wrap
 * @return a proxy around {@code messageStore}
 */
public TopicMessageStore proxy(TopicMessageStore messageStore) {
    return new ProxyTopicMessageStore(messageStore) {
        // The ConnectionContext argument is not forwarded; only the delegate
        // store and the message are passed on.
        public void addMessage(ConnectionContext context, final Message send) throws IOException {
            MessageStore target = getDelegate();
            KahaTransactionStore.this.addMessage(target, send);
        }

        public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
            MessageStore target = getDelegate();
            KahaTransactionStore.this.removeMessage(target, ack);
        }
    };
}
|
/**
 * Reports every entry of the {@code prepared} map to the listener, passing the
 * transaction id together with the transaction's messages and acks.
 *
 * @param listener receives one {@code recover} call per prepared entry
 * @throws IOException declared for the listener callback
 */
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
    Iterator entries = prepared.entrySet().iterator();
    while (entries.hasNext()) {
        Map.Entry preparedEntry = (Map.Entry)entries.next();
        // Keys are cast to XATransactionId; any other key type fails here.
        XATransactionId xid = (XATransactionId)preparedEntry.getKey();
        KahaTransaction transaction = (KahaTransaction)preparedEntry.getValue();
        listener.recover(xid, transaction.getMessages(), transaction.getAcks());
    }
}
|
/**
 * Removes a message from the given store, or records the ack on its transaction
 * when the ack reports that it is in a transaction.
 *
 * @param destination the store the ack applies to; it is cast to
 *                    {@code KahaMessageStore} on the transactional path
 * @param ack         the acknowledgement identifying what to remove
 * @throws IOException declared for the underlying store operation
 * @throws RuntimeStoreException rethrown unchanged after an {@code IOException}
 *                    cause, if any, has been reported to the broker service
 */
final void removeMessage(MessageStore destination, MessageAck ack) throws IOException {
    try {
        if (ack.isInTransaction()) {
            // Transactional acks are handed to the KahaTransaction instead of
            // being applied to the destination store here.
            KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
            tx.add((KahaMessageStore)destination, ack);
        } else {
            destination.removeMessage(null, ack);
        }
    } catch (RuntimeStoreException rse) {
        Throwable cause = rse.getCause();
        // brokerService is supplied through setBrokerService(); if it has not been
        // set, skip the notification so a NullPointerException cannot replace the
        // original store failure.
        if (brokerService != null && cause instanceof IOException) {
            brokerService.handleIOException((IOException)cause);
        }
        throw rse;
    }
}
|
/**
 * Drops the given key from both the {@code transactions} map and the
 * {@code prepared} map.
 *
 * @param key id of the transaction to forget
 */
protected synchronized void removeTx(TransactionId key) {
    transactions.remove(key);
    prepared.remove(key);
}
|
/**
 * Rolls back the transaction with the given id and then forgets it.
 * Does nothing when the id is unknown.
 *
 * @param txid id of the transaction to roll back
 */
public void rollback(TransactionId txid) {
    KahaTransaction transaction = getTx(txid);
    if (transaction == null) {
        return;
    }
    transaction.rollback();
    removeTx(txid);
}
|
/**
 * Sets the broker service that {@code addMessage} and {@code removeMessage}
 * notify when a store failure is caused by an {@code IOException}.
 *
 * @param brokerService the broker service to use
 */
public void setBrokerService(BrokerService brokerService) {
    this.brokerService = brokerService;
}
|
/**
 * Start hook; this implementation performs no work.
 *
 * @throws Exception declared by the signature, never thrown here
 */
public void start() throws Exception {
    // Intentionally empty.
}
|
/**
 * Stop hook; this implementation performs no work.
 *
 * @throws Exception declared by the signature, never thrown here
 */
public void stop() throws Exception {
    // Intentionally empty.
}
|