Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-17559

Backpressure seems to be broken when not going through network

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Duplicate
    • 1.8.2
    • None
    • Connectors/ RabbitMQ
    • None

    Description

      Back pressure for Flink seems broken. Someone please correct me, from what I understand it only works between network transfers.  If I have a source with no thread sleep then there is no back pressure some operation will accumulate data and crash.  I even tried removing chaining with

      env.disableOperatorChaining()

       and it works with parallelism set to 1, but with 3 or 4 crashes. See below. 

       

      From this I can conclude if I have any map function that produces more output that is coming in it will eventually crash if there is no network dividing them to allow for backpressure. Is this correct?

       

       

      java.lang.OutOfMemoryError: Java heap space
      2020-05-07 18:27:37,942 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler       - FATAL: Thread 'flink-scheduler-1' produced an uncaught exception. Stopping the process...
      java.lang.OutOfMemoryError: Java heap space
              at akka.dispatch.AbstractNodeQueue.<init>(AbstractNodeQueue.java:32)
              at akka.actor.LightArrayRevolverScheduler$TaskQueue.<init>(LightArrayRevolverScheduler.scala:305)
              at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:270)
              at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
              at java.lang.Thread.run(Thread.java:748)
      2020-05-07 18:27:35,725 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler       - FATAL: Thread 'flink-metrics-8' produced an uncaught exception. Stopping the process...
      java.lang.OutOfMemoryError: Java heap space
      2020-05-07 18:27:35,725 ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler            - An unexpected connection driver error occured
      java.lang.OutOfMemoryError: Java heap space
              at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:120)
              at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:164)
              at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:580)
              at java.lang.Thread.run(Thread.java:748)
      
      

       

      https://stackoverflow.com/questions/61465789/how-do-i-create-a-flink-richparallelsourcefunction-with-backpressure

       

       

       It seems that I am suppose guess how much my sink can handle and throttle to that amount in my source generator. But that always puts my system of a risk of crashing. 

       

       

       

      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              Munoz Luis
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: