Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-8050

XAException when failing over in the middle of a transaction

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 5.16.0
    • None
    • None

    Description

      We have been plagued in production by growing disk usage in KahaDB on our ActiveMQs. We have found that this is caused by hanging transactions, and the only solution so far has been to restart the broker. The hanging transactions happen when we have the occasional network glitch. The networking is out of our control, and not something we can fix.

      However, we have found a workaround. Our clients are MDBs in Wildfly. If we disable failover for these and instead let Wildfly handle creating new connections, we don't see the issue.

      I have been able to reproduce the error in a unit test. When there is a connection disturbance in the middle of a transaction (on the consumer end) and the client fails over to another broker in the network, it tries to commit the transaction on the new broker.
      This fails with 

      Transaction 'XID:[...]' has not been started. xaErrorCode:-4

      and the transaction ends up in a weird state on the broker.

      We are not using any replicated persistence adapters, just local kahaDB for each broker in the network.

      I'm not sure whether the error is actually in the client, which can't handle failover during a transaction, or in the broker, which doesn't distribute the transaction properly to the other brokers in the network.

      I'm also very open to the possibility that this is simply a configuration error on our end, but if so, I have no idea what.

      I'm adding the unit test where I have reproduced it. I happily admit that I don't know much about how transactions actually behave in reality, so I might have misconfigured them here, but we see the exact same behaviour in production code where transactions are managed by Wildfly.

      package test;
      
      import static org.awaitility.Awaitility.await;
      
      import org.apache.activemq.ActiveMQMessageConsumer;
      import org.apache.activemq.ActiveMQXAConnection;
      import org.apache.activemq.ActiveMQXAConnectionFactory;
      import org.apache.activemq.broker.BrokerService;
      import org.apache.activemq.broker.TransportConnector;
      import org.apache.activemq.broker.region.policy.PolicyEntry;
      import org.apache.activemq.broker.region.policy.PolicyMap;
      import org.apache.activemq.transport.failover.FailoverTransport;
      import org.junit.Assert;
      import org.junit.Test;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.DataOutputStream;
      import java.io.IOException;
      import java.net.URI;
      import java.time.Duration;
      import java.time.Instant;
      import java.time.temporal.ChronoUnit;
      import java.util.Objects;
      import java.util.UUID;
      import java.util.concurrent.atomic.AtomicLong;
      
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageProducer;
      import javax.jms.Queue;
      import javax.jms.TextMessage;
      import javax.jms.XAConnection;
      import javax.jms.XASession;
      import javax.transaction.xa.XAException;
      import javax.transaction.xa.XAResource;
      import javax.transaction.xa.Xid;
      
      /**
       * Reproducer for an {@link XAException} raised when an XA consumer fails over to a
       * different broker between {@code prepare} and {@code commit}.
       *
       * <p>Two embedded brokers are started and joined with static network connectors. A
       * message is produced in one XA transaction, consumed and prepared in a second one,
       * then the consumer's failover transport is forced to reconnect before the commit.
       */
      public class FailoverErrorTest {

        private static final Logger logger = LoggerFactory.getLogger(FailoverErrorTest.class);

        private static final String BROKER1_URL = "tcp://localhost:11001";
        private static final String BROKER2_URL = "tcp://localhost:11002";
        private static final String FAILOVER_URL = "failover:(" + BROKER1_URL + "," + BROKER2_URL + ")";
        private static final String QUEUE_NAME = "MY_QUEUE";
        private static final String MESSAGE_ID_PROPERTY = "my_id";

        /** Overall time budget for {@link #getMessage} to find the expected message. */
        private static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(20);

        /**
         * Shared by every {@link #createXid()} call so that two Xids created within the same
         * millisecond still get distinct identifiers.
         */
        private static final AtomicLong TX_GENERATOR = new AtomicLong(System.currentTimeMillis());

        /**
         * Builds (but does not start) a broker with JMX and advisory support enabled, a single
         * "openwire" transport connector bound to {@code bindAddress}, and a default
         * destination policy.
         *
         * @param deleteAllMessagesOnStartup passed straight to the broker
         * @param bindAddress URI the transport connector listens on
         * @return the configured broker
         * @throws Exception if the connector cannot be added
         */
        private static BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
          BrokerService broker = new BrokerService();
          broker.setUseJmx(true);
          broker.setAdvisorySupport(true);
          TransportConnector transportConnector = new TransportConnector();
          transportConnector.setName("openwire");
          transportConnector.setUri(URI.create(bindAddress));
          transportConnector.setRebalanceClusterClients(true);
          transportConnector.setUpdateClusterClientsOnRemove(true);
          broker.addConnector(transportConnector);
          broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);

          PolicyMap policyMap = new PolicyMap();
          PolicyEntry defaultEntry = new PolicyEntry();
          policyMap.setDefaultEntry(defaultEntry);
          broker.setDestinationPolicy(policyMap);

          return broker;
        }

        /**
         * Receives messages until one whose {@code my_id} property equals {@code messageId}
         * arrives, failing the test if that does not happen within {@link #RECEIVE_TIMEOUT}.
         *
         * @param messageId expected value of the {@code my_id} property
         * @param consumer consumer to receive from
         * @return the matching message, never {@code null}
         * @throws JMSException if receiving or reading the property fails
         */
        private static Message getMessage(String messageId, ActiveMQMessageConsumer consumer) throws JMSException {
          Instant deadline = Instant.now().plus(RECEIVE_TIMEOUT);
          while (true) {
            long remainingMillis = Duration.between(Instant.now(), deadline).toMillis();
            // Guard against 0: receive(0) would block indefinitely instead of timing out.
            if (remainingMillis <= 0) {
              Assert.fail("timeout");
            }
            Message msg = consumer.receive(remainingMillis);
            Assert.assertNotNull("Couldn't get message", msg);
            if (Objects.equals(messageId, msg.getStringProperty(MESSAGE_ID_PROPERTY))) {
              logger.info("Found message");
              return msg;
            }
            logger.info("Got the wrong message. Looping.");
          }
        }

        /**
         * Creates a new {@link Xid} whose global transaction id and branch qualifier are both
         * the 8-byte encoding of the next value of {@link #TX_GENERATOR}.
         *
         * @return a Xid with format id 86
         * @throws IOException if encoding the identifier fails
         */
        private static Xid createXid() throws IOException {
          java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
          try (DataOutputStream os = new DataOutputStream(baos)) {
            os.writeLong(TX_GENERATOR.incrementAndGet());
          }
          final byte[] bs = baos.toByteArray();

          return new Xid() {
            @Override
            public int getFormatId() {
              return 86;
            }

            @Override
            public byte[] getGlobalTransactionId() {
              // Hand out copies so callers cannot mutate this Xid's identity.
              return bs.clone();
            }

            @Override
            public byte[] getBranchQualifier() {
              return bs.clone();
            }
          };
        }

        /**
         * Sends one text message tagged with {@code messageId} inside its own XA transaction
         * (start, send, end, prepare, two-phase commit).
         */
        private static void sendInXaTransaction(XAConnection connection, String messageId)
            throws JMSException, XAException, IOException {
          XASession session = connection.createXASession();
          Queue destination = session.createQueue(QUEUE_NAME);
          XAResource xaResource = session.getXAResource();
          Xid xid = createXid();

          xaResource.start(xid, XAResource.TMNOFLAGS);
          MessageProducer producer = session.createProducer(destination);
          TextMessage sendMessage = session.createTextMessage("Test message");
          sendMessage.setStringProperty(MESSAGE_ID_PROPERTY, messageId);
          producer.send(sendMessage);
          xaResource.end(xid, XAResource.TMSUCCESS);
          xaResource.prepare(xid);
          xaResource.commit(xid, false);
        }

        /** Closes {@code connection} if it was ever opened; close failures are logged, not thrown. */
        private static void closeQuietly(XAConnection connection) {
          if (connection == null) {
            return;
          }
          try {
            connection.close();
          } catch (JMSException e) {
            logger.warn("Failed to close connection", e);
          }
        }

        /** Stops the broker's first network connector, if one was added. */
        private static void stopFirstNetworkConnector(BrokerService broker) throws Exception {
          if (!broker.getNetworkConnectors().isEmpty()) {
            broker.getNetworkConnectors().get(0).stop();
          }
        }

        /** Returns {@code true} when the broker has no network connector or its first one is stopped. */
        private static boolean isFirstNetworkConnectorStopped(BrokerService broker) {
          return broker.getNetworkConnectors().isEmpty() || broker.getNetworkConnectors().get(0).isStopped();
        }

        /**
         * Forces a failover on the consumer connection after {@code prepare} and then attempts
         * the commit. An {@link XAException} with code {@link XAException#XAER_NOTA} is logged
         * as a successful reproduction; any other XA error code fails the test.
         */
        @Test
        public void failoverWithExceptionProgrammaticBrokers() throws Exception {
          BrokerService broker1 = createBroker(true, BROKER1_URL);
          broker1.setBrokerName("broker1");
          BrokerService broker2 = createBroker(true, BROKER2_URL);
          broker2.setBrokerName("broker2");

          XAConnection producerConnection = null;
          ActiveMQXAConnection consumerConnection = null;
          try {
            System.setProperty("org.slf4j.simpleLogger.log." + FailoverTransport.class.getName(), "DEBUG");

            broker1.start();
            broker2.start();
            await().atMost(Duration.ofSeconds(10)).until(() -> broker1.isStarted() && broker2.isStarted());

            // Join the two brokers into a network, one connector in each direction.
            broker1.addNetworkConnector("static:(" + BROKER2_URL + ")");
            broker2.addNetworkConnector("static:(" + BROKER1_URL + ")");
            broker1.getNetworkConnectors().get(0).start();
            broker2.getNetworkConnectors().get(0).start();

            await().atMost(Duration.ofSeconds(30))
                .until(() -> broker1.getNetworkConnectors().get(0).isStarted() &&
                    broker2.getNetworkConnectors().get(0).isStarted());

            ActiveMQXAConnectionFactory connectionFactory = new ActiveMQXAConnectionFactory(FAILOVER_URL);
            producerConnection = connectionFactory.createXAConnection();
            producerConnection.setClientID("PRODUCER");
            producerConnection.start();
            String messageId = UUID.randomUUID().toString();
            sendInXaTransaction(producerConnection, messageId);

            consumerConnection = (ActiveMQXAConnection) connectionFactory.createXAConnection();
            consumerConnection.setClientID("CONSUMER");
            consumerConnection.start();
            XASession consumerSession = consumerConnection.createXASession();
            Queue consumerDestination = consumerSession.createQueue(QUEUE_NAME);
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(consumerDestination);
            XAResource consumerResource = consumerSession.getXAResource();

            // Consume and prepare, but deliberately do not commit before the failover.
            Xid consumerXid = createXid();
            consumerResource.start(consumerXid, XAResource.TMNOFLAGS);
            Message message = getMessage(messageId, consumer);
            consumerResource.end(consumerXid, XAResource.TMSUCCESS);
            consumerResource.prepare(consumerXid);

            logger.info("Simulating dropped connection");
            FailoverTransport transport = consumerConnection.getTransport().narrow(FailoverTransport.class);
            URI currentTransport = transport.getConnectedTransportURI();
            transport.handleTransportFailure(new IOException("Fake fail"));
            await().atMost(Duration.ofSeconds(10)).until(() -> !Objects.equals(currentTransport, transport.getConnectedTransportURI()) && transport.isConnected());
            Assert.assertTrue(transport.isConnected());
            Assert.assertNotEquals(currentTransport, transport.getConnectedTransportURI());
            message.acknowledge();
            consumerResource.commit(consumerXid, false);
            logger.info("Commit succeeded after failover; the error was not reproduced");
          } catch (XAException e) {
            if (e.errorCode == XAException.XAER_NOTA) {
              logger.info("Recreated error successfully");
            } else {
              logger.error("Got XAException {}", e.errorCode, e);
              Assert.fail("Unexpected XAException, errorCode=" + e.errorCode);
            }
          } finally {
            // Cleanup must tolerate a partially completed setup, otherwise a secondary
            // NullPointerException/IndexOutOfBoundsException would hide the real failure.
            closeQuietly(producerConnection);
            closeQuietly(consumerConnection);
            try {
              stopFirstNetworkConnector(broker1);
              stopFirstNetworkConnector(broker2);
              await().atMost(Duration.ofSeconds(5))
                  .until(() -> isFirstNetworkConnectorStopped(broker1) && isFirstNetworkConnectorStopped(broker2));
            } finally {
              try {
                broker1.stop();
              } finally {
                broker2.stop();
              }
            }
          }
        }
      }
      

      pom.xml:

      <?xml version="1.0" encoding="UTF-8"?>
      <!-- Minimal build for the failover reproducer test; all dependencies are test-only. -->
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>test</groupId>
        <artifactId>cluster-test</artifactId>
        <version>1</version>

        <properties>
          <!-- Alternative ActiveMQ version, kept for switching by hand: -->
          <!--    <version.activemq>5.11.0.redhat-630475</version.activemq>-->
          <version.activemq>5.16.0</version.activemq>
          <maven.compiler.target>1.8</maven.compiler.target>
          <maven.compiler.source>1.8</maven.compiler.source>
        </properties>

        <dependencies>
          <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <!-- Test scope added for consistency with the other test-only dependencies. -->
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
            <version>${version.activemq}</version>
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-kahadb-store</artifactId>
            <version>${version.activemq}</version>
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.0.3</version>
            <scope>test</scope>
          </dependency>

        </dependencies>

      </project>
      
      
      

      Thanks

      Attachments

        Activity

          People

            jbonofre Jean-Baptiste Onofré
            erikh Erik Håkansson
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated: