Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-32861

In batch mode, when sinking to multiple tables, the job graph on the web UI monitoring page is missing a sink task block

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Open
    • Minor
    • Resolution: Unresolved
    • 1.16.3
    • None
    • Project Website
    • None
    • flink 1.16

      centos 7 64

      mysql 5.7

      paimon 0.5

      open jdk 1.8 64

    Description

      In batch mode, when a job sinks to multiple tables (MySQL and Paimon), the job graph shown on the web UI monitoring page is missing a sink task block. Two sink task blocks are expected in the graph. The results written to the tables are correct, however.

      For the Flink SQL details, see the attachments flink-mult-out.sql and screenshot.png.

      flink-mult-out.sql:

      ======================

      -- Session configuration for the reproduction: a checkpoint interval, an HDFS
      -- checkpoint directory, and the batch runtime mode under which the missing
      -- sink block is reported.
      -- NOTE(review): the interval value carries no unit; presumably milliseconds — confirm.
      SET execution.checkpointing.interval=10000;
      SET state.checkpoints.dir=hdfs://hadoop01:9000/flink/checkpoints/20230814103606840;
      SET execution.runtime-mode=batch;
      -- JDBC source table: reads MySQL table v_csmx_129_default from the data_storage
      -- database, with a scan fetch size of 300000.
      -- NOTE(review): amount2 is FLOAT; if it holds monetary values, DECIMAL would be
      -- exact — confirm against the upstream MySQL column type.
      CREATE TABLE source_jdbc_9kT_QLyGtM(
      `id` BIGINT,
      `tenant_code` STRING,
      `ces2` STRING,
      `ces1` STRING,
      `address` STRING,
      `amount2` FLOAT,
      `bizdate` DATE)  WITH ( 
          'connector'='jdbc',
          'scan.fetch-size'='300000',
          'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
          'username'='xxx',
          'password'='xxxx',
          'table-name'='v_csmx_129_default'
      );

       

      -- Shared intermediate view: projects all seven source columns and keeps only rows
      -- with id < 50000. Both INSERT statements in this script read from this view.
      CREATE VIEW tranform_sql_mapping AS select `id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from source_jdbc_9kT_QLyGtM where  ( `id`<50000 );

      -- JDBC sink table (first sink): writes to MySQL table data_processing_out_1 in the
      -- data_storage database. Same column layout as the source table.
      -- Buffered rows are flushed once 50000 rows accumulate.
      -- NOTE(review): 'sink.buffer-flush.interval'='0' presumably disables time-based
      -- flushing — confirm against the Flink JDBC connector documentation.
      CREATE TABLE data_processing_out_1(
      `id` BIGINT,
      `tenant_code` STRING,
      `ces2` STRING,
      `ces1` STRING,
      `address` STRING,
      `amount2` FLOAT,
      `bizdate` DATE)  WITH ( 
          'connector'='jdbc',
          'sink.buffer-flush.max-rows'='50000',
          'sink.buffer-flush.interval'='0',
          'url'='jdbc:mysql://10.x.x.22:3306/data_storage?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai',
          'username'='xxxx',
          'password'='xxxxx',
          'table-name'='data_processing_out_1'
      );

      -- First sink write: copy every row of the filtered view into the MySQL sink table.
      INSERT INTO data_processing_out_1 select `id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from tranform_sql_mapping;

      -- Register a catalog named paimon (type 'paimon') whose warehouse lives on HDFS.
      CREATE CATALOG paimon WITH (
          'type' = 'paimon',
          'warehouse' = 'hdfs://hadoop01:9000/painmon/data-processing/paimon_ods'   
      );

      -- Paimon sink table (second sink). Switches the current catalog to paimon, makes
      -- sure the target database exists, then drops and recreates the table so each run
      -- starts from an empty table.
      USE CATALOG paimon;
      create database if not exists paimon.paimon_ods_db;
      drop table if exists paimon_ods_db.paimon_mysql_test01;
      -- Same column layout as the JDBC tables. Configured as an append-only table with
      -- 8 buckets keyed on tenant_code and a sink parallelism of 8; the remaining
      -- options set writer-buffer and compaction-trigger values.
      CREATE TABLE if not exists paimon_ods_db.paimon_mysql_test01(
      `id` BIGINT,
      `tenant_code` STRING,
      `ces2` STRING,
      `ces1` STRING,
      `address` STRING,
      `amount2` FLOAT,
      `bizdate` DATE
      )  WITH (
         'sink.parallelism'='8',
         'bucket'='8',
         'bucket-key'='tenant_code',
         'sink.use-managed-memory-allocator'='true',
         'sink.managed.writer-buffer-memory'='512MB',
         'num-sorted-run.compaction-trigger'='20',
         'write-buffer-size'='1024MB',
         'write-buffer-spillable'='true',
         'write-mode'='append-only'
      );

      -- Second sink write: copy every row of the filtered view into the Paimon table.
      -- The view is referenced by its fully qualified name because the current catalog
      -- was switched to paimon by the earlier USE CATALOG statement.
      INSERT INTO paimon_ods_db.paimon_mysql_test01 select `id`,`tenant_code`,`ces2`,`ces1`,`address`,`amount2`,`bizdate` from default_catalog.default_database.tranform_sql_mapping;

       

      ==================================================

      results

      ======================

      trino> use paimon.paimon_ods_db;
      USE
      trino:paimon_ods_db> select count(*) from paimon_mysql_test01;
       _col0 
      -------
       49999 
      (1 row)

       

      mysql> SELECT count(*) FROM data_storage.data_processing_out_1;
      +----------+
      | count(*) |
      +----------+
      |    49999 |
      +----------+
      1 row in set (0.06 sec)

      mysql> 

      Attachments

        1. screenshot.png
          53 kB
          zhengyuan
        2. flink-mult-out.sql
          2 kB
          zhengyuan

        Activity

          People

            Unassigned Unassigned
            zhengyuan zhengyuan
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: