Details
Description
Hi All,
I have a working Kafka Cluster and Zookeeper Ensemble but after integrating SASL authentication I am facing below exception,
Zookeeper:-
2018-05-23 07:39:59,476 [myid:1] - INFO [ProcessThread(sid:1 cport:-1):: ] - Got user-level KeeperException when processing sessionid:0x301cae0b3480002 type:delete cxid:0x48 zxid:0x20000004e txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2018-05-23 07:40:39,240 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x200b4f13c190006 type:create cxid:0x20 zxid:0x200000052 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
2018-05-23 07:40:39,240 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x200b4f13c190006 type:create cxid:0x21 zxid:0x200000053 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
2018-05-23 07:41:00,864 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x301cae0b3480004 type:create cxid:0x20 zxid:0x200000058 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
2018-05-23 07:41:00,864 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x301cae0b3480004 type:create cxid:0x21 zxid:0x200000059 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
2018-05-23 07:41:28,456 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x200b4f13c190002
2018-05-23 07:41:29,563 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x301cae0b3480002
2018-05-23 07:41:29,569 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x200b4f13c190006 type:create cxid:0x2d zxid:0x20000005f txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = NodeExists for /controller
2018-05-23 07:41:29,679 [myid:1] - INFO [ProcessThread(sid:1 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x301cae0b3480004 type:delete cxid:0x4e zxid:0x200000061 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
Kafka:-
[2018-05-23 09:06:31,969] ERROR [ReplicaFetcherThread-0-1]: Error for partition [23MAY,0] to broker 1:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-2]: Current offset 142474 for partition [23MAY,1] out of range; reset offset to 142478 (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-2]: Error for partition [23MAY,2] to broker 2:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
Below are my configuration:-
Zookeeper:-
java.env
SERVER_JVMFLAGS="-Djava.security.auth.login.config=/usr/local/zookeeper/conf/ZK_jaas.conf"
ZK_jaas.conf
Server
;
QuorumServer
;
QuorumLearner
;
zoo.cfg
- The number of milliseconds of each tick
tickTime=2000
- The number of ticks that the initial
- synchronization phase can take
initLimit=10
- The number of ticks that can pass between
- sending a request and getting an acknowledgment
syncLimit=5
- the directory where the snapshot is stored.
- do not use /tmp for storage, /tmp here is just
- example sakes.
#dataDir=/zookeeper/data
dataDir=/zookeeper/zookeeper-3.4.12/data
# dataLogDir ======= >>>>> where you would like ZooKeeper to log
dataLogDir=/zookeeper/zookeeper-3.4.12/data-logs
- the port at which the clients will connect
clientPort=2181
- the maximum number of client connections.
- increase this if you need to handle more clients
maxClientCnxns=60
################################### SASL Auth #####################
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.2=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
authProvider.3=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
quorumListenOnAllIPs=true
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
quorum.cnxn.threads.size=20
- Be sure to read the maintenance section of the
- administrator guide before turning on auto purge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
- The number of snapshots to retain in dataDir
autopurge.snapRetainCount=5
- Purge task interval in hours
- Set to "0" to disable auto purge feature
autopurge.purgeInterval=0
server.1=serverA:2888:3888
server.2=serverB:2888:3888
server.3=serverC:2888:3888
Kafka :-
kafka_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret"; };
{ org.apache.zookeeper.server.auth.DigestLoginModule required username="admin" password="admin-secret"; }
Client;
server.properties
broker.id=0
delete.topic.enable=true
port=9092
group.id=KAFKA
log.dirs=/kafka/logs01
zookeeper.connect=serverA:2181,serverB:2181,serverC:2181
zookeeper.connection.timeout.ms=6000
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
auto.create.topics.enable=false
listeners=SASL_PLAINTEXT://serverA:9092 ------------------------------> serverB for broker 2 and serverC for broker 3
advertised.listeners=SASL_PLAINTEXT://serverA:9092 --------------- > serverB for broker 2 and serverC for broker 3
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
advertised.host.name=serverA
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=30000000
log.flush.interval.ms=1800000
log.retention.minutes=30
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
delete.topic.enable=true
super.users=User:admin
kafka-run-class.sh
added JVM parameter in kafka-run-class.sh - Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_jaas.conf"