  Ignite / IGNITE-15568

Striped Disruptor doesn't work with JRaft event handlers properly


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • None
    • 3.0
    • None

    Description

      The following scenario is broken:

      1. Two raft groups are started and mapped to the same stripe.
      2. Two LogEntryAndClosure events are added in quick succession, so they form a single Disruptor batch: the first for group 1, the second for group 2.

      The first event is delivered to group 1 with endOfBatch=false, so it is cached in org.apache.ignite.raft.jraft.core.NodeImpl.LogEntryAndClosureHandler#tasks and not processed.

      The second event is delivered to group 2 with endOfBatch=true and is processed, but the first event remains in the queue unprocessed indefinitely: each raft group has its own LogEntryAndClosureHandler instance, and the handler of group 1 is never told that the batch has ended.
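      The lost event can be reproduced outside JRaft with a small model. This is a hypothetical sketch: LostBatchDemo, Event, GroupHandler and dispatchBatch are illustrative stand-ins, not Ignite or JRaft classes. It assumes only the behavior described above: one consumer per stripe forwards each event to its group's handler, passing the Disruptor's endOfBatch flag through unchanged, and each group handler flushes its cache only when endOfBatch=true.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of one stripe dispatching a Disruptor batch to per-group handlers. */
public final class LostBatchDemo {

    /** Hypothetical stand-in for a LogEntryAndClosure event addressed to one raft group. */
    record Event(String groupId, String payload) {}

    /** Per-group handler that, like LogEntryAndClosureHandler, caches events until endOfBatch. */
    static final class GroupHandler {
        final List<String> cached = new ArrayList<>();
        final List<String> applied = new ArrayList<>();

        void onEvent(Event event, boolean endOfBatch) {
            cached.add(event.payload());
            if (endOfBatch) {
                applied.addAll(cached); // stands in for executeApplyingTasks(...)
                cached.clear();
            }
        }
    }

    /** endOfBatch is true only for the last event of the whole batch, whatever its group. */
    static void dispatchBatch(List<Event> batch, Map<String, GroupHandler> handlers) {
        for (int i = 0; i < batch.size(); i++) {
            Event event = batch.get(i);
            boolean endOfBatch = i == batch.size() - 1;
            handlers.get(event.groupId()).onEvent(event, endOfBatch);
        }
    }

    public static void main(String[] args) {
        Map<String, GroupHandler> handlers = new HashMap<>();
        handlers.put("group1", new GroupHandler());
        handlers.put("group2", new GroupHandler());

        // Two events in one batch: group 1 first, group 2 last.
        dispatchBatch(List.of(new Event("group1", "A"), new Event("group2", "B")), handlers);

        System.out.println("group1 applied=" + handlers.get("group1").applied
            + " cached=" + handlers.get("group1").cached);
        System.out.println("group2 applied=" + handlers.get("group2").applied);
    }
}
```

      Running main prints "group1 applied=[] cached=[A]" and "group2 applied=[B]": event A stays in group 1's cache because group 1 never sees endOfBatch=true.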

      A possible workaround is to set org.apache.ignite.raft.jraft.option.RaftOptions#applyBatch=1.
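      Why applyBatch=1 avoids the problem can be shown with a similar model, assuming the handler flushes its cache when it reaches applyBatch entries as well as on endOfBatch. ApplyBatchDemo and BatchingHandler are hypothetical names, not the real NodeImpl.LogEntryAndClosureHandler.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a handler with a size-based flush threshold (applyBatch). */
public final class ApplyBatchDemo {

    static final class BatchingHandler {
        private final int applyBatch;
        final List<String> cached = new ArrayList<>();
        final List<String> applied = new ArrayList<>();

        BatchingHandler(int applyBatch) {
            this.applyBatch = applyBatch;
        }

        void onEvent(String task, boolean endOfBatch) {
            cached.add(task);
            // Assumed rule: flush when the cache is full OR the Disruptor reports end of batch.
            if (cached.size() >= applyBatch || endOfBatch) {
                applied.addAll(cached);
                cached.clear();
            }
        }
    }

    public static void main(String[] args) {
        // Group 1 gets its only event with endOfBatch=false, because the batch ended on group 2.
        BatchingHandler defaultBatch = new BatchingHandler(32);
        defaultBatch.onEvent("A", false);

        BatchingHandler workaround = new BatchingHandler(1);
        workaround.onEvent("A", false);

        System.out.println("applyBatch=32 applied=" + defaultBatch.applied);
        System.out.println("applyBatch=1 applied=" + workaround.applied);
    }
}
```

      This prints "applyBatch=32 applied=[]" and "applyBatch=1 applied=[A]". With applyBatch=1 every task is flushed as soon as it arrives, so endOfBatch no longer matters, at the cost of giving up batching entirely.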

      Reproducible with org.apache.ignite.internal.table.TxDistributedTest_1_1_1#testCrossTable and applyBatch=32 in the ignite-15085 branch.

      Implementation notes

      The proposal is to fix this inside the striped Disruptor. The striped Disruptor has a common event handler that forwards each event to the handler of a specific RAFT group. Only the last event in a batch carries the end-of-batch flag. Therefore, for every other RAFT group that received events from the stripe during this batch, an additional event has to be created to complete the batch for that group. The common striped Disruptor handler creates this event and sends it to the group-specific handler with the end-of-batch flag set.
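      A minimal sketch of this proposal, assuming the stripe's common handler sees every event in order together with the Disruptor's endOfBatch flag. All names (StripeBatchFixDemo, StripeHandler, GroupHandler, Event.completion) are hypothetical, and a null payload is used here as the marker for the synthetic batch-completion event. The common handler remembers which groups received events in the current batch and, when the batch ends, sends each of them (except the group that owns the last event) a completion event with endOfBatch=true.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a stripe-level handler that completes the batch for every group it touched. */
public final class StripeBatchFixDemo {

    /** Event for one group; a null payload marks a synthetic batch-completion event. */
    record Event(String groupId, String payload) {
        static Event completion(String groupId) {
            return new Event(groupId, null);
        }

        boolean isCompletion() {
            return payload == null;
        }
    }

    /** Group handler: caches real events, flushes on endOfBatch, ignores completion payloads. */
    static final class GroupHandler {
        final List<String> cached = new ArrayList<>();
        final List<String> applied = new ArrayList<>();

        void onEvent(Event event, boolean endOfBatch) {
            if (!event.isCompletion()) {
                cached.add(event.payload());
            }
            if (endOfBatch && !cached.isEmpty()) {
                applied.addAll(cached);
                cached.clear();
            }
        }
    }

    /** Common stripe handler: the single consumer of the stripe's ring buffer. */
    static final class StripeHandler {
        private final Map<String, GroupHandler> handlers;
        /** Groups that received events in the current batch but no end-of-batch signal yet. */
        private final Set<String> pendingGroups = new LinkedHashSet<>();

        StripeHandler(Map<String, GroupHandler> handlers) {
            this.handlers = handlers;
        }

        void onEvent(Event event, boolean endOfBatch) {
            handlers.get(event.groupId()).onEvent(event, endOfBatch);
            if (!endOfBatch) {
                pendingGroups.add(event.groupId());
                return;
            }
            // The batch is over: the owner group already got endOfBatch=true, complete it for the rest.
            pendingGroups.remove(event.groupId());
            for (String groupId : pendingGroups) {
                handlers.get(groupId).onEvent(Event.completion(groupId), true);
            }
            pendingGroups.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, GroupHandler> handlers = new HashMap<>();
        handlers.put("group1", new GroupHandler());
        handlers.put("group2", new GroupHandler());
        StripeHandler stripe = new StripeHandler(handlers);

        // Same batch as in the scenario: group 1 first (endOfBatch=false), then group 2 (endOfBatch=true).
        stripe.onEvent(new Event("group1", "A"), false);
        stripe.onEvent(new Event("group2", "B"), true);

        System.out.println("group1 applied=" + handlers.get("group1").applied);
        System.out.println("group2 applied=" + handlers.get("group2").applied);
    }
}
```

      This prints "group1 applied=[A]" and "group2 applied=[B]". In this sketch only groups touched by the current batch receive a completion event, so idle groups mapped to the same stripe are not woken up on every batch.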

      How the new event is handled differs between handlers:

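      ApplyTaskHandler (FSMCallerImpl#runApplyTask):
      // Commit everything up to the highest index accumulated in this batch,
      // then return -1 to reset the accumulated index.
      if (maxCommittedIndex >= 0) {
        doCommitted(maxCommittedIndex);
        return -1;
      }
      
      LogEntryAndClosureHandler (LogEntryAndClosureHandler#onEvent):
      // Apply the tasks cached while waiting for the end of the batch.
      if (!this.tasks.isEmpty()) {
        executeApplyingTasks(this.tasks);
        this.tasks.clear();
      }
      
      ReadIndexEventHandler (ReadIndexEventHandler#onEvent):
      // Execute the read-index requests cached in this batch.
      if (!this.events.isEmpty()) {
        executeReadIndexEvents(this.events);
        this.events.clear();
      }
      
      StableClosureEventHandler (StableClosureEventHandler#onEvent):
      // Flush the batched log entries to storage and publish the new disk id.
      if (this.ab.size > 0) {
        this.lastId = this.ab.flush();
        setDiskId(this.lastId);
      }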
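      The ApplyTaskHandler rule can be illustrated in isolation. In this hypothetical model (CommittedIndexDemo, ApplyTask, and the negative-index completion marker are assumptions, not FSMCallerImpl code), the handler keeps only the highest committed index seen in the batch and calls doCommitted once when the batch is completed, whether by a regular event with endOfBatch=true or by a completion event sent from the stripe.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the ApplyTaskHandler batching rule; not FSMCallerImpl itself. */
public final class CommittedIndexDemo {

    /** A negative committedIndex marks a synthetic batch-completion event (assumption). */
    record ApplyTask(long committedIndex) {
        static ApplyTask completion() {
            return new ApplyTask(-1);
        }

        boolean isCompletion() {
            return committedIndex < 0;
        }
    }

    static final class ApplyTaskHandler {
        /** Highest committed index seen in the current batch, or -1 if none. */
        private long maxCommittedIndex = -1;
        /** Indexes passed to doCommitted, recorded for illustration. */
        final List<Long> committedCalls = new ArrayList<>();

        void onEvent(ApplyTask task, boolean endOfBatch) {
            if (!task.isCompletion()) {
                maxCommittedIndex = Math.max(maxCommittedIndex, task.committedIndex());
            }
            // At batch completion, commit the accumulated index once and reset it.
            if (endOfBatch && maxCommittedIndex >= 0) {
                doCommitted(maxCommittedIndex);
                maxCommittedIndex = -1;
            }
        }

        private void doCommitted(long index) {
            committedCalls.add(index);
        }
    }

    public static void main(String[] args) {
        ApplyTaskHandler handler = new ApplyTaskHandler();
        handler.onEvent(new ApplyTask(5), false);
        handler.onEvent(new ApplyTask(7), false);
        // The batch ended on another group's event, so the stripe sends a completion event.
        handler.onEvent(ApplyTask.completion(), true);
        System.out.println(handler.committedCalls);
    }
}
```

      This prints "[7]": the two commit notifications collapse into a single doCommitted(7), and a further completion event with nothing accumulated does not trigger another call.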
      

      As part of this issue, the benchmarks also need to be rerun. They are expected to show an improvement in scenarios with high parallelism within one partition.

      An example of the benchmark is attached.
       

      Attachments

        1. InsertBenchmark.java (12 kB, Vladislav Pyatkov)
        2. MyInsertBenchmarkWithMetrics.java (12 kB, Vladislav Pyatkov)

            People

              Vladislav Pyatkov (v.pyatkov)
              Alexey Scherbakov (ascherbakov)
              Alexey Scherbakov
              Votes: 0
              Watchers: 3

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 3h 20m