Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- aws-connector-4.2.0
Description
When I try to create a Kinesis Streams sink in the Table API via a `TableDescriptor`, I get the following error:
Caused by: java.lang.UnsupportedOperationException
    at java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
    at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
    at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
    at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
    at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
    at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
    ... 20 more
Here is a minimal reproducible example with Flink 1.17.2 and flink-connector-kinesis 4.2.0:
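import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Job {
    public static void main(String[] args) throws Exception {
        // create data stream environment
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);

        // single-column schema shared by the source and the sink
        Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();

        // datagen source table
        tEnv.createTemporaryTable(
                "exampleTable",
                TableDescriptor.forConnector("datagen").schema(a).build());

        // Kinesis sink table defined via TableDescriptor
        TableDescriptor descriptor =
                TableDescriptor.forConnector("kinesis")
                        .schema(a)
                        .format("json")
                        .option("stream", "abc")
                        .option("aws.region", "eu-central-1")
                        .build();
        tEnv.createTemporaryTable("sinkTable", descriptor);

        tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
    }
}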
From my investigation, the error is triggered by the `ResolvedCatalogTable` that is used when the deprecated Kinesis options are re-mapped in `KinesisProducerOptionsMapper`. The table's `getOptions` method returns an `UnmodifiableMap`, so the `remove` call in `removeMappedOptions` throws `UnsupportedOperationException`.
If the sink table is created via SQL, the error does not occur:
tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
This works because, in that case, `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.
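The difference between the two paths can be reproduced with plain JDK collections. The following is only a minimal sketch, not the connector's code or its actual fix: the class `UnmodifiableOptionsSketch`, the helpers `removeDeprecatedOption` and `removalFails`, and the option key `some.deprecated.option` are hypothetical names introduced for illustration. It shows that `remove` on an unmodifiable view throws, while the same call on a `HashMap` copy succeeds.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableOptionsSketch {

    // Hypothetical stand-in for an option re-mapping step: it removes a key
    // from the map it is handed, mutating that map in place.
    static String removeDeprecatedOption(Map<String, String> options, String key) {
        return options.remove(key);
    }

    // Returns true if removing the key throws UnsupportedOperationException.
    static boolean removalFails(Map<String, String> options, String key) {
        try {
            removeDeprecatedOption(options, key);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("stream", "abc");
        raw.put("aws.region", "eu-central-1");
        raw.put("some.deprecated.option", "value"); // hypothetical key

        // Read-only view, analogous to what the TableDescriptor path hands over.
        Map<String, String> readOnly = Collections.unmodifiableMap(raw);
        System.out.println(removalFails(readOnly, "some.deprecated.option")); // prints true

        // Defensive copy: mutation is allowed and the caller's map is untouched.
        Map<String, String> copy = new HashMap<>(readOnly);
        System.out.println(removalFails(copy, "some.deprecated.option")); // prints false
    }
}
```

Under these assumptions, copying the options into a new `HashMap` before re-mapping would make the code independent of whether the caller passes a mutable or a read-only map.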