基于docker的confluent-kafka搭建及python接口使用

news/2024/10/30 19:24:59/

基于docker的confluent-kafka搭建及python接口使用

  • 1. 安装docker以及docker-compose
    • 1.1 安装docker
    • 1.2 安装docker-compose
  • 2. 安装confluent-kafka
  • 3. python接口使用
    • 3.1 安装依赖包
    • 3.2 创建、查看topic
    • 3.3 python接口-broker
    • 3.4 python接口-consumer
  • 参考链接

本文介绍基于docker搭建的confluent-kafka及其python接口的使用。

本文采用的系统配置如下:

  • LinuxMint 20.3 (兼容 Ununtu 20.04)
  • docker 20.10.21
  • docker-compose 2.14.2
  • python 3.9.16
  • confluent-kafka(python包) 2.1.1

1. 安装docker以及docker-compose

1.1 安装docker

docker-compose依赖于docker,因此需要先安装docker。

curl -fsSL https://test.docker.com -o test-docker.sh
sudo sh test-docker.sh

1.2 安装docker-compose

Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务。

curl -L https://get.daocloud.io/docker/compose/releases/download/v2.14.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose

要安装其他版本的 Compose,请替换 v2.14.2。

2. 安装confluent-kafka

新建文件并创建docker-compose.yml文件:

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1container_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000broker:image: confluentinc/cp-kafka:7.0.1container_name: brokerports:# To learn about configuring Kafka for access across networks see# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/- "9092:9092"depends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

进入该文件夹并运行:

docker-compose -f docker-compose.yml up -d

运行后在docker中看到类似结果说明启动成功:

aa@bb:~/docker_scripts$ docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED         STATUS         PORTS                                       NAMES
e6fbc05d61b1   confluentinc/cp-kafka:7.0.1       "/etc/confluent/dock…"   7 minutes ago   Up 7 minutes   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp   broker
58b04385f2bf   confluentinc/cp-zookeeper:7.0.1   "/etc/confluent/dock…"   7 minutes ago   Up 7 minutes   2181/tcp, 2888/tcp, 3888/tcp                zookeeper

这里kafka端口为9092。

关闭容器服务:

docker-compose -f docker-compose.yml down

3. python接口使用

3.1 安装依赖包

安装依赖包:

pip3 install confluent-kafka

3.2 创建、查看topic

进入kafka镜像:

docker exec -ti broker bash

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092

新建名为test001的topic:

[aa@bb ~]$ /bin/kafka-topics --create --bootstrap-server 127.0.0.1:9092 --topic test001 --partitions 2
Created topic test001.

查看topic:

[aa@bb ~]$ /bin/kafka-topics --list --bootstrap-server 127.0.0.1:9092
test001

通过Ctrl + P + Q回到终端。

3.3 python接口-broker

创建producer代码producer1.py

import socketfrom confluent_kafka import Producerconf = {'bootstrap.servers': "localhost:9092",'client.id': socket.gethostname()
}producer = Producer(conf)def __publish_delivery_report(err, msg) -> None:if err is not None:print(f"send msg:{msg} fail, err is not None")else:print(f"send msg{msg} success")def send_msg(topic: str, data):producer.produce(topic, data, callback=__publish_delivery_report)producer.flush()if __name__ == '__main__':msg = "hello kafka"topic = "test001"send_msg(topic, msg)

运行结果:

aa@bb:~/codes/kafka_test$ python3 producer1.py
send msg<cimpl.Message object at 0x7f8d6fe6acc0> success

3.4 python接口-consumer

创建consumer代码consumer1.py

from confluent_kafka import Consumerclass KafkaConsumer:def __init__(self, brokers, group):config = dict()config['bootstrap.servers'] = brokersconfig['group.id'] = groupconfig['auto.offset.reset'] = 'earliest'self.consumer = Consumer(config)def subscribe(self, topics):self.consumer.subscribe(topics=topics)def pull(self):while True:msg = self.consumer.poll(1.0)if msg is None:continueif msg.error():print("Consumer error: {}".format(msg.error()))continueprint('Received message: {}'.format(msg.value().decode('utf-8')))def close(self):self.consumer.close()if __name__ == "__main__":consumer = KafkaConsumer("127.0.0.1:9092", "test_group1")consumer.subscribe(['test001'])consumer.pull()consumer.close()

运行结果:

aa@bb:~/codes/kafka_test$ python3 consumer1.py
Received message: hello kafka

参考链接

  1. Hello Kafka(八)——Confluent Kafka简介
  2. Docker Compose | 菜鸟教程
  3. confluent_kafka生产者 - luckygxf - 博客园
  4. Hello Kafka(十二)——Python客户端_kafka python客户端_天山老妖的博客-CSDN博客

http://www.ppmy.cn/news/62081.html

相关文章

[2023-DAS x SU战队2023开局之战] crypto-sign1n

题目描述&#xff1a; from secret import r, t from Crypto.Util.number import *flag bxxx flag bytes_to_long(flag) e 0x10001def gen_keys():p getPrime(1024)q getPrime(1024)phi (p-1)*(q-1)d inverse(e,phi)n p*qprint(fn {n})WHATF (d ** 3 3) % phiprint…

一个让人类窒息的AI工具,或许未来人工智能真的能代替人类!

时隔几周&#xff0c;「神采PromeAI」又更新了 不仅页面做了小小的调整 又增加了「背景生成」功能 害怕各位小伙伴找不到使用位置 今天小编就给大家分享一个超全的使用教程 极速出图效率翻倍 让神采PromeAI在应用性设计方面更具优势 温馨提示&#xff1a;目前手机适配端无…

PAVC100R4222 PARKER轴向柱塞泵

PAVC100R4222 PARKER轴向柱塞泵特点&#xff1a; 1、壳体为高强度铸铁 2、两段设计便于维护 3、全密封的轴用轴承 4、内置增压器***高转速性能&#xff0c;可达3000 RPM( PAVC100为2600 RPM) 5、控制器为插装形式&#xff0c;易于现场更换 6、配流盘为可替换的青铜复合 10、过滤…

redis 持久化 RDB + AOF

redis 持久化 RDB AOF 1.redis持久化----两种方式 RDB&#xff08;Redis DataBase&#xff09;和AOF&#xff08;Append Only File&#xff09; RDB&#xff0c;简而言之&#xff0c;就是在不同的时间点&#xff0c;将redis存储的数据生成快照并存储到磁盘等介质上 AOF&am…

MySQL的ID用完了,怎么办?

目 录 一 首先首先分情况 二 自增ID 1 mysql 数据库创建一个自增键的表 2 导出表结构 3 重新创建 自增键是4294967295的表 4 查看表结构 5 异常测试 三 填充主键 1 首先创建一个test 表&#xff0c;主键不自增 2 插入主键最大值 3 再次插入主键最大值1 四 没有声明…

项目角色定义

一、项目集经理的角色 项目集经理是由执行组织授权、领导团队实现项目集目标的人员。项目集经理对项目集的领导、 实施和绩效负责&#xff0c;并负责组建一支能够实现项目集目标和预期项目集效益的项目集团队。项目集经 理的角色与项目经理的角色不同。二者之间的差异是基于项…

【Java EE 初阶】如何保证线程安全

目录 1.线程是什么&#xff1f; 2.线程安全&#xff08;重点&#xff09; 1.概念&#xff1a; 1.举例&#xff1a;用两个线程分别对同一个变量做五万次自增&#xff0c;观察答案是否符合预期 那么是哪些原因造成了这种线程不安全的现象呢&#xff1f;我们一起来分析一下&am…

消息队列中的事务消息

大家好&#xff0c;我是易安&#xff01;今天我们谈一谈消息队列中的事务消息这个话题。 一说起事务&#xff0c;你可能自然会联想到数据库。我们日常使用事务的场景&#xff0c;绝大部分都是在操作数据库的时候。像MySQL、Oracle这些主流的关系型数据库&#xff0c;也都提供了…