Uploaded image for project: 'Beam'
  1. Beam
  2. BEAM-5060

Issues with aws KPL while writing to kinesis using beam

Details

    • Bug
    • Status: Resolved
    • P1
    • Resolution: Fixed
    • 2.5.0
    • 2.7.0
    • io-java-kinesis
    • None

    Description

      I am trying to write data to kinesis using apache beam kinesis IO. But I am having some issues.

      PS: I am using aws sts.

       

      The console output shows....

       

      Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;
      
      at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
      
      at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
      
      at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
      
      at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
      
      at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
      
      at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
      
      at com.nestaway.beam_demo.KinesisSql.main(KinesisSql.java:153)
      
      Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;
      
      at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.processElement(KinesisIO.java:568)
      
      
      

       

       

      Code.......

      data is a Pcollection in byte[ ] format.

       

      data.apply(KinesisIO.write()
          .withStreamName("stagWatchBallEventStream")
          .withPartitionKey("a")
          .withAWSClientsProvider(new CustomKinesisClientProvider()));
      

       

       Custom Kinesis Client :

      public class CustomKinesisClientProvider implements AWSClientsProvider {
      private static final long serialVersionUID = 1L;
      
      private static String ID = "XXXXX";
      
      private static String SECRET = "XXXXX";
      
      private static String TOKEN = "XXXXX";
      
      private static BasicSessionCredentials sessionCredentials = new BasicSessionCredentials(
      
        ID,
      
        SECRET,
      
        TOKEN);
      
      
      
      private static KinesisProducerConfiguration config = new KinesisProducerConfiguration()
      
                 .setRecordMaxBufferedTime(3000)
      
                 .setMaxConnections(1)
      
                 .setRequestTimeout(60000)
      
                 .setRegion("us-west-2")
      
                 .setCredentialsProvider(new AWSStaticCredentialsProvider(sessionCredentials));
      
      @Override
      public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
      return new KinesisProducer(config);
      }
      
      }

       

       

      Attachments

        1. pom.xml
          4 kB
          Varsha Thanooj

        Issue Links

          Activity

            People

              aromanenko Alexey Romanenko
              bvt279 Varsha Thanooj
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 0.5h
                  0.5h