Details
-
Improvement
-
Status: Closed
-
Major
-
Resolution: Abandoned
-
5.x
-
None
-
None
-
Network of Brokers. Platform agnostic. Local Broker has a networkConnector defined to forward all messages to a remote broker.
Description
Requirement
1. Allow network connector to use transactions when forwarding persistent messages.
2. Provide the following new network connector properties:
maxMessagesPerTransaction - when specified and greater than 1, use transactions.
maxTransactionLatencyMillis - commit immediately when time passed since last commit is more than specified.
Let's say both parameters are set as 1000.
The network connector should commit after every 1000 messages or when more than 1000 ms have passed since the last commit, whichever comes first.
Background
Throughput of persistent messages is significantly lower when transactions are not used.
When using transactions and committing every 1000 messages, throughput on local broker with levelDB is about 12,000 messages of 1KB per second.
The network connector does not use transactions. Thus, its throughput is limited to a few hundred messages per second.
When imitating network connector functionality (receive from local broker and send to remote broker) using transactions on both sessions, I managed to have a sustained throughput of 10,000 messages/sec stored on local broker plus up to 11,000 messages/s forwarded to remote broker (forwarding throughput must be higher to allow catch up after reconnect).
Sample code
import java.util.Date; import javax.jms.*; import javax.jms.Connection; import javax.jms.Message; import org.apache.activemq.*; import org.apache.activemq.broker.*; public class TransactionalStoreAndForward implements Runnable { private final String m_queueName; private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF; private Connection m_fromConn = null, m_toConn = null; private Session m_fromSess = null, m_toSess = null; private MessageConsumer m_msgConsumer = null; private MessageProducer m_msgProducer = null; private boolean m_cont = true; public static final int MAX_MESSAGES_PER_TRANSACTION = 500; public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L; public TransactionalStoreAndForward(String fromUri, String toUri, String queueName) { m_fromAMQF = new ActiveMQConnectionFactory(fromUri); m_toAMQF = new ActiveMQConnectionFactory(toUri); m_queueName = queueName; } @Override public void run() { while (m_cont) { connect(); process(); } } private void process() { long txMessages = 0, totalMessages = 0, lastPrintMessages = 0; long startTime = 0L; long lastTxTime = startTime, lastPrintTime = startTime; Message msg = null; try { while (m_cont) { while ((msg = m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null) { if (startTime == 0) { startTime = System.currentTimeMillis(); lastTxTime = startTime; lastPrintTime = startTime; } m_msgProducer.send(msg); txMessages++; totalMessages++; if (txMessages == MAX_MESSAGES_PER_TRANSACTION || System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS) { m_toSess.commit(); m_fromSess.commit(); lastTxTime = System.currentTimeMillis(); txMessages = 0; } if (System.currentTimeMillis() - lastPrintTime > 10000L) { System.out.println("processed " + (totalMessages - lastPrintMessages) + " messages during last 10 seconds. Avg. 
messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis() - startTime)) + " at " + new Date()); lastPrintTime = System.currentTimeMillis(); lastPrintMessages = totalMessages; } } if (txMessages > 0) { m_toSess.commit(); m_fromSess.commit(); lastTxTime = System.currentTimeMillis(); txMessages = 0; } else { System.out.println("Idle for more than a minute at " + new Date()); } } } catch(JMSException jmse) { System.out.println("About to rollback " + txMessages + " messages due to: " + jmse.getMessage()); try { m_toSess.rollback(); m_fromSess.rollback(); System.out.println("Rollback completed. will reconnect soon ..."); } catch (JMSException re) { System.out.println("Rollback failed !!!"); re.printStackTrace(); } } } private void connect() { boolean isNotOK = true; String target = null; while (isNotOK) { try { if (m_fromConn != null) { m_fromConn.close(); m_fromConn = null; } if (m_toConn != null) { m_toConn.close(); m_toConn = null; } target = m_fromAMQF.getBrokerURL(); m_fromConn = m_fromAMQF.createConnection(); m_fromConn.start(); m_fromSess = m_fromConn.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination fromDest = m_fromSess.createQueue(m_queueName); m_msgConsumer = m_fromSess.createConsumer(fromDest); target = m_toAMQF.getBrokerURL(); m_toConn = m_toAMQF.createConnection(); m_toConn.start(); m_toSess = m_toConn.createSession(true, Session.AUTO_ACKNOWLEDGE); Destination toDest = m_toSess.createQueue(m_queueName); m_msgProducer = m_toSess.createProducer(toDest); isNotOK = false; System.out.println("Successful connection at " + new Date()); } catch(Exception e) { System.out.println("Failed to connect to " + target + " due to: " + e.getMessage()); try { Thread.sleep(60000L); } catch (InterruptedException e1) {} System.out.println("About to retry connection at " + new Date()); } } } public void cleanup() throws Exception { m_cont = false; if (m_fromConn != null) { m_fromConn.close(); m_fromConn = null; } if (m_toConn != null) { m_toConn.close(); 
m_toConn = null; } } public static void main(String[] args) throws Exception { BrokerService broker = BrokerFactory.createBroker("xbean:activemq_gateway.xml", true); broker.waitUntilStarted(); TransactionalStoreAndForward tsaf = new TransactionalStoreAndForward("vm://AuditGW", "tcp://10.2.154.51:61616", "AUDIT.EVENT"); Thread t = new Thread(tsaf); t.start(); t.join(); tsaf.cleanup(); broker.stop(); broker.waitUntilStopped(); } }