Uploaded image for project: 'Apache Curator'
  1. Apache Curator
  2. CURATOR-623

DistributedQueue stops filling after long disconnect from cluster

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 5.3.0
    • None
    • None

    Description

      One of our VMs had network down for 12 minutes and after the network was up, the queues have stopped being filled by external processes as curator gave up on all watchers. Here is a test reproducing the issue:

      import junit.framework.TestCase;
      import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
      import org.apache.curator.framework.CuratorFramework;
      import org.apache.curator.framework.CuratorFrameworkFactory;
      import org.apache.curator.framework.recipes.queue.DistributedQueue;
      import org.apache.curator.framework.recipes.queue.QueueBuilder;
      import org.apache.curator.framework.recipes.queue.QueueConsumer;
      import org.apache.curator.framework.recipes.queue.QueueSerializer;
      import org.apache.curator.framework.state.ConnectionState;
      import org.apache.curator.retry.ExponentialBackoffRetry;
      import org.apache.curator.test.TestingCluster;
      
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.TimeUnit;
      import java.util.function.Consumer;
      
      public class DistributedQueueTest extends TestCase {
          public void test() throws Exception {
              final var done = new CompletableFuture<>();
              try (
                  final var testingCluster = started(new TestingCluster(1));
                  final var dyingCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
                  final var dyingQueue = newQueue(dyingCuratorFramework, item -> {
                      if (item.equals("0")) {
                          done.complete(null);
                      }
                  })
              ) {
                  dyingQueue.start();
                  testingCluster.killServer(testingCluster.getInstances().iterator().next());
                  Thread.sleep(2 * 60_000);
                  testingCluster.restartServer(testingCluster.getInstances().iterator().next());
                  try (
                      final var aliveCuratorFramework = getCuratorFramework(testingCluster.getConnectString());
                      final var aliveQueue = newQueue(aliveCuratorFramework, __ -> {})
                  ) {
                      aliveQueue.start();
                      aliveQueue.put("0");
                      done.get(1, TimeUnit.MINUTES);
                  }
              }
          }
      
          private static DistributedQueue<String> newQueue(CuratorFramework curatorFramework, Consumer<String> consumer) {
              curatorFramework.start();
              return QueueBuilder.builder(
                  curatorFramework,
                  new QueueConsumer<String>() {
                      @Override
                      public void consumeMessage(String o) {
                          consumer.accept(o);
                      }
      
                      @Override
                      public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                      }
                  },
                  new QueueSerializer<>() {
                      @Override
                      public byte[] serialize(String item) {
                          return item.getBytes();
                      }
      
                      @Override
                      public String deserialize(byte[] bytes) {
                          return new String(bytes);
                      }
                  },
                  "/MyChildrenCacheTest/queue"
              ).buildQueue();
          }
      
          private static TestingCluster started(TestingCluster testingCluster) throws Exception {
              try {
                  testingCluster.start();
                  return testingCluster;
              } catch (Throwable throwable) {
                  try (testingCluster) {
                      throw throwable;
                  }
              }
          }
      
          private static CuratorFramework getCuratorFramework(String connectString) {
              return CuratorFrameworkFactory.builder()
                  .ensembleProvider(new FixedEnsembleProvider(connectString, true))
                  .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                  .build();
          }
      } 

       

      Attachments

        Issue Links

          Activity

            People

              randgalt Jordan Zimmerman
              faucct Никита Соколов
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 2h 10m
                  2h 10m