Details
-
Bug
-
Status: Resolved
-
P1
-
Resolution: Fixed
-
2.5.0
-
None
Description
I am trying to write data to kinesis using apache beam kinesis IO. But I am having some issues.
PS: I am using aws sts.
The console output shows....
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture; at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349) at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210) at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311) at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297) at com.nestaway.beam_demo.KinesisSql.main(KinesisSql.java:153) Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture; at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.processElement(KinesisIO.java:568)
Code.......
data is a Pcollection in byte[ ] format.
data.apply(KinesisIO.write() .withStreamName("stagWatchBallEventStream") .withPartitionKey("a") .withAWSClientsProvider(new CustomKinesisClientProvider()));
Custom Kinesis Client :
public class CustomKinesisClientProvider implements AWSClientsProvider { private static final long serialVersionUID = 1L; private static String ID = "XXXXX"; private static String SECRET = "XXXXX"; private static String TOKEN = "XXXXX"; private static BasicSessionCredentials sessionCredentials = new BasicSessionCredentials( ID, SECRET, TOKEN); private static KinesisProducerConfiguration config = new KinesisProducerConfiguration() .setRecordMaxBufferedTime(3000) .setMaxConnections(1) .setRequestTimeout(60000) .setRegion("us-west-2") .setCredentialsProvider(new AWSStaticCredentialsProvider(sessionCredentials)); @Override public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) { return new KinesisProducer(config); } }
Attachments
Attachments
Issue Links
- links to