Beam / BEAM-7498

SQS input with session-based windowing doesn't work

Details

    • Type: Bug
    • Status: Open
    • Priority: P3
    • Resolution: Unresolved
    • Affects Version/s: 2.12.0
    • Fix Version/s: None
    • Component/s: io-java-aws
    • Environment: Docker with DirectRunner
    • Important

    Description

      Hi,

      I'm trying to use Beam with the AWS SQS service as an input source, using session windows.

      The session windows are never triggered, so no windowed results are produced. The same code works well when the input source is Kafka:

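      import com.amazonaws.auth.AWSStaticCredentialsProvider;
      import com.amazonaws.auth.BasicAWSCredentials;
      import com.amazonaws.services.sqs.model.Message;
      import org.apache.beam.sdk.Pipeline;
      import org.apache.beam.sdk.io.aws.options.AwsOptions;
      import org.apache.beam.sdk.io.aws.sqs.SqsIO;
      import org.apache.beam.sdk.options.PipelineOptions;
      import org.apache.beam.sdk.options.PipelineOptionsFactory;
      import org.apache.beam.sdk.transforms.DoFn;
      import org.apache.beam.sdk.transforms.MapElements;
      import org.apache.beam.sdk.transforms.ParDo;
      import org.apache.beam.sdk.transforms.Sum;
      import org.apache.beam.sdk.transforms.windowing.Sessions;
      import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
      import org.apache.beam.sdk.transforms.windowing.Window;
      import org.apache.beam.sdk.values.KV;
      import org.apache.beam.sdk.values.TypeDescriptors;
      import org.joda.time.Duration;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;

      public class SqsSessionWindowing {
        private static final Logger LOG = LoggerFactory.getLogger(SqsSessionWindowing.class);

        // Stand-in for FilterTextFn, whose body is not included in this report:
        // it logs each per-key count and passes it through unchanged.
        static class FilterTextFn extends DoFn<KV<String, Integer>, KV<String, Integer>> {
          @ProcessElement
          public void processElement(
              @Element KV<String, Integer> count, OutputReceiver<KV<String, Integer>> out) {
            LOG.info("Count for {}: {}", count.getKey(), count.getValue());
            out.output(count);
          }
        }

        public static void main(String[] args) {
          PipelineOptions options = PipelineOptionsFactory.create();
          AwsOptions awsOptions = options.as(AwsOptions.class);
          // Credentials are left blank in the report.
          BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
          awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
          awsOptions.setAwsRegion("eu-west-1");

          Pipeline p = Pipeline.create(options);

          // Read messages from the SQS queue (an unbounded source).
          p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))
              // Extract the body of each SQS message.
              .apply(ParDo.of(new DoFn<Message, String>() {
                @ProcessElement
                public void processElement(@Element Message element, OutputReceiver<String> out) {
                  LOG.info("Message Body: {}", element.getBody());
                  out.output(element.getBody());
                }
              }))
              // Set the windowing configuration: per-session windows with a 5-second gap.
              .apply(
                  "WindowIntoSessions",
                  Window.<String>into(Sessions.withGapDuration(Duration.standardSeconds(5)))
                      .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
                      //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                      .accumulatingFiredPanes()
                      // Late data is dropped.
                      .withAllowedLateness(Duration.ZERO))
              // Map each message body to a KV of <body, 1>.
              .apply(
                  MapElements.into(
                          TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
                      .via((String body) -> KV.of(body, 1)))
              // Sum the counts per key within each session window.
              .apply("CountElements", Sum.integersPerKey())
              .apply("Log", ParDo.of(new FilterTextFn()))
              // Format each result as "key: count".
              .apply(
                  MapElements.into(TypeDescriptors.strings())
                      .via((KV<String, Integer> wordCount) ->
                          wordCount.getKey() + ": " + wordCount.getValue()));

          p.run().waitUntilFinish();
        }
      }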
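      As background on the expected behaviour, here is a toy model in plain Java (no Beam dependency) of how session windows with a gap duration behave under the default trigger: each element gets a proto-window [t, t + gap), overlapping windows merge into one session, and a session is emitted only once the watermark reaches its end. The class and method names (SessionWindowSketch, mergeSessions, firedSessions) are hypothetical, and this is a simplified sketch, not Beam's implementation. It only suggests one thing worth checking in this setup: whether the watermark of the SQS source advances at all, since a session whose end the watermark never reaches would never fire.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model with hypothetical names, NOT Beam's implementation: session windows with a
// gap duration, fired by the default "watermark passes the end of the window" rule.
public class SessionWindowSketch {

  // A half-open interval [start, end) in milliseconds, analogous to a Beam IntervalWindow.
  static final class Session {
    final long start;
    final long end;

    Session(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  // Give each timestamp a proto-window [t, t + gap) and merge windows that overlap.
  static List<Session> mergeSessions(List<Long> timestamps, long gapMillis) {
    List<Long> sorted = new ArrayList<>(timestamps);
    Collections.sort(sorted);
    List<Session> sessions = new ArrayList<>();
    for (long t : sorted) {
      Session proto = new Session(t, t + gapMillis);
      int lastIndex = sessions.size() - 1;
      if (lastIndex >= 0 && sessions.get(lastIndex).end > proto.start) {
        // Overlaps the current session: extend it instead of opening a new one.
        Session last = sessions.get(lastIndex);
        sessions.set(lastIndex, new Session(last.start, Math.max(last.end, proto.end)));
      } else {
        sessions.add(proto);
      }
    }
    return sessions;
  }

  // A session is emitted only once the watermark has reached its (exclusive) end.
  static List<Session> firedSessions(List<Session> sessions, long watermarkMillis) {
    List<Session> fired = new ArrayList<>();
    for (Session s : sessions) {
      if (watermarkMillis >= s.end) {
        fired.add(s);
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    // Event timestamps in milliseconds, with the same 5-second gap as the pipeline.
    List<Session> sessions = mergeSessions(List.of(0L, 2_000L, 4_000L, 20_000L), 5_000L);
    System.out.println(sessions);                         // [[0, 9000), [20000, 25000)]
    System.out.println(firedSessions(sessions, 10_000L)); // [[0, 9000)]
    System.out.println(firedSessions(sessions, 0L));      // []
  }
}
```

      With the 5-second gap from the pipeline, events at 0 s, 2 s and 4 s merge into a single session [0, 9000) ms, and an event at 20 s opens its own session. With the watermark held at 0, nothing is emitted, which is how a stalled source watermark would look from outside the pipeline.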

People

    • Assignee: Unassigned
    • Reporter: Esteve (esteveavi)
    • Votes: 0
    • Watchers: 2

Time Tracking

    • Original estimate: 12h
    • Remaining estimate: 12h
    • Time spent: Not specified