
HUDI-3104: hudi-kafka-connect cannot load Hadoop config files from HADOOP_CONF_DIR



    Description

      I used hudi-kafka-connect to test pulling Kafka topic data into Hudi. I built a Kafka Connect Docker image from this Dockerfile:

      FROM confluentinc/cp-kafka-connect:6.1.1
      RUN confluent-hub install --no-prompt confluentinc/kafka-connect-hdfs:10.1.3
      COPY hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/lib
      

      When I started the container and submitted a task, Hudi reported this error:

      [2021-12-27 15:04:55,214] INFO Setting record key volume and partition fields date for table hdfs://hdp-syzh-cluster/hive/warehouse/default.db/hudi-test-topichudi-test-topic (org.apache.hudi.connect.writers.KafkaConnectTransactionServices)
      [2021-12-27 15:04:55,224] INFO Initializing hdfs://hdp-syzh-cluster/hive/warehouse/default.db/hudi-test-topic as hoodie table hdfs://hdp-syzh-cluster/hive/warehouse/default.db/hudi-test-topic (org.apache.hudi.common.table.HoodieTableMetaClient)
      WARNING: An illegal reflective access operation has occurred
      WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/lib/hadoop-auth-2.10.1.jar) to method sun.security.krb5.Config.getInstance()
      WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
      WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      WARNING: All illegal access operations will be denied in a future release
      [2021-12-27 15:04:55,571] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader)
      [2021-12-27 15:04:56,154] ERROR Fatal error initializing task null for partition 0 (org.apache.hudi.connect.HoodieSinkTask)
      org.apache.hudi.exception.HoodieException: Fatal error instantiating Hudi Transaction Services 
      	at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:113) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.connect.transaction.ConnectTransactionCoordinator.<init>(ConnectTransactionCoordinator.java:88) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:191) [hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151) [hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:640) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:71) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:705) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) [kafka-clients-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:457) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) [connect-runtime-6.1.1-ccs.jar:?]
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238) [connect-runtime-6.1.1-ccs.jar:?]
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
      	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: hdp-syzh-cluster
      	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:443) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:142) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:369) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:303) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3247) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:102) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:350) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:897) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:109) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	... 25 more
      Caused by: java.net.UnknownHostException: hdp-syzh-cluster
      	at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:443) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:142) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:369) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:303) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159) ~[hadoop-hdfs-client-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3247) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[hadoop-common-2.10.1.jar:?]
      	at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:102) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:350) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:897) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:109) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
      	... 25 more
      [2021-12-27 15:05:51,434] ERROR WorkerSinkTask{id=hudi-sink-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask)
      org.apache.kafka.connect.errors.RetriableException: TransactionParticipant should be created for each assigned partition, but has not been created for the topic/partition: hudi-test-topic:0
      	at org.apache.hudi.connect.HoodieSinkTask.put(HoodieSinkTask.java:111)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
      	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      
      

      Setting HADOOP_CONF_DIR had no effect. Checking the code, I found that Hudi builds its HDFS configuration with the default Configuration constructor, which only picks up core-site.xml/hdfs-site.xml from the JVM classpath and never consults HADOOP_CONF_DIR. Since hdp-syzh-cluster is an HA logical nameservice that can only be resolved through the settings in hdfs-site.xml, the HDFS client falls back to treating it as a hostname and fails with UnknownHostException. The later RetriableException is just a downstream symptom: bootstrap failed, so no TransactionParticipant was ever created for the assigned partition.
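      A minimal standalone sketch (my own illustration, not Hudi code) of why the default constructor misses the cluster settings, and how explicitly adding the files from HADOOP_CONF_DIR restores them:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;

      public class HadoopConfDirProbe {
          public static void main(String[] args) {
              // new Configuration() loads *-site.xml files only from the JVM
              // classpath; the HADOOP_CONF_DIR environment variable is ignored.
              Configuration conf = new Configuration();
              // Prints null unless hdfs-site.xml is on the classpath, so the HA
              // nameservice hdp-syzh-cluster cannot be resolved later.
              System.out.println("dfs.nameservices = " + conf.get("dfs.nameservices"));

              // Explicitly adding the files from HADOOP_CONF_DIR fixes resolution.
              String confDir = System.getenv("HADOOP_CONF_DIR");
              if (confDir != null) {
                  conf.addResource(new Path(confDir, "core-site.xml"));
                  conf.addResource(new Path(confDir, "hdfs-site.xml"));
              }
              System.out.println("dfs.nameservices = " + conf.get("dfs.nameservices"));
          }
      }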

      Because Kafka Connect does not ship with a Hadoop environment of its own, the connector needs a way to pick up HADOOP_HOME or HADOOP_CONF_DIR itself; a sketch of one possible approach follows.
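
      A hedged sketch of what that could look like (the helper name and placement are hypothetical, not an existing Hudi API): resolve the configuration directory from the environment and add its files to the Configuration before it reaches FSUtils.getFs(...):

      import java.io.File;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;

      final class HadoopConfigUtils { // hypothetical helper, not an existing Hudi class
          static Configuration buildHadoopConf() {
              Configuration conf = new Configuration();
              String confDir = System.getenv("HADOOP_CONF_DIR");
              if (confDir == null && System.getenv("HADOOP_HOME") != null) {
                  // conventional layout: $HADOOP_HOME/etc/hadoop
                  confDir = System.getenv("HADOOP_HOME") + "/etc/hadoop";
              }
              if (confDir != null) {
                  for (String name : new String[] {"core-site.xml", "hdfs-site.xml"}) {
                      File f = new File(confDir, name);
                      if (f.exists()) {
                          conf.addResource(new Path(f.toURI()));
                      }
                  }
              }
              return conf;
          }
      }

      Until something along these lines lands, a workaround is to copy or mount core-site.xml and hdfs-site.xml onto the Connect worker's classpath so the default Configuration picks them up.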

      People

        Assignee: chenxiang (cdmikechen)
        Reporter: chenxiang (cdmikechen)