Details
- Type: Bug
- Status: In Progress
- Priority: Major
- Resolution: Unresolved
- Affects Version/s: 1.17.2
- Fix Version/s: None
- Component/s: None
Description
We have a use case where we convert from the Table API to a DataStream of a POJO class, perform some operations, and then convert back to the Table API. When the data contains a Map, the conversion back to the Table API turns the Map into RAW('java.util.Map', '...'), which causes an 'Incompatible types for sink column' exception.
In this particular case, the Map contains the Kafka headers, which we need to preserve and write to the output topic. Both topics/table definitions use the same schema, and we have set a DataTypeHint annotation on the Map field in the Java class. We are currently working around the issue with a UDF that simply converts the RAW Java Map back to the Table API Map type.
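A minimal sketch of the UDF workaround described above (the class name is hypothetical, and the exact signature may differ from the reporter's UDF; it follows the documented `InputGroup.ANY` pattern for accepting a RAW input):

```java
import java.util.Map;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical workaround UDF: an identity function whose result type is
// pinned via @DataTypeHint, so the planner sees MAP<STRING, BYTES> instead
// of RAW('java.util.Map', '...').
public class FixHeaders extends ScalarFunction {
    public @DataTypeHint("MAP<STRING, BYTES>") Map<String, byte[]> eval(
            @DataTypeHint(inputGroup = InputGroup.ANY) Object headers) {
        @SuppressWarnings("unchecked")
        Map<String, byte[]> typed = (Map<String, byte[]>) headers;
        return typed;
    }
}
```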
One note: if no operations are performed on the stream, it works correctly, but adding even a simple identity map triggers the exception.
Here's a simple example to reproduce the problem.
CREATE TABLE Source (
id STRING,
headers MAP<STRING, BYTES> METADATA
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = 'kafka-bootstrap-server',
'format' = 'json'
);
CREATE TABLE Target (
id STRING,
headers MAP<STRING, BYTES> METADATA
) WITH (
'connector' = 'kafka',
'topic' = 'target',
'properties.bootstrap.servers' = 'kafka-bootstrap-server',
'format' = 'json'
);
public class MyRecord {
    private String id;

    @DataTypeHint(value = "MAP<STRING, BYTES>")
    private Map<String, byte[]> headers;

    // getters and setters ...
}

public class MyJob {
    public static void main(String[] args) throws Exception {
        // ...
    }
}
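For completeness, the round trip described above could look roughly like this; this is a sketch under assumed names (the original job body was truncated), not the reporter's exact code:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MyJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Register the Source and Target tables using the DDL shown above.
        // tEnv.executeSql("CREATE TABLE Source ( ... )");
        // tEnv.executeSql("CREATE TABLE Target ( ... )");

        // Table -> DataStream<MyRecord> -> identity map -> Table
        DataStream<MyRecord> stream =
                tEnv.toDataStream(tEnv.from("Source"), MyRecord.class);
        DataStream<MyRecord> mapped =
                stream.map(r -> r).returns(MyRecord.class); // identity map triggers the RAW conversion

        // Fails with "Incompatible types for sink column" on the headers field.
        tEnv.fromDataStream(mapped).executeInsert("Target");
    }
}
```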