Details
- Type: Task
- Status: Closed
- Priority: Major
- Resolution: Fixed
Description
I used hudi-kafka-connect to test pulling data from a Kafka topic into Hudi. I built a Kafka Connect Docker image from this Dockerfile:
FROM confluentinc/cp-kafka-connect:6.1.1
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-hdfs:10.1.3
COPY hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar /usr/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/lib
When I started the container and submitted a task, Hudi reported this error:
[2021-12-27 15:04:55,214] INFO Setting record key volume and partition fields date for table hdfs://hdp-syzh-cluster/hive/warehouse/default.db/hudi-test-topichudi-test-topic (org.apache.hudi.connect.writers.KafkaConnectTransactionServices)
[2021-12-27 15:04:55,224] INFO Initializing hdfs://hdp-syzh-cluster/hive/warehouse/default.db/hudi-test-topic as hoodie table hdfs://hdp-syzh-cluster/hive/warehouse/default.db/hudi-test-topic (org.apache.hudi.common.table.HoodieTableMetaClient)
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/lib/hadoop-auth-2.10.1.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
[2021-12-27 15:04:55,571] WARN Unable to load native-hadoop library for your platform... using builtin-java classes where applicable (org.apache.hadoop.util.NativeCodeLoader)
[2021-12-27 15:04:56,154] ERROR Fatal error initializing task null for partition 0 (org.apache.hudi.connect.HoodieSinkTask)
org.apache.hudi.exception.HoodieException: Fatal error instantiating Hudi Transaction Services
    at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:113) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.connect.transaction.ConnectTransactionCoordinator.<init>(ConnectTransactionCoordinator.java:88) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.connect.HoodieSinkTask.bootstrap(HoodieSinkTask.java:191) [hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.connect.HoodieSinkTask.open(HoodieSinkTask.java:151) [hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:640) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:71) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:705) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1257) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1226) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) [kafka-clients-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:457) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) [connect-runtime-6.1.1-ccs.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238) [connect-runtime-6.1.1-ccs.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: hdp-syzh-cluster
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:443) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:142) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:369) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:303) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3247) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:102) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:350) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:897) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:109) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    ... 25 more
Caused by: java.net.UnknownHostException: hdp-syzh-cluster
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:443) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:142) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:369) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:303) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:159) ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3247) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3296) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3264) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:475) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356) ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hudi.common.fs.FSUtils.getFs(FSUtils.java:102) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:350) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:897) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    at org.apache.hudi.connect.writers.KafkaConnectTransactionServices.<init>(KafkaConnectTransactionServices.java:109) ~[hudi-kafka-connect-bundle-0.11.0-SNAPSHOT.jar:0.11.0-SNAPSHOT]
    ... 25 more
[2021-12-27 15:05:51,434] ERROR WorkerSinkTask{id=hudi-sink-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.RetriableException: TransactionParticipant should be created for each assigned partition, but has not been created for the topic/partition: hudi-test-topic:0
    at org.apache.hudi.connect.HoodieSinkTask.put(HoodieSinkTask.java:111)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
I set HADOOP_CONF_DIR, but it didn't work.
So I checked the code and found that Hudi builds the HDFS configuration with the default Configuration constructor, which only loads the *-site.xml files it finds on the classpath. Specifying HADOOP_CONF_DIR has no effect: the files under that directory are never read, so the HA nameservice hdp-syzh-cluster defined in hdfs-site.xml cannot be resolved, and the client fails with UnknownHostException.
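For reference, here is a minimal sketch of the kind of change I mean (the class and method names are hypothetical, not the actual Hudi code): explicitly add the site files from HADOOP_CONF_DIR, falling back to HADOOP_HOME/etc/hadoop, before the filesystem is created.

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Hypothetical helper: new Configuration() only picks up *-site.xml files that
// are already on the classpath, so we add the files from HADOOP_CONF_DIR
// (or HADOOP_HOME/etc/hadoop) explicitly.
public final class HadoopConfigUtils {

  private HadoopConfigUtils() {
  }

  public static Configuration buildHadoopConf() {
    Configuration conf = new Configuration();
    String confDir = System.getenv("HADOOP_CONF_DIR");
    if (confDir == null && System.getenv("HADOOP_HOME") != null) {
      confDir = System.getenv("HADOOP_HOME") + "/etc/hadoop";
    }
    if (confDir != null) {
      // hdfs-site.xml maps an HA nameservice such as hdp-syzh-cluster to its
      // NameNode addresses; without it the name is treated as a plain hostname
      // and resolution fails with UnknownHostException.
      for (String name : new String[] {"core-site.xml", "hdfs-site.xml"}) {
        File file = new File(confDir, name);
        if (file.isFile()) {
          conf.addResource(new Path(file.getAbsolutePath()));
        }
      }
    }
    return conf;
  }
}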
Because Kafka Connect does not ship with a default HDFS environment, we need the ability to specify HADOOP_HOME or HADOOP_CONF_DIR in kafka-connect, along the lines of the sketch above.
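To verify, something like this (again hypothetical, using the helper sketched above) should connect to the nameservice once the site files are loaded, whereas a bare new Configuration() fails with UnknownHostException:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical connectivity check for the HA nameservice.
public class HdfsConnectivityCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = HadoopConfigUtils.buildHadoopConf();
    // Resolves only if hdfs-site.xml defines the hdp-syzh-cluster nameservice.
    FileSystem fs = FileSystem.get(new URI("hdfs://hdp-syzh-cluster"), conf);
    System.out.println("Connected to " + fs.getUri());
  }
}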