An example that shows how to integrate Camel with Kafka with OAuth authentication using a client secret. The authentication is handled by Keycloak.
This example requires podman.
On the Kafka side it uses Strimzi Oauth for Apache Kafka, this library must also be set on the client side.
The Kafka Oauth client side configuration is set in the src/main/resources/application.properties
. You may want to learn from the Strimzi OAuth project the numerous configurations to have it working with your Kafka Broker, for example you may want to use OAuth Refresh tokens or use JWT tokens.
The configuration is in src/main/resources/application.properties
, you are welcome to learn more from the Kafka Security and Strimzi OAuth documentations.
camel.component.kafka.security-protocol = SASL_PLAINTEXT camel.component.kafka.sasl-mechanism = OAUTHBEARER camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ oauth.client.id="kafka-producer-client" \ oauth.client.secret="kafka-producer-client-secret" \ oauth.username.claim="preferred_username" \ oauth.ssl.truststore.location="containers/certificates/ca-truststore.p12" \ oauth.ssl.truststore.type="pkcs12" \ oauth.ssl.truststore.password="changeit" \ oauth.token.endpoint.uri="https://keycloak:8443/realms/demo/protocol/openid-connect/token" ; camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
-
Set the hosts in /etc/hosts
We have to set the IP addresses in /etc/hosts (check your OS how to do it), verify the current IP address and correctly set it as the example shows. This is essential for the client application to reach keycloak and kafka hosts. In a production scenario those hosts names is going to be resolved by DNS.
192.168.0.104 keycloak 192.168.0.104 kafka
-
Launch the Keycloak server. Note that the scripts runs a podman container with
host
network, so the keycloak and kafka server shares the same network as the client, this is for demo purposes only.
./start_keycloak.sh
It must show the demo
realm was imported successfully.
[org.keycloak.exportimport.dir.DirImportProvider] (main) Importing from directory /opt/keycloak/bin/../data/import [org.keycloak.exportimport.util.ImportUtils] (main) Realm 'demo' imported [org.keycloak.services] (main) KC-SERVICES0032: Import finished successfully
It also shows the server started.
[io.quarkus] (main) Keycloak 26.0.8 on JVM (powered by Quarkus 3.15.1) started in 9.169s. Listening on: http://0.0.0.0:8080 and https://0.0.0.0:8443 [io.quarkus] (main) Profile prod activated. [io.quarkus] (main) Installed features: [agroal, cdi, hibernate-orm, jdbc-h2, keycloak, narayana-jta, opentelemetry, reactive-routes, rest, rest-jackson, smallrye-context-propagation, vertx]
-
Launch the Kafka broker
Open another terminal console and launch kafka broker:
./start_kafka.sh
It should show the kafka broker authenticates to the keycloak server using the kafka-broker
client id.
loginWithClientSecret() - tokenEndpointUrl: http://keycloak:8080/realms/demo/protocol/openid-connect/token, clientId: kafka-broker, clientSecret: k*********, scope: null, audience: null, connectTimeout: 20, readTimeout : 60, retries: 0, retryPauseMillis: 0 (io.strimzi.kafka.oauth.common.OAuthAuthenticator)
It should show the kafka broker started
Kafka version: 3.9.0 (org.apache.kafka.common.utils.AppInfoParser) [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)
Build and run the quickstart.
mvn compile spring-boot:run
It should display the kafka OAuth settings, example:
sasl.login.callback.handler.class = class io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler sasl.mechanism = OAUTHBEARER security.protocol = SASL_PLAINTEXT
It should show the producer message and the consumer message.
[a_Topic1]] route1 : >> Message sent: Hi from Camel - Wed Jan 15 12:11:42 WET 2025 [a_Topic1]] route2 : << Message received: Hi from Camel - Wed Jan 15 12:11:42 WET 2025
The kafka broker log should display the OAuth logging.
DEBUG Set validated token on callback: BearerTokenWithPayloadImpl (principalName: service-account-kafka-producer-client, groups: null, lifetimeMs: 1736978965000 [2025-01-15T22:09:25 UTC], startTimeMs: 1736942965000 [2025-01-15T12:09:25 UTC], scope: [profile, email], payload: {"exp":1736978965,"iat":1736942965,"jti":"43781656-a432-47f5-b0ae-c44e3224bb2b","iss":"https://keycloak:8443/realms/demo","sub":"f288b7db-a3e4-4cf4-80d3-2e5118bb2c9c","typ":"Bearer","azp":"kafka-producer-client","acr":"1","scope":"email profile","email_verified":false,"clientHost":"192.168.0.104","preferred_username":"service-account-kafka-producer-client","clientAddress":"192.168.0.104","email":"[email protected]","client_id":"kafka-producer-client"}, sessionId: 1893424185) (io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler)
Press Ctrl-C
to exit.
If you hit any problem using Camel or have some feedback, then please let us know.
We also love contributors, so get involved :-)
The Camel riders!