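1. Prepare the Kafka image:
- bitnami/kafka:3.9.0 image
2. Prepare the kafka.keystore.jks and kafka.truststore.jks certificates
For the detailed steps, see:
Deploying Kafka with SASL_SSL authentication in Docker and integrating it with Spring Boot (CSDN blog)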
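Before mounting the certificates into the containers, it can help to confirm that both files open with the expected password. The following is a minimal sketch using only the JDK's java.security.KeyStore API. The class name JksCheck is hypothetical, and the password kafka2024 is an assumption taken from the KAFKA_CERTIFICATE_PASSWORD value used in this article's compose file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of the original article): opens a JKS file
// with the given password and lists the aliases it contains.
class JksCheck {

    static List<String> aliases(InputStream in, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore store = KeyStore.getInstance("JKS");
        // load() throws an IOException if the password is wrong or the file is corrupt.
        store.load(in, password);
        return Collections.list(store.aliases());
    }

    public static void main(String[] args) throws Exception {
        // Assumed password; replace it with the one chosen when generating the certificates.
        char[] password = "kafka2024".toCharArray();
        for (String name : new String[] {"kafka.keystore.jks", "kafka.truststore.jks"}) {
            try (InputStream in = Files.newInputStream(Path.of(name))) {
                System.out.println(name + " -> " + aliases(in, password));
            }
        }
    }
}
```

Run it from the directory that holds the two files. An exception at this point usually means a wrong password or a damaged file, which is easier to diagnose here than from broker startup logs.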
3. Configuration file docker-compose.yml
The IP 1.14.165.18 used in the configuration is the host IP that external clients connect to. Replace it with the IP of your own host.
version: '3.8'

services:
  kafka1:
    image: bitnami/kafka:3.9.0
    container_name: kafka1
    ports:
      - "9092:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - KAFKA_KRAFT_CLUSTER_ID=ncc_kafka
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - '/data/kafka/kafka_1_data:/bitnami/kafka'
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
    networks:
      - kafka-net

  kafka2:
    image: bitnami/kafka:3.9.0
    container_name: kafka2
    ports:
      - "9093:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - KAFKA_KRAFT_CLUSTER_ID=ncc_kafka
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - '/data/kafka/kafka_2_data:/bitnami/kafka'
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
    networks:
      - kafka-net

  kafka3:
    image: bitnami/kafka:3.9.0
    container_name: kafka3
    ports:
      - "9094:9092"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=3
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      - KAFKA_KRAFT_CLUSTER_ID=ncc_kafka
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9094
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - '/data/kafka/kafka_3_data:/bitnami/kafka'
      - './kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro'
      - './kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro'
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge
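The three service definitions differ only in the node ID, the published host port, the advertised listener, and the data directory. The sketch below derives those values from the node number so the pattern is easy to see and to extend. The class and method names are hypothetical; the port numbers and paths are the ones used in the compose file above.

```java
import java.util.StringJoiner;

// Hypothetical helper: computes the per-node values that vary between the
// kafka1, kafka2 and kafka3 services in docker-compose.yml.
class KafkaNodeSettings {

    static final int CONTROLLER_PORT = 9093; // CONTROLLER listener inside each container
    static final int FIRST_HOST_PORT = 9092; // host port published for node 1

    // Value of KAFKA_CFG_CONTROLLER_QUORUM_VOTERS, identical on every node.
    static String quorumVoters(int nodeCount) {
        StringJoiner voters = new StringJoiner(",");
        for (int id = 1; id <= nodeCount; id++) {
            voters.add(id + "@kafka" + id + ":" + CONTROLLER_PORT);
        }
        return voters.toString();
    }

    // Host port mapped to the container's 9092 for the given node.
    static int hostPort(int nodeId) {
        return FIRST_HOST_PORT + nodeId - 1;
    }

    // Value of KAFKA_CFG_ADVERTISED_LISTENERS for the given node.
    static String advertisedListener(String hostIp, int nodeId) {
        return "SASL_SSL://" + hostIp + ":" + hostPort(nodeId);
    }

    // Host directory mounted at /bitnami/kafka for the given node.
    static String dataDir(int nodeId) {
        return "/data/kafka/kafka_" + nodeId + "_data";
    }

    public static void main(String[] args) {
        String hostIp = args.length > 0 ? args[0] : "1.14.165.18";
        System.out.println("KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=" + quorumVoters(3));
        for (int id = 1; id <= 3; id++) {
            System.out.println("kafka" + id
                    + ": ports " + hostPort(id) + ":9092"
                    + ", KAFKA_CFG_ADVERTISED_LISTENERS=" + advertisedListener(hostIp, id)
                    + ", data " + dataDir(id));
        }
    }
}
```

Passing your own host IP as the first argument prints the values to paste into each service block.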
4. Create the data mount directories
sudo mkdir -p /data/kafka/kafka_1_data /data/kafka/kafka_2_data /data/kafka/kafka_3_data
sudo chmod 777 /data/kafka/*
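5. Create the Docker network
The Kafka nodes must share a Docker network so that they can reach each other. The compose file already declares kafka-net, which Compose creates automatically under a project-prefixed name, so creating the network by hand with the command below is only required if you mark it as external in the compose file.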
sudo docker network create kafka-net
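6. Start the services
Run the following command in the directory that contains docker-compose.yml (kafka-cluster in this example) to start the Kafka cluster: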
sudo docker-compose up -d
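Once the containers are up, a quick TCP check can confirm that the three published ports (9092, 9093, 9094) accept connections from the client machine. This is a minimal sketch with a hypothetical class name, using only java.net.Socket. It verifies that the port is open, not that TLS or SASL are configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP connection to host:port can be opened.
class BrokerPortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Replace the default with your host IP, or pass it as the first argument.
        String host = args.length > 0 ? args[0] : "1.14.165.18";
        for (int port = 9092; port <= 9094; port++) {
            System.out.println(host + ":" + port + " reachable = "
                    + isReachable(host, port, 3000));
        }
    }
}
```

If a port is not reachable, check the container logs and any firewall or security-group rules on the host before moving on to the client tests.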
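7. Test and verify:
Inside the container, edit producer.properties and consumer.properties and add the following parameters (see the previous article for the detailed steps):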
ssl.endpoint.identification.algorithm=
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm=
Note: replace 1.14.165.18 with your host IP.
Test sending a message:
sudo docker exec -it kafka1 kafka-console-producer.sh --bootstrap-server 1.14.165.18:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
Test receiving messages:
sudo docker exec -it kafka1 kafka-console-consumer.sh --bootstrap-server 1.14.165.18:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties
10. Integrate the Kafka cluster with Spring Boot
Add the dependency to pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Configure application.yml, replacing 1.14.165.18 with the IP of your Kafka server:
spring:
  application:
    name: ncc
  kafka:
    bootstrap-servers:
      - 1.14.165.18:9092
      - 1.14.165.18:9093
      - 1.14.165.18:9094
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka2024";
      ssl.truststore.location: kafka.truststore.jks
      ssl.truststore.password: kafka2024
      ssl.keystore.location: kafka.keystore.jks
      ssl.keystore.password: kafka2024
      ssl.key.password: kafka2024
      ssl.endpoint.identification.algorithm:
      producer.ssl.endpoint.identification.algorithm:
      consumer.ssl.endpoint.identification.algorithm:
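Also place the kafka.keystore.jks and kafka.truststore.jks files in the current project directory, because the relative paths above are resolved against the application's working directory.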
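For a client that does not use Spring Boot, the same settings can be expressed as plain Kafka client properties. The sketch below builds them with java.util.Properties only, so it has no Kafka dependency. The class and method names are hypothetical; the property keys are standard Kafka client configuration names, and the values mirror the application.yml above. Setting ssl.endpoint.identification.algorithm to an empty string disables hostname verification, which this setup relies on because clients connect by IP address.

```java
import java.util.Properties;

// Hypothetical helper: builds SASL_SSL client properties equivalent to the
// spring.kafka.properties section of application.yml.
class KafkaClientProps {

    // JAAS configuration line for the SCRAM login module.
    static String scramJaasConfig(String user, String password) {
        return "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + user + "\" password=\"" + password + "\";";
    }

    static Properties saslSslProperties(String bootstrapServers, String user, String password,
                                        String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        props.setProperty("sasl.jaas.config", scramJaasConfig(user, password));
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        // Empty value: skip hostname verification of the broker certificate.
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        // Example values from this article; replace the IP and credentials with your own.
        Properties props = saslSslProperties(
                "1.14.165.18:9092,1.14.165.18:9093,1.14.165.18:9094",
                "kafka", "kafka2024", "kafka.truststore.jks", "kafka2024");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be passed to a KafkaProducer or KafkaConsumer constructor once serializers or deserializers are added.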
11. Create the KafkaTest test class
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest(classes = NccApplication.class)
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void send() {
        // send() is asynchronous: it returns a future and does not wait
        // for the broker to acknowledge the message.
        kafkaTemplate.send("test", "hello client ");
    }
}
The test passes.