Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-34044

Kinesis Sink Cannot be Created via TableDescriptor

    XMLWordPrintableJSON

Details

    Description

      When trying to create a Kinesis Stream Sink in Table API via a TableDescriptor I get an error:

      Caused by: java.lang.UnsupportedOperationException
          at java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
          at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
          at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
          at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
          at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
          at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
          ... 20 more
      

      Here is a minimum reproducing example with Flink-1.17.2 and flink-connector-kinesis-4.2.0:

      public class Job {
        public static void main(String[] args) throws Exception {
          // create data stream environment
          StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
          sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
          StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
      
          Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
          tEnv.createTemporaryTable(
              "exampleTable", TableDescriptor.forConnector("datagen").schema(a).build());
          TableDescriptor descriptor =
              TableDescriptor.forConnector("kinesis")
                  .schema(a)
                  .format("json")
                  .option("stream", "abc")
                  .option("aws.region", "eu-central-1")
                  .build();
          tEnv.createTemporaryTable("sinkTable", descriptor);
      
          tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
        }
      } 

      From my investigation, the error is triggered by the `ResolvedCatalogTable` used when re-mapping the deprecated Kinesis options in `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns an `UnmodifiableMap` which is not mutable.

      If the sink table is created via SQL, the error does not occur:

      tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
      

      because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.

      Attachments

        Issue Links

          Activity

            People

              chalixar Ahmed Hamdy
              tilman151 Tilman Krokotsch
              Votes:
              1 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: