Kafka / KAFKA-10387

Cannot include SMT configs with source connector that uses topic.creation.* properties

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.6.0
    • Fix Version/s: 2.7.0, 2.6.1
    • Component/s: connect
    • Labels: None

    Description

      Let's say we try to create a connector with the following config:

      {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": 32843,
        "database.user": "test",
        "database.password": "test",
        "database.dbname": "test",
        "database.server.name": "tcpsql",
        "table.whitelist": "public.numerics",
        "transforms": "abc",
        "transforms.abc.type": "io.debezium.transforms.ExtractNewRecordState",
        "topic.creation.default.partitions": "1",
        "topic.creation.default.replication.factor": "1"
      }
      

      This fails with the following error in the Connect worker:

      [2020-08-11 02:47:05,908] ERROR Failed to start task deb-0 (org.apache.kafka.connect.runtime.Worker:560)
      org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.abc.type'
      	at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:296)
      	at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:605)
      	at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
      	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
      	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
      	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
      	at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
      	at java.base/java.lang.Thread.run(Thread.java:832)
      Caused by: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.abc.type'
      	at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:159)
      	at org.apache.kafka.connect.runtime.SourceConnectorConfig$EnrichedSourceConnectorConfig.get(SourceConnectorConfig.java:57)
      	at org.apache.kafka.connect.runtime.SourceConnectorConfig.get(SourceConnectorConfig.java:141)
      	at org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:216)
      	at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:281)
      	... 10 more
      

      Connector creation works fine if we remove the topic.creation.* properties above.
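
      As a stopgap on affected versions, the observation above can be applied mechanically. The following is a minimal sketch, assuming the connector config is held as a plain string map before it is submitted; the class and method names are hypothetical and not part of Kafka Connect. Note that dropping these properties also drops the custom topic settings they request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Connect.
class TopicCreationWorkaroundSketch {

    // Returns a copy of the config without any topic.creation.* entries.
    static Map<String, String> withoutTopicCreation(Map<String, String> props) {
        Map<String, String> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            if (!entry.getKey().startsWith("topic.creation.")) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        // A subset of the config from the report.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("tasks.max", "1");
        props.put("transforms", "abc");
        props.put("transforms.abc.type", "io.debezium.transforms.ExtractNewRecordState");
        props.put("topic.creation.default.partitions", "1");
        props.put("topic.creation.default.replication.factor", "1");

        // The SMT settings survive; only the topic.creation.* entries are dropped.
        System.out.println(withoutTopicCreation(props).keySet());
    }
}
```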

      Not entirely sure, but it looks like the piece of code that might need a fix is here, as it does not add the transforms.* configs to the returned ConfigDef instances: https://github.com/apache/kafka/blob/2.6.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java#L94
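
      To make the suspected failure mode concrete, here is a toy model of it. It does not use Kafka's classes: ToyConfig and the helper methods are hypothetical stand-ins that only mimic the behavior seen in the stack trace, where a lookup is rejected when the key is missing from the config definition. The second half shows one possible shape for a fix, which is an assumption and not necessarily the change that resolved this issue.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Toy model, NOT Kafka's classes: it only mimics "get() rejects undefined keys".
class TransformKeyLookupSketch {

    // Hypothetical stand-in for a parsed config: only defined keys are retained.
    static final class ToyConfig {
        private final Map<String, String> values = new HashMap<>();

        ToyConfig(Set<String> definedKeys, Map<String, String> props) {
            for (String key : definedKeys) {
                values.put(key, props.get(key)); // null when the property was not supplied
            }
        }

        String get(String key) {
            if (!values.containsKey(key)) {
                throw new IllegalArgumentException("Unknown configuration '" + key + "'");
            }
            return values.get(key);
        }
    }

    // Base connector keys, before any enrichment.
    static Set<String> baseKeys() {
        Set<String> keys = new LinkedHashSet<>();
        keys.add("connector.class");
        keys.add("tasks.max");
        keys.add("transforms");
        return keys;
    }

    // Adds transforms.<alias>.type for every alias listed under "transforms".
    static Set<String> withTransformKeys(Set<String> keys, Map<String, String> props) {
        Set<String> enriched = new LinkedHashSet<>(keys);
        for (String alias : props.getOrDefault("transforms", "").split(",")) {
            if (!alias.trim().isEmpty()) {
                enriched.add("transforms." + alias.trim() + ".type");
            }
        }
        return enriched;
    }

    // Adds every topic.creation.* key present in the properties.
    static Set<String> withTopicCreationKeys(Set<String> keys, Map<String, String> props) {
        Set<String> enriched = new LinkedHashSet<>(keys);
        for (String key : props.keySet()) {
            if (key.startsWith("topic.creation.")) {
                enriched.add(key);
            }
        }
        return enriched;
    }

    // A subset of the config from the report.
    static Map<String, String> sampleProps() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.put("tasks.max", "1");
        props.put("transforms", "abc");
        props.put("transforms.abc.type", "io.debezium.transforms.ExtractNewRecordState");
        props.put("topic.creation.default.partitions", "1");
        props.put("topic.creation.default.replication.factor", "1");
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = sampleProps();

        // Suspected bug: topic-creation enrichment starts from the base keys,
        // so the transform keys never reach the definition.
        ToyConfig broken = new ToyConfig(withTopicCreationKeys(baseKeys(), props), props);
        try {
            broken.get("transforms.abc.type");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }

        // Possible fix (assumption): enrich with transform keys first, then topic-creation keys.
        Set<String> keys = withTopicCreationKeys(withTransformKeys(baseKeys(), props), props);
        ToyConfig fixed = new ToyConfig(keys, props);
        System.out.println(fixed.get("transforms.abc.type"));
    }
}
```

      In this toy, the lookup fails only because the second enrichment step discards the keys added by the first, which matches the observation that the connector starts once the topic.creation.* properties are removed.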

            People

              Assignee: kkonstantine Konstantine Karantasis
              Reporter: wicknicks Arjun Satish