Flink / FLINK-35726

Data Stream to Table API converts Map to RAW 'java.util.Map'


Details

    • Type: Bug
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.17.2
    • Fix Version/s: None
    • Component/s: Table SQL / Runtime
    • Labels: None

    Description

      We have a use case where we convert from the Table API to a Data Stream with a class, perform some operations, and then convert back to the Table API. When the data contains a Map, the conversion back to the Table API converts the Map to RAW('java.util.Map', '...'). This causes an 'Incompatible types for sink column' exception.

      In this particular case, the Map contains the Kafka headers, which we need to preserve and write to the output topic. Both topics/table definitions use the same schema. We have set a DataTypeHint annotation on the Map field in the Java class. We are currently working around this issue by using a UDF to simply perform a type conversion from the RAW Java Map to the Table API Map.
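      For reference, such a workaround UDF could look roughly like the following. This is a minimal sketch, not the reporter's actual code: the class name `RawToMapFunction` is hypothetical, and the cast assumes the RAW value really wraps the original `java.util.Map<String, byte[]>`.

```java
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical workaround: an identity UDF whose declared result type forces
// the planner to treat the incoming RAW('java.util.Map', ...) value as a
// proper MAP<STRING, BYTES>.
public class RawToMapFunction extends ScalarFunction {

    @SuppressWarnings("unchecked")
    public @DataTypeHint("MAP<STRING, BYTES>") Map<String, byte[]> eval(
            @DataTypeHint(inputGroup = InputGroup.ANY) Object rawMap) {
        // The RAW value is assumed to wrap the original Java map, so a cast suffices.
        return (Map<String, byte[]>) rawMap;
    }
}
```

      The function would be registered with `tableEnv.createTemporarySystemFunction("RawToMap", RawToMapFunction.class)` and applied to the headers column before inserting into Target.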

      Note that if no operations are performed on the stream, it works correctly, but adding a simple identity map causes the exception.

      Here's a simple example to reproduce the problem.

      CREATE TABLE Source (
        id STRING,
        headers MAP<STRING, BYTES> METADATA
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'source',
        'properties.bootstrap.servers' = 'kafka-bootstrap-server',
        'format' = 'json'
      );

      CREATE TABLE Target (
        id STRING,
        headers MAP<STRING, BYTES> METADATA
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'target',
        'properties.bootstrap.servers' = 'kafka-bootstrap-server',
        'format' = 'json'
      );

      public class MyRecord {
          private String id;

          @DataTypeHint(value = "MAP<STRING, BYTES>")
          private Map<String, byte[]> headers;

          ...
      }

      public class MyJob {
          public static void main(String[] args) throws Exception {
              final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
              final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

              Table sourceTable = tableEnv.from("Source");
              var sourceStream = tableEnv.toDataStream(sourceTable, MyRecord.class);
              var mappedStream = sourceStream.map(row -> row);
              Table outputTable = tableEnv.fromDataStream(mappedStream);

              tableEnv.createStatementSet()
                  .add(outputTable.insertInto("Target"))
                  .attachAsDataStream();
              streamEnv.executeAsync("Table Datastream test");
          }
      }


          People

             Assignee: sverma Santwana Verma
             Reporter: david.perkins David Perkins
             Votes: 0
             Watchers: 5
