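Setting up Confluent Kafka with Docker and using its Python interface
- 1. Install Docker and docker-compose
- 1.1 Install Docker
- 1.2 Install docker-compose
- 2. Install Confluent Kafka
- 3. Using the Python interface
- 3.1 Install the dependency
- 3.2 Create and list topics
- 3.3 Python interface: producer
- 3.4 Python interface: consumer
- References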
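This article shows how to set up Confluent Kafka with Docker and how to use its Python interface.
The system configuration used here is:
- Linux Mint 20.3 (compatible with Ubuntu 20.04)
- docker 20.10.21
- docker-compose 2.14.2
- python 3.9.16
- confluent-kafka (Python package) 2.1.1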
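1. Install Docker and docker-compose
1.1 Install Docker
docker-compose depends on Docker, so install Docker first. The script below comes from Docker's test channel (test.docker.com); the stable convenience script is served from https://get.docker.com.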
curl -fsSL https://test.docker.com -o test-docker.sh
sudo sh test-docker.sh
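1.2 Install docker-compose
Compose is a tool for defining and running multi-container Docker applications. With Compose, you describe all the services your application needs in a YAML file, then create and start all of them with a single command.
sudo curl -L https://get.daocloud.io/docker/compose/releases/download/v2.14.2/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
Writing to /usr/local/bin requires root, and the downloaded binary must be made executable. To install a different version of Compose, replace v2.14.2 in the URL.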
2. Install Confluent Kafka
Create a new folder and, inside it, a docker-compose.yml file with the following content:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
      # To learn about configuring Kafka for access across networks see
      # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
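The KAFKA_ADVERTISED_LISTENERS value determines which address a client is told to use after its first connection. As a rough illustration only, the Python sketch below splits that string into its named listeners; the helper name parse_listeners is hypothetical and not part of Kafka or any client library.

```python
def parse_listeners(value: str) -> dict:
    """Split 'NAME://host:port,NAME2://host2:port2' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        # Each entry looks like LISTENER_NAME://host:port
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


if __name__ == "__main__":
    # Value copied from the docker-compose.yml above.
    advertised = "PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092"
    for name, (host, port) in parse_listeners(advertised).items():
        print(f"{name}: {host}:{port}")
```

With this configuration, programs running on the host connect through localhost:9092, while broker:29092 is the address meant for other containers on the same Compose network.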
Change into that folder and run:
docker-compose -f docker-compose.yml up -d
If docker ps then shows output similar to the following, the services started successfully:
aa@bb:~/docker_scripts$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e6fbc05d61b1 confluentinc/cp-kafka:7.0.1 "/etc/confluent/dock…" 7 minutes ago Up 7 minutes 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp broker
58b04385f2bf confluentinc/cp-zookeeper:7.0.1 "/etc/confluent/dock…" 7 minutes ago Up 7 minutes 2181/tcp, 2888/tcp, 3888/tcp zookeeper
The Kafka broker is published on port 9092 here.
To stop and remove the containers:
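Before moving on to the Python client, it can help to confirm from the host that something is actually listening on that port. The following is a minimal sketch using only the standard library; port_is_open is a hypothetical helper name, and a successful TCP connection only shows that the port is open, not that the broker has finished starting.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name resolution failures.
        return False


if __name__ == "__main__":
    # localhost:9092 is the port published by the broker container.
    print("broker port reachable:", port_is_open("localhost", 9092))
```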
docker-compose -f docker-compose.yml down
3. Using the Python interface
3.1 Install the dependency
Install the client library with pip:
pip3 install confluent-kafka
3.2 Create and list topics
Open a shell inside the Kafka container:
docker exec -ti broker bash
List the existing topics:
[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092
Create a topic named test001 with 2 partitions:
[aa@bb ~]$ /bin/kafka-topics --create --bootstrap-server 127.0.0.1:9092 --topic test001 --partitions 2
Created topic test001.
List the topics again:
[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092
test001
Press Ctrl + P followed by Ctrl + Q to detach and return to the host terminal (typing exit also works).
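The same topic management can also be done from Python instead of the container shell. The sketch below assumes the confluent-kafka package from section 3.1 and uses its AdminClient and NewTopic classes; the helper names missing_topics and ensure_topics are hypothetical, and the replication factor of 1 is an assumption that matches the single-broker setup above.

```python
def missing_topics(existing, wanted):
    """Return the names in `wanted` that are not in `existing`, keeping order."""
    existing = set(existing)
    return [name for name in wanted if name not in existing]


def ensure_topics(bootstrap_servers, wanted, partitions=2):
    """Create any topic in `wanted` that the cluster does not have yet."""
    # Imported lazily so missing_topics() works without the package installed.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    # list_topics() returns cluster metadata; .topics maps topic name -> metadata.
    existing = admin.list_topics(timeout=10).topics
    to_create = missing_topics(existing, wanted)
    if not to_create:
        return []

    new_topics = [
        NewTopic(name, num_partitions=partitions, replication_factor=1)
        for name in to_create
    ]
    # create_topics() returns {topic name: future}; result() raises on failure.
    for name, future in admin.create_topics(new_topics).items():
        future.result()
        print(f"Created topic {name}")
    return to_create


if __name__ == "__main__":
    ensure_topics("localhost:9092", ["test001"])
```

Checking the existing topics first keeps the script safe to run repeatedly, since creating a topic that already exists is reported as an error by the broker.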
3.3 Python interface: producer
Create the producer script, producer1.py:
import socket

from confluent_kafka import Producer

# Connect to the broker published on the host by docker-compose.
conf = {
    'bootstrap.servers': "localhost:9092",
    'client.id': socket.gethostname()
}
producer = Producer(conf)


def __publish_delivery_report(err, msg) -> None:
    # Called once per message when delivery succeeds or fails.
    if err is not None:
        print(f"send msg:{msg} fail, err: {err}")
    else:
        print(f"send msg{msg} success")


def send_msg(topic: str, data):
    producer.produce(topic, data, callback=__publish_delivery_report)
    # flush() blocks until the message is delivered and the callback has run.
    producer.flush()


if __name__ == '__main__':
    msg = "hello kafka"
    topic = "test001"
    send_msg(topic, msg)
Output:
aa@bb:~/codes/kafka_test$ python3 producer1.py
send msg<cimpl.Message object at 0x7f8d6fe6acc0> success
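The output above shows only the object representation of the delivered message. A possible variation, sketched below, reads the topic, partition, offset and payload from the message through its accessor methods so the report is readable. The names describe_delivery and send_with_report are hypothetical; the sketch assumes the same broker address and topic as producer1.py.

```python
def describe_delivery(err, msg) -> str:
    """Build a readable one-line report for a delivery callback."""
    if err is not None:
        return f"delivery failed: {err}"
    value = msg.value()
    text = value.decode("utf-8") if value is not None else None
    return (f"delivered {text!r} to {msg.topic()} "
            f"[partition {msg.partition()}] at offset {msg.offset()}")


def send_with_report(bootstrap_servers, topic, data):
    # Imported lazily so describe_delivery() works without the package installed.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": bootstrap_servers})
    producer.produce(
        topic, data,
        callback=lambda err, msg: print(describe_delivery(err, msg)),
    )
    # flush() blocks until outstanding messages are delivered or have failed.
    producer.flush()


if __name__ == "__main__":
    send_with_report("localhost:9092", "test001", "hello kafka")
```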
3.4 Python interface: consumer
Create the consumer script, consumer1.py:
from confluent_kafka import Consumer


class KafkaConsumer:
    def __init__(self, brokers, group):
        config = dict()
        config['bootstrap.servers'] = brokers
        config['group.id'] = group
        # Start from the earliest message when the group has no committed offset.
        config['auto.offset.reset'] = 'earliest'
        self.consumer = Consumer(config)

    def subscribe(self, topics):
        self.consumer.subscribe(topics=topics)

    def pull(self):
        while True:
            # Wait up to 1 second for the next message.
            msg = self.consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            print('Received message: {}'.format(msg.value().decode('utf-8')))

    def close(self):
        self.consumer.close()


if __name__ == "__main__":
    consumer = KafkaConsumer("127.0.0.1:9092", "test_group1")
    consumer.subscribe(['test001'])
    try:
        consumer.pull()  # runs until interrupted with Ctrl+C
    except KeyboardInterrupt:
        pass
    finally:
        # Leave the consumer group cleanly.
        consumer.close()
Output:
aa@bb:~/codes/kafka_test$ python3 consumer1.py
Received message: hello kafka
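The consumer above polls forever, which is fine for a long-running service but awkward for a quick check. One possible alternative, sketched below, stops after a fixed number of messages or after several polls in a row return nothing. The function name drain and its parameters are hypothetical; the configuration values are the same as in consumer1.py.

```python
def drain(consumer, max_messages=10, max_idle_polls=5, poll_timeout=1.0):
    """Poll until max_messages are read or max_idle_polls polls in a row yield no message."""
    values = []
    idle_polls = 0
    while len(values) < max_messages and idle_polls < max_idle_polls:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            idle_polls += 1
            continue
        if msg.error():
            # Errors also count as idle polls so the loop always terminates.
            print(f"Consumer error: {msg.error()}")
            idle_polls += 1
            continue
        idle_polls = 0
        values.append(msg.value().decode("utf-8"))
    return values


if __name__ == "__main__":
    from confluent_kafka import Consumer

    consumer = Consumer({
        "bootstrap.servers": "127.0.0.1:9092",
        "group.id": "test_group1",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["test001"])
    try:
        for value in drain(consumer):
            print(f"Received message: {value}")
    finally:
        consumer.close()
```

Because drain only relies on poll(), it can be exercised with a stand-in object in tests, without a running broker.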
References
- Hello Kafka (8): Introduction to Confluent Kafka
- Docker Compose | 菜鸟教程 (Runoob tutorial)
- confluent_kafka producer - luckygxf - 博客园 (cnblogs)
- Hello Kafka (12): Python client - 天山老妖's blog, CSDN