Details
- Type: Improvement
- Status: Open
- Priority: P3
- Resolution: Unresolved
- Affects Version/s: 2.19.0
- Fix Version/s: None
- Component/s: None
Description
We are processing multiple Kinesis streams using pipelines running on DataflowRunner; a minimal sketch of the setup is shown after the list below. Starting such a pipeline from its definition (i.e. executing the org.apache.beam.sdk.Pipeline.run() method) takes a considerable amount of time. In our case:
- a pipeline that consumes data from 196 streams (237 shards in total) starts in 7 minutes
- a pipeline that consumes data from 111 streams (261 shards in total) starts in 4 minutes
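For reference, the setup looks roughly like this (a minimal sketch only; the stream names, credentials, region and the LATEST starting position are placeholders, not our actual configuration):

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kinesis.KinesisIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

public class MultiStreamPipeline {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Placeholder list; in our case there are ~100-200 streams with a few hundred shards in total.
    List<String> streamNames = Arrays.asList("stream-1", "stream-2", "stream-3");

    for (String streamName : streamNames) {
      pipeline.apply("Read-" + streamName,
          KinesisIO.read()
              .withStreamName(streamName)
              .withAWSClientsProvider("ACCESS_KEY", "SECRET_KEY", Regions.EU_WEST_1)
              .withInitialPositionInStream(InitialPositionInStream.LATEST));
      // ... downstream transforms omitted
    }

    // The slow part described below happens inside run(), before the job is submitted.
    pipeline.run();
  }
}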
I've been investigating this and found out that when Pipeline.run is invoked, the whole pipeline graph is traversed and serialized so it can be passed to the Dataflow backend. Here's part of the stacktrace that shows this traversal:
at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1252)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.lambda$getRecords$2(SimplifiedKinesisClient.java:137)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.wrapExceptions(SimplifiedKinesisClient.java:210)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:134)
at org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient.getRecords(SimplifiedKinesisClient.java:119)
at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.validateShards(StartingPointShardsFinder.java:195)
at org.apache.beam.sdk.io.kinesis.StartingPointShardsFinder.findShardsAtStartingPoint(StartingPointShardsFinder.java:115)
at org.apache.beam.sdk.io.kinesis.DynamicCheckpointGenerator.generate(DynamicCheckpointGenerator.java:59)
at org.apache.beam.sdk.io.kinesis.KinesisSource.split(KinesisSource.java:88)
at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:87)
at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1630)
at org.apache.beam.runners.dataflow.DataflowRunner$StreamingUnboundedRead$ReadWithIdsTranslator.translate(DataflowRunner.java:1627)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:494)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:433)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:192)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
As you can see, during serialization the org.apache.beam.sdk.io.kinesis.KinesisSource.split method is called. This method finds all shards of the stream and also validates each shard by reading from it. Since this process is sequential, it takes considerable time that depends both on the number of streams (which has the greatest impact) and on the number of shards. Even with a single stream that has a large number of shards, the pipeline startup time will be noticeable.
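Conceptually, the per-stream work performed during translation amounts to something like the following (a simplified sketch for illustration, not the actual Beam code, which goes through SimplifiedKinesisClient and checks the configured starting point; shard-list pagination is also omitted here):

import java.util.List;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.Shard;

class SequentialShardValidationSketch {
  // Roughly what the translation-time shard discovery and validation amounts to.
  static void validate(AmazonKinesis kinesis, List<String> streamNames) {
    for (String stream : streamNames) {               // streams are processed one after another
      List<Shard> shards =
          kinesis.listShards(new ListShardsRequest().withStreamName(stream)).getShards();
      for (Shard shard : shards) {                    // and so are the shards of each stream
        String iterator = kinesis
            .getShardIterator(stream, shard.getShardId(), "TRIM_HORIZON")
            .getShardIterator();
        // One getRecords round trip per shard; with hundreds of shards this adds up to minutes.
        kinesis.getRecords(new GetRecordsRequest().withShardIterator(iterator));
      }
    }
  }
}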
I wonder if it's possible to optimise this somehow?
One way could be to parallelise the whole process, both at the stream and at the shard level. As this logic is split between Beam core and KinesisIO, the change could be complex; a rough sketch of the idea is shown below.
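As an illustration of stream-level parallelisation (a sketch only, reusing the sequential helper above; the real change would have to live inside KinesisIO / the translation code, and the pool size is an arbitrary example):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import com.amazonaws.services.kinesis.AmazonKinesis;

class ParallelShardValidationSketch {
  // Sketch: run the per-stream validation from the previous snippet concurrently.
  static void validateInParallel(AmazonKinesis kinesis, List<String> streamNames) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(16); // arbitrary pool size
    try {
      List<Future<?>> futures = streamNames.stream()
          .map(stream -> executor.submit(() ->
              SequentialShardValidationSketch.validate(kinesis, Collections.singletonList(stream))))
          .collect(Collectors.toList());
      for (Future<?> future : futures) {
        future.get(); // propagate any failure from the validation tasks
      }
    } finally {
      executor.shutdown();
    }
  }
}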
Another solution I could think of is having the information about valid stream shards ready before calling Pipeline.run. If there were a way to create a KinesisIO.Read operation so that it cached shard information and let client code control the parallelisation of this step, it would allow a great reduction of the startup time.
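For example, something along these lines would do (a purely hypothetical usage sketch, reusing the names from the first snippet; ShardInfoCache, buildInParallel and withPrecomputedShards are invented for illustration and do not exist in KinesisIO today):

// Hypothetical sketch; none of these KinesisIO methods exist today.
// The client code lists and validates shards in parallel up front...
ShardInfoCache cache = ShardInfoCache.buildInParallel(streamNames, awsClientsProvider);

for (String streamName : streamNames) {
  pipeline.apply("Read-" + streamName,
      KinesisIO.read()
          .withStreamName(streamName)
          .withInitialPositionInStream(InitialPositionInStream.LATEST)
          // ...and hands the already validated shard list to the transform, so that
          // Pipeline.run() does not have to discover and validate shards again.
          .withPrecomputedShards(cache.shardsFor(streamName)));
}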
I made a PoC to verify how much parallelising this process can improve the startup time, and just by implementing it on the stream level I was able to reduce the startup time from 7 minutes to 2.5 minutes. Unfortunately it was a really hacky solution and not one I consider suitable to be implemented: I hacked the AWS client used by KinesisIO to cache all responses from the server and called the split method in parallel on all sources before executing Pipeline.run. However, this proves that there is huge room for improvement for pipelines that deal with multiple streams and/or shards.