Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-25846

[FLIP-171] Async Sink does not gracefully shutdown on Cancel

    XMLWordPrintableJSON

Details

    Description

      Description

      Async Sink does not react gracefully to cancellation signal

      Reproduction Steps

      • Start a job using an Async Sink implementation, for example KDS
      • Navigate to Flink Dashboard
      • Click Job > Cancel

      Actual Results

      • Sink operator stuck in Cancelling, retrying
      2022-01-27 08:33:40,301 WARN  org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkWriter [] - KDS Sink failed to persist 5 entries to KDS
      java.util.concurrent.CompletionException: java.lang.IllegalStateException: Interrupted waiting to refresh the value.
      	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331) ~[?:?]
      	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346) ~[?:?]
      	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:870) ~[?:?]
      	at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) ~[?:?]
      	at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251) ~[?:?]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2112) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkWriter.submitRequestEntries(KinesisDataStreamsSinkWriter.java:122) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:311) ~[flink-connector-files-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.prepareCommit(AsyncSinkWriter.java:391) ~[flink-connector-files-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.endInput(SinkOperator.java:192) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:517) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751) ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.IllegalStateException: Interrupted waiting to refresh the value.
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.handleInterruptedException(CachedSupplier.java:146) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:140) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:89) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:91) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider.resolveCredentials(StsAssumeRoleCredentialsProvider.java:41) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.resolveCredentials(AwsExecutionContextBuilder.java:165) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:102) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsAsyncClientHandler.java:65) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$execute$1(BaseAsyncClientHandler.java:77) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.measureApiCallSuccess(BaseAsyncClientHandler.java:282) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.execute(BaseAsyncClientHandler.java:75) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.execute(AwsAsyncClientHandler.java:52) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2107) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	... 16 more
      Caused by: java.lang.InterruptedException
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1286) ~[?:?]
      	at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:424) ~[?:?]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:126) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:89) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:91) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider.resolveCredentials(StsAssumeRoleCredentialsProvider.java:41) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.resolveCredentials(AwsExecutionContextBuilder.java:165) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:102) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsAsyncClientHandler.java:65) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$execute$1(BaseAsyncClientHandler.java:77) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.measureApiCallSuccess(BaseAsyncClientHandler.java:282) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.execute(BaseAsyncClientHandler.java:75) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.execute(AwsAsyncClientHandler.java:52) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2107) ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
      	... 16 more
      

      Expected Results

      • Sink operator closes

      Suggested Resolution

      • Async Sink should treat `InterruptedException` as stop signal

      Attachments

        Issue Links

          Activity

            People

              CrynetLogistics Zichen Liu
              dannycranmer Danny Cranmer
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: