CAMEL-16829

camel-core - Stuck processing with nested parallel splits and custom thread pool

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 3.11.0
    • Fix Version/s: 4.7.0
    • Component/s: camel-core
    • Labels: None
    • Estimated Complexity: Advanced

    Description

      The root cause of this issue is difficult to pinpoint, but with the following code the route gets stuck in processing indefinitely:

      @Component
      public static class Route extends RouteBuilder {
      	@Override
      	public void configure() throws Exception {
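      		// Single thread and no task queue: any task submitted while the
      		// thread is busy is rejected and handled by the rejection policy.
      		final var executorService = new ThreadPoolBuilder(getContext())
      				.poolSize(1)
      				.maxPoolSize(1)
      				.maxQueueSize(0)
      				.build("inner");
      
      		// Outer split runs in parallel and calls the inner route synchronously.
      		from("direct:main")
      				.split(body()).parallelProcessing()
      				.to("direct:inner?synchronous=true");
      
      		// Inner split uses the custom single-thread executor.
      		from("direct:inner")
      				.split(body()).executorService(executorService)
      				.log("${body}");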
      	}
      }
      
      @Component
      public static class Runner implements CommandLineRunner {
      
      	@Autowired
      	private CamelContext camelContext;
      
      	@Autowired
      	private ProducerTemplate producerTemplate;
      
      	@Override
      	public void run(String... args) {
      		final var exchange = new DefaultExchange(camelContext);
      		exchange.getIn().setBody(List.of(List.of("0-0", "0-1"), List.of("1-0", "1-1")));
      		producerTemplate.send("direct:main", exchange);
      	}
      }
      

      There are two important parts to reproduce the issue:

      • The forward to direct:inner with synchronous=true. This makes the calling thread block on an await, and that await is the point where route processing gets stuck.
      • The inner executor, with a small pool and no queue, which triggers the default CallerRuns rejection policy and runs the split in the original thread instead of in a new one.
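
      To illustrate the second point outside of Camel, here is a minimal sketch using a plain java.util.concurrent.ThreadPoolExecutor. It assumes that a pool with "no queue" behaves like one backed by a SynchronousQueue; the class name CallerRunsDemo and the thread name "inner-worker" are made up for this example. With the only worker busy, the second task is rejected and CallerRunsPolicy runs it on the submitting thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    /** Submits two tasks to a saturated single-thread pool and returns the thread name each one ran on. */
    static List<String> runTwoTasks() throws InterruptedException {
        // One worker thread and a SynchronousQueue, so no task can wait in a queue
        // (assumption: this mirrors poolSize(1), maxPoolSize(1), maxQueueSize(0)).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(),
                r -> new Thread(r, "inner-worker"),
                new ThreadPoolExecutor.CallerRunsPolicy());

        List<String> threads = new CopyOnWriteArrayList<>();
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // First task occupies the only worker until it is released.
        pool.execute(() -> {
            threads.add(Thread.currentThread().getName());
            workerBusy.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        workerBusy.await();

        // Second task is rejected by the pool, so CallerRunsPolicy executes it
        // synchronously on the submitting thread.
        pool.execute(() -> threads.add(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("tasks ran on:  " + runTwoTasks());
    }
}
```

      The first entry of the returned list is the pool thread and the second is the thread that called execute(), which is the "run the split in the original thread" behavior described above.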

      The stuck await seems to be linked to the way AsyncProcessorAwaitManager and DefaultReactiveExecutor.executeFromQueue() interact. The await callback that decrements the latch has been pushed into a back queue of the reactive worker (probably by a scheduleMain call in between), but executeFromQueue() does not process the worker's back queues, so the callback is never executed and the thread stays stuck on the await.
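
      The suspected mechanism can be sketched with a toy model. This is not Camel's code: ToyWorker, parkCurrentQueue() and executeFromQueueWithBackQueues() are hypothetical names invented here, and the way a queue ends up parked is an assumption based only on the description above. The sketch shows a latch callback sitting in a parked queue while the waiting thread drains only the current queue, so the await never completes (a timeout stands in for "stuck indefinitely").

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BackQueueStallDemo {

    /** Hypothetical stand-in for a reactive worker; NOT Camel's implementation. */
    static final class ToyWorker {
        private Deque<Runnable> queue = new ArrayDeque<>();
        private final Deque<Deque<Runnable>> backQueues = new ArrayDeque<>();

        void schedule(Runnable task) {
            queue.add(task);
        }

        /** Parks the current queue and starts an empty one (assumed effect of a nested "main" task). */
        void parkCurrentQueue() {
            backQueues.push(queue);
            queue = new ArrayDeque<>();
        }

        /** Runs one task from the current queue only; parked queues are ignored. */
        boolean executeFromQueue() {
            Runnable task = queue.poll();
            if (task == null) {
                return false;
            }
            task.run();
            return true;
        }

        /** Variant that restores parked queues whenever the current queue is empty. */
        boolean executeFromQueueWithBackQueues() {
            while (queue.isEmpty() && !backQueues.isEmpty()) {
                queue = backQueues.pop();
            }
            return executeFromQueue();
        }
    }

    /** Returns true if the latch was released before the timeout. */
    static boolean awaitCompletes(boolean drainBackQueues) throws InterruptedException {
        ToyWorker worker = new ToyWorker();
        CountDownLatch latch = new CountDownLatch(1);

        // The await callback is scheduled, then its queue gets parked.
        worker.schedule(latch::countDown);
        worker.parkCurrentQueue();

        // The waiting thread helps out by running queued tasks before blocking.
        boolean ran;
        do {
            ran = drainBackQueues
                    ? worker.executeFromQueueWithBackQueues()
                    : worker.executeFromQueue();
        } while (ran);

        // A short timeout stands in for the indefinite wait seen in the report.
        return latch.await(200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("current queue only: " + awaitCompletes(false));
        System.out.println("with back queues:   " + awaitCompletes(true));
    }
}
```

      In this model awaitCompletes(false) returns false because the callback is never reached, while awaitCompletes(true) returns true. Whether draining back queues is safe or correct inside Camel itself is a separate question that the model does not answer.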

      I'm not sure what the solution is here. Maybe executeFromQueue() should process the back queues, or maybe the CallerRuns rejection policy is simply not supported in this situation, in which case it should probably not be the default.

            People

              Assignee: davsclaus Claus Ibsen
              Reporter: spadou Samuel Padou