Details
-
Bug
-
Status: Open
-
Minor
-
Resolution: Unresolved
-
1.16.3
-
None
-
None
-
flink 1.16
centos 7 64
mysql 5.7
paimon 0.5
open jdk 1.8 64
Description
In batch mode, when a job sinks to multiple tables (MySQL and Paimon), the task graph shown on the web UI job monitoring page is missing a sink task block. Two sink task blocks are expected. The data written to both tables is nevertheless correct.
For the full Flink SQL, see the attachment flink-mult-out.sql and the attached screenshot.
flink-mult-out.sql:
======================
-- Session settings applied before the statements below: a 10000 checkpoint interval,
-- an HDFS checkpoint directory, and batch runtime mode.
-- NOTE(review): checkpointing options are set together with batch runtime mode;
-- presumably they have no effect in batch execution — confirm.
SET execution.checkpointing.interval=10000;
SET state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/20230814103606840;
SET execution.runtime-mode=batch;
-- Source table: reads the MySQL table v_csmx_129_default through the JDBC connector.
CREATE TABLE source_jdbc_9kT_QLyGtM (
    `id`          BIGINT,
    `tenant_code` STRING,
    `ces2`        STRING,
    `ces1`        STRING,
    `address`     STRING,
    `amount2`     FLOAT,
    `bizdate`     DATE
) WITH (
    'connector'       = 'jdbc',
    -- Fetch size passed to the JDBC scan.
    'scan.fetch-size' = '300000',
    'url'             = 'jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'        = 'xxx',
    'password'        = 'xxxx',
    'table-name'      = 'v_csmx_129_default'
);
-- View over the JDBC source that keeps all seven columns and only rows with id below 50000.
-- Both INSERT statements in this script read from this view.
CREATE VIEW tranform_sql_mapping AS
SELECT
    `id`,
    `tenant_code`,
    `ces2`,
    `ces1`,
    `address`,
    `amount2`,
    `bizdate`
FROM source_jdbc_9kT_QLyGtM
WHERE `id` < 50000;
-- MySQL sink table: writes to data_storage.data_processing_out_1 through the JDBC connector.
CREATE TABLE data_processing_out_1 (
    `id`          BIGINT,
    `tenant_code` STRING,
    `ces2`        STRING,
    `ces1`        STRING,
    `address`     STRING,
    `amount2`     FLOAT,
    `bizdate`     DATE
) WITH (
    'connector'                  = 'jdbc',
    -- Buffer up to 50000 rows per flush; the flush interval is set to '0'.
    'sink.buffer-flush.max-rows' = '50000',
    'sink.buffer-flush.interval' = '0',
    'url'                        = 'jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
    'username'                   = 'xxxx',
    'password'                   = 'xxxxx',
    'table-name'                 = 'data_processing_out_1'
);
-- First sink: copy every row of the view into the MySQL table.
-- NOTE(review): this is a standalone INSERT, and a second standalone INSERT later in this
-- script writes the same view to Paimon. Presumably each one is submitted as its own job
-- unless both are wrapped in EXECUTE STATEMENT SET BEGIN ... END — confirm.
INSERT INTO data_processing_out_1
SELECT
    `id`,
    `tenant_code`,
    `ces2`,
    `ces1`,
    `address`,
    `amount2`,
    `bizdate`
FROM tranform_sql_mapping;
-- Register a catalog named paimon of type 'paimon' whose warehouse lives on HDFS.
CREATE CATALOG paimon WITH (
    'type'      = 'paimon',
    'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods'
);
-- Switch to the Paimon catalog, make sure the target database exists, and drop any
-- existing copy of the target table before it is created again below.
USE CATALOG paimon;
CREATE DATABASE IF NOT EXISTS paimon.paimon_ods_db;
DROP TABLE IF EXISTS paimon_ods_db.paimon_mysql_test01;
-- Paimon sink table: same seven columns as the source, declared with write-mode
-- 'append-only', 8 buckets keyed on tenant_code, and a sink parallelism of 8.
CREATE TABLE IF NOT EXISTS paimon_ods_db.paimon_mysql_test01 (
    `id`          BIGINT,
    `tenant_code` STRING,
    `ces2`        STRING,
    `ces1`        STRING,
    `address`     STRING,
    `amount2`     FLOAT,
    `bizdate`     DATE
) WITH (
    'sink.parallelism'                  = '8',
    'bucket'                            = '8',
    'bucket-key'                        = 'tenant_code',
    -- Writer memory and buffering settings.
    'sink.use-managed-memory-allocator' = 'true',
    'sink.managed.writer-buffer-memory' = '512MB',
    'num-sorted-run.compaction-trigger' = '20',
    'write-buffer-size'                 = '1024MB',
    'write-buffer-spillable'            = 'true',
    'write-mode'                        = 'append-only'
);
-- Second sink: copy every row of the view into the Paimon table. The view is referenced by
-- its fully qualified name because the current catalog is paimon at this point.
-- NOTE(review): this is a standalone INSERT, and an earlier standalone INSERT in this script
-- writes the same view to MySQL. Presumably each one is submitted as its own job unless both
-- are wrapped in EXECUTE STATEMENT SET BEGIN ... END — confirm.
INSERT INTO paimon_ods_db.paimon_mysql_test01
SELECT
    `id`,
    `tenant_code`,
    `ces2`,
    `ces1`,
    `address`,
    `amount2`,
    `bizdate`
FROM default_catalog.default_database.tranform_sql_mapping;
==================================================
results
======================
trino> use paimon.paimon_ods_db;
USE
trino:paimon_ods_db> select count(*) from paimon_mysql_test01;
_col0
-------
49999
(1 row)
mysql> SELECT count(*) FROM data_storage.data_processing_out_1;
+----------+
| count(*) |
+----------+
|    49999 |
+----------+
1 row in set (0.06 sec)
mysql>