Details
Type: Bug
Status: Open
Priority: P3
Resolution: Unresolved
Affects Version/s: 2.12.0
Fix Version/s: None
Environment: Docker with DirectRunner
Flags: Important
Description
Hi,
I am trying to use Beam with AWS SQS as the input source, using session windows.
The windows are never triggered. The same code works well when the input source is Kafka:
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");

PipelineOptions options = PipelineOptionsFactory.create();
AwsOptions awsOptions = options.as(AwsOptions.class);
BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
awsOptions.setAwsRegion("eu-west-1");

Pipeline p = Pipeline.create(options);

// This example reads a public data set consisting of the complete works of Shakespeare.
p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))
    /*Per session windows*/
    .apply(ParDo.of(new DoFn<Message, String>() {
      @ProcessElement
      public void processElement(@Element Message element, OutputReceiver<String> out) {
        // Extract the timestamp from log entry we're currently processing.
        LOG.info("Message Body: {}", element.getBody());
        out.output(element.getBody());
      }
    }))
    //Set windowing configuration
    .apply(
        "WindowIntoSessions",
        Window.<String>into(
                Sessions.withGapDuration(Duration.standardSeconds(5)))
            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
            //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            // Late data is dropped
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
    //Extract and count: Extracts a the object to an KV store of <userID, count>
    .apply(
        MapElements.into(
                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((String testO) -> KV.of(testO, new Integer(1))))
    .apply("CountElements", Sum.integersPerKey())
    .apply("Log", ParDo.of(new FilterTextFn()))
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via(
                (KV<String, Integer> wordCount) ->
                    wordCount.getKey() + ": " + wordCount.getValue()));

p.run().waitUntilFinish();
}
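To make the expected grouping concrete, here is a minimal plain-Java sketch of session-window semantics with the same 5-second gap. It is not Beam code and does not reproduce the problem: the class `SessionSketch`, the method `sessions`, and the sample timestamps are hypothetical, chosen only for illustration. Each element gets a proto-window `[ts, ts + gap)`, and overlapping proto-windows are merged into one session.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionSketch {

    /**
     * Groups event timestamps (in milliseconds) into sessions.
     * Hypothetical helper for illustration; not part of the Beam API.
     * Each returned array is {start, end} of a half-open interval [start, end).
     */
    public static List<long[]> sessions(long[] timestampsMs, long gapMs) {
        long[] sorted = timestampsMs.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                // The element's proto-window [ts, ts + gap) overlaps the
                // current session, so extend that session.
                last[1] = Math.max(last[1], ts + gapMs);
            } else {
                // No overlap: the element starts a new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample timestamps are assumptions, not data from the SQS queue.
        long[] timestamps = {0, 2000, 4000, 12000};
        for (long[] s : sessions(timestamps, 5000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With these sample timestamps the first three elements fall into one session and the fourth starts a second one. The sketch does not model the watermark: in Beam, with the default trigger, a window's result is emitted only after the watermark passes the end of that window.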