Description
I am developing a Kafka client that connects using OAUTHBEARER over SSL. I'm testing against a server whose certificate is issued by a custom CA. I added the server's trust chain to a JKS truststore file and referenced it in the configuration. However, I continually get PKIX errors. After some code tracing, I believe the OAUTHBEARER client code ignores the configured truststore.
Here is an example based on my configuration:
application.id=my-kafka-client
client.id=my-kafka-client
group.id=my-kafka-client

# OAuth/SSL listener
bootstrap.servers=<MY_SERVER>:9096
security.protocol=SASL_SSL

# OAuth Configuration
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=https://<MY_SERVER>/auth/realms/<MY_REALM>/protocol/openid-connect/token

ssl.truststore.location=<MY_PATH>\kafka.truststore.jks
#ssl.truststore.password=changeit

sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="my-kafka-client" \
    clientSecret="my-kafka-client-secret";
Note - my truststore does not have a password (I initially tried setting one to see if that would solve the problem).
I'm using the following example test code:
package example;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class Main {

    public static void main(final String[] args) throws IOException, URISyntaxException {
        Properties config = new Properties();
        config.load(Main.class.getClassLoader().getResourceAsStream("client.conf"));
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
    }
}
The issue seems to be in the org.apache.kafka.common.security.oauthbearer.secured package - in particular the AccessTokenRetrieverFactory.create() method, as it creates an sslContext but does not include the configured truststore from the Kafka configuration.
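To illustrate what I would expect to happen, here is a minimal sketch using only the standard JSSE classes. It is not the Kafka implementation: the class TruststoreSslSketch and the method socketFactoryFrom are hypothetical names, and it assumes the configuration values are plain strings and the truststore is a JKS file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper for illustration only; not part of Kafka.
final class TruststoreSslSketch {

    // Builds a socket factory that trusts the certificates in the configured truststore.
    // Assumption: the map holds plain String values, as in a loaded properties file.
    static SSLSocketFactory socketFactoryFrom(Map<String, ?> configs)
            throws IOException, GeneralSecurityException {
        Object location = configs.get("ssl.truststore.location");
        if (location == null) {
            // No truststore configured: fall back to the JVM default trust settings.
            return (SSLSocketFactory) SSLSocketFactory.getDefault();
        }

        // A null password is accepted when loading a JKS file; the integrity check is skipped.
        Object password = configs.get("ssl.truststore.password");
        char[] passwordChars = password == null ? null : password.toString().toCharArray();

        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(Paths.get(location.toString()))) {
            trustStore.load(in, passwordChars);
        }

        // Derive trust managers from the truststore and build an SSLContext around them.
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, tmf.getTrustManagers(), null);
        return sslContext.getSocketFactory();
    }
}
```

A factory built this way could then be applied to the token endpoint connection with HttpsURLConnection.setSSLSocketFactory(...), so that the custom trust chain is honored without touching the JVM-wide defaults.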
As such, it appears that unless you alter the JVM-default truststore, you cannot connect to a server running a custom trust-chain.
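For reference, a sketch of that workaround using the standard JSSE system properties. It assumes the token request falls back to the JVM default SSL context, which is what my tracing suggests but which I have not confirmed. The class and method names are hypothetical.

```java
// Hypothetical helper for illustration only; not part of Kafka.
final class JvmTruststoreWorkaround {

    // Points the JVM-wide default truststore at the given JKS file.
    // Must be called before the default SSLContext is first initialized.
    static void useTruststore(String path, String password) {
        System.setProperty("javax.net.ssl.trustStore", path);
        System.setProperty("javax.net.ssl.trustStoreType", "JKS");
        if (password != null) {
            System.setProperty("javax.net.ssl.trustStorePassword", password);
        }
    }
}
```

This would be called at the top of main(), before the KafkaConsumer is created. Note that it replaces the default trust anchors for the whole JVM, so any other HTTPS connection in the same process would also have to be covered by this truststore.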