Python 全栈系列266 Kafka服务的Docker搭建

news/2024/9/17 8:35:09/ 标签: python, kafka, docker

说明

在大量数据处理任务下的缓存与分发

这个算是来自顾同学的助攻+1,我有点java绝缘体的体质,碰到和java相关的安装部署总会碰到点奇怪的问题,不过现在已经搞定了。测试也接近了kafka官方标称的性能。考虑到网络、消息的大小等因素,可以简单认为kafka的速度是10万/秒级的。

本次文章的目的是:

  • 1 搭建一个平时工作中常用的队列服务
  • 2 方便自己或者其他同事再次搭建

内容

1 搭建过程

共要搭建两个服务:zookeeper和kafka

1.1 创建zookeeper

这个是基础服务,必须要最先启动

docker run -d --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100

通常来说,这个服务启动后就不用管了,但是偶尔如果需要debug的时候:

docker exec -it zookeeper bash
bin/zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids

1.2 创建持久化路径

这个会实际保存kafka的消息

mkdir -p /data/kafka-logs

kafka_36">1.3 创建kafka

一种场景是只监听外网IP(WAN_IP),另一种场景是同时监听内外网(LAN_IP)。

只监听外网的比较简单

WAN_IP=111
LAN_IP=222
docker run -it --rm --name kafka \-p 24666:24666 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${WAN_IP}:24666  \-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:24666 \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

同时监听内外网的比较麻烦(且要求端口不同)

WAN_IP=111
LAN_IP=222
docker run -d --name kafka \-p 24666:24666 \-p 9092:9092 \--link zookeeper:zk \-e HOST_IP=localhost \-e KAFKA_BROKER_ID=1 \-e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \-e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:24666 \-e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:24666 \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAME=INTERNAL \-e KAFKA_LISTENER_NAME=EXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \-e KAFKA_LOG_DIRS=/data/kafka-logs \-v /data/kafka-logs:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

配置解释
KAFKA_LISTENERS:

  • INTERNAL://0.0.0.0:9092 用于所有网络接口监听。

  • EXTERNAL://0.0.0.0:24666 用于所有网络接口监听。

  • KAFKA_ADVERTISED_LISTENERS:

  • INTERNAL://IP:9092 用于内网客户端。

  • EXTERNAL://IP:24666 用于外网客户端。

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:

  • INTERNAL:PLAINTEXT 和 EXTERNAL:PLAINTEXT 映射了每个监听器名称和协议类型。

注释
• docker run -d --name kafka:启动一个名为 kafka 的容器,并在后台运行。
• -p 9092:9092:将主机的 9092 端口映射到容器的 9092 端口,这是 Kafka 的默认端口。
• --link zookeeper:zk:将名为 zookeeper 的容器链接到当前容器,并在当前容器中以 zk 作为别名进行访问。
• -e HOST_IP=localhost:设置环境变量 HOST_IP 为 localhost。
• -e KAFKA_BROKER_ID=1:设置 Kafka 的 broker ID 为 1。【如果有多个,应该在这里区分】
• -e KAFKA_ZOOKEEPER_CONNECT=zk:2181:指定 Zookeeper 的连接地址。
• -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://xxx:9092:设置 Kafka 的广告监听器地址。【这个是实际上Consumer一定会用的。】
• -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:设置 Kafka 的监听地址。
• -e KAFKA_LOG_DIRS=/data/kafka-logs:指定 Kafka 日志存储目录。
• -v /data/kafka-logs:/data/kafka-logs:将主机的 /data/kafka-logs 目录挂载到容器的 /data/kafka-logs 目录,以持久化存储 Kafka 日志。

2 测试

2.1 生产者测试

python">from pydantic import BaseModel, field_validator
import json 
import pandas as pd 
class KafkaJsonMsgList(BaseModel):json_list : list@propertydef msg_list(self):return pd.Series(self.json_list).apply(json.loads).to_list()from func_timeout import func_set_timeout,FunctionTimedOutimport json
from confluent_kafka import Producer
# @func_set_timeout(60)def send_messages(bootstrap_servers = None, topic= None, messages= None):"""发送消息到 Kafka 主题:param bootstrap_servers: Kafka 服务器地址:param topic: Kafka 主题:param messages: 要发送的消息列表"""# 创建 Producer 实例producer = Producer(**{'bootstrap.servers': bootstrap_servers,'acks': 1 })for msg in messages:try:producer.produce(topic, msg)except BufferError:# 如果队列已满,等待队列空出空间producer.poll(1)# 定期调用poll以确保消息传递producer.poll(0)# 确保所有消息都被发送producer.flush()msg_list = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(3)]
topic = 'my_test6'
# 外网
## bootstrap_servers = 'WAN_IP:24666'
# 内网
bootstrap_servers = 'LAN_IP:9092'send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list)

2.2 消费者测试

python">from confluent_kafka import Consumer# 如果是非json的,直接拿到就可以了
# @func_set_timeout(60)def consume_messages(config = None, topic = None, max_messages = 3):# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topicconsumer.subscribe([topic])consumed_count = 0res_list = []try:while consumed_count < max_messages:msg = consumer.poll(1.0)if msg is None:print('Empty Q')break else:res_list.append(msg.value().decode('utf-8'))consumed_count += 1if consumed_count >= max_messages:breakexcept KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()return res_list # 外网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'WAN_IP:24666',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
# 内网
config = {
# User-specific properties that you must set
'bootstrap.servers': 'LAN_IP:9092',
'group.id':'group1',
'auto.offset.reset': 'earliest', 
'enable.auto.commit': True
}
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()

2.3 性能测试

发送端,1.48秒发送10万条消息,稍微弱了点,不过考虑这个是一台仅仅4核8G且繁忙的机器,那就还好(我默认的方式是需要json序列化的)。

python">
tick1 = time.time()
msg_list_10w = [json.dumps({'id':i ,'value':'aaa','aa':'''this is test'''}) for i in range(100000)]
topic = 'my_test6'
send_messages(bootstrap_servers=bootstrap_servers,topic=topic,messages = msg_list_10w)
tick2 = time.time()
print('takes %.2f to send 100000' % (tick2-tick1))
takes 1.48 to send 100000
```接收端
````python
topic = 'my_test6'
import time 
tick1 = time.time()
max_messages = 100000  # 这里设置要消费的消息数量
json_list = consume_messages(config, topic, max_messages)
tick2 = time.time()
kj = KafkaJsonMsgList(json_list = json_list)
msg_list = kj.msg_list
tick3 = time.time()
print(tick2-tick1, 'get_time')
print(tick3-tick2, 'parse-time')1.3391587734222412 get_time
0.24841904640197754 parse-time
```总体上还是满意的,可以了。

http://www.ppmy.cn/news/1521666.html

相关文章

IOS17.0安装巨魔:TrollRestore巨魔发布

&#x1f47b; TrollRestore 17.0 巨魔发布 15.0 - 16.7 RC&#xff08;20H18&#xff09;和17.0。 官网&#xff1a;https://trollrestore.com/ 下载&#xff1a;https://pan.metanetdisk.com/IOS/%E5%B7%A8%E9%AD%94%E7%8E%A9%E5%AE%B6/TrollRestore.com 使用&#xff1a;ht…

点在面内(考虑内环外环)------射线算法

射线算法计算点是否在面内 , 考虑内环和外环的情况 // 判断点是否在线段上 bool on_segment(const studio_point& point, const studio_point& l_beg, const studio_point& l_end) {return std::min(l_beg.lgtd, l_end.lgtd) < point.lgtd && point.lg…

计算机网络 第二章: 物理层概述

文章目录 物理层要实现的功能物理层接口特性机械特性电气特性功能特性过程特性 物理层下面的传输媒体传输媒体的分类 传输方式习题答案 物理层要实现的功能 物理层要实现的功能是在各种传输媒体上, 传输0和1吗进而给其上面的数据链路层提供透明传输比特流的服务. (透明传输比特…

硬件工程师笔试面试知识器件篇——二极管

目录 4、二极管 4.1、基础 二极管原理图 二极管实物图 4.1.1、基本特性 4.1.2、常见类型 4.1.3、工作原理 4.1.4、应用领域 4.2、相关问题 4.2.1、二极管的PN结是如何形成的? 4.2.2、发光二极管(LED)的工作原理是什么? 4.2.3、在电子电路中,二极管通常如何应用?…

MySQL Workbench 的入门指南

前言 MySQL Workbench 是一个官方的图形化工具&#xff0c;用于开发、管理和设计 MySQL 数据库服务器。它提供了丰富的功能&#xff0c;可以帮助数据库管理员、开发者以及DBA们高效地工作。下面是一个MySQL Workbench的入门指南&#xff0c;介绍如何安装和使用它。 安装 MyS…

虚拟机苹果系统MacOS中XCode的安装

1、背景介绍 主机系统Win11&#xff0c;虚拟机VMWare17&#xff0c;苹果系统MacOS 13.6.7 2、Xcode的在线 点击应用市场&#xff0c;输入Xcode搜索&#xff1a; 看来Xcode无法安装在macOS V13上&#xff0c;直接在线安装失败。 3、采用下载安装包的方法进行安装 解决办法参考链…

ChatTTS文本转语音本地Windows环境部署与远程生成AI音频实战流程

文章目录 前言1. 下载运行ChatTTS模型2. 安装Cpolar工具3. 实现公网访问4. 配置ChatTTS固定公网地址 前言 本篇文章主要介绍如何快速地在Windows系统电脑中本地部署ChatTTS开源文本转语音项目&#xff0c;并且我们还可以结合Cpolar内网穿透工具创建公网地址&#xff0c;随时随…

2024国赛数学建模C题论文:基于优化模型的农作物的种植策略

大家可以查看一下35页&#xff0c;包含结构完整&#xff0c;数据完整的C题论文&#xff0c;完整论文见文末名片 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; 添加图片注释&#xf…

Java并发编程实战 04 | 使用WaitNotify时要注意什么?

在 Java 中&#xff0c;wait()、notify() 和 notifyAll() 方法在多线程编程中主要用于线程间的协作和同步。理解这些方法的使用特点对于编写稳定的多线程程序至关重要。我们将从以下三个问题入手深入探讨它们的使用&#xff1a; 为什么必须在 synchronized 代码块中使用 wait(…

LINUX-ubuntu20.04下安装GUI-Guider出现的依赖问题解决办法

个人安装遇见的问题&#xff0c;并且已解决&#xff0c;仅供参考&#xff01;&#xff01;&#xff01; 采用下载好gui-guider的安装包&#xff0c;然后离线安装的方式&#xff1b; 目录 问题&#xff1a; 一般方法 解决办法 问题&#xff1a; 发现出现下面的配置依赖问题…

算法训练营|图论第11天 Floyd算法 A*算法

题目&#xff1a;Floyd算法 题目链接&#xff1a; 97. 小明逛公园 (kamacoder.com) 代码&#xff1a; #include<bits/stdc.h> using namespace std; struct Edge {int to;int val;Edge(int t, int w) :to(t), val(w) {} }; int main() {int n, m;cin >> n >…

Java 快速求解x的x次幂结果为10

1.问题描述 如果x的x次幂结果为10&#xff08;如图所示&#xff09;&#xff0c;你能计算出x的近似值吗&#xff1f; 显然&#xff0c;这个值是介于2和3之间的一个数字。 可以使用牛顿迭代公式进行求解&#xff0c;因为是逼近算法可以大大减少运算次数 public static void mai…

【Leetcode 2367 】 等差三元组的数目 —— 哈希表与三指针

给你一个下标从 0 开始、严格递增 的整数数组 nums 和一个正整数 diff 。如果满足下述全部条件&#xff0c;则三元组 (i, j, k) 就是一个 等差三元组 &#xff1a; i < j < k &#xff0c;nums[j] - nums[i] diff 且nums[k] - nums[j] diff 返回不同 等差三元组 的数…

Flask中的g的作用

Flask中的g对象是一个非常重要的概念&#xff0c;它在Flask应用程序的上下文中扮演着关键角色。下面我将详细阐述g对象的作用&#xff0c;但由于篇幅限制&#xff0c;无法达到5000字&#xff0c;但会尽量全面而精炼地介绍其关键特性和用途。 Flask中的g是什么&#xff1f; 在…

PHP一站式解决方案高级房产系统小程序源码

一站式解决方案&#xff0c;高级房产系统让房产管理更轻松 &#x1f3e0;【开篇&#xff1a;告别繁琐&#xff0c;迎接高效房产管理新时代】&#x1f3e0; 你是否还在为房产管理的繁琐流程而头疼&#xff1f;从房源录入、客户咨询到合同签订、售后服务&#xff0c;每一个环节…

自然语言处理系列五十》文本分类算法》SVM支持向量机算法原理

注&#xff1a;此文章内容均节选自充电了么创始人&#xff0c;CEO兼CTO陈敬雷老师的新书《自然语言处理原理与实战》&#xff08;人工智能科学与技术丛书&#xff09;【陈敬雷编著】【清华大学出版社】 文章目录 自然语言处理系列五十SVM支持向量机》算法原理SVM支持向量机》代…

Spring Boot 整合 Sentinel 实现流量控制

在微服务架构中&#xff0c;流量控制是保障系统稳定性和高可用性的关键技术之一。阿里巴巴开源的 Sentinel 是一款面向分布式系统的流量防护组件&#xff0c;旨在从流量控制、熔断降级、系统负载保护等多个维度保障服务的稳定性。本文将详细介绍如何在 Spring Boot 项目中整合 …

预训练语言模型的前世今生 - 从Word Embedding到BERT

目录 一、预训练 1.1 图像领域的预训练1.2 预训练的思想二、语言模型 2.1 统计语言模型2.2 神经网络语言模型三、词向量 3.1 独热&#xff08;Onehot&#xff09;编码3.2 Word Embedding四、Word2Vec 模型五、自然语言处理的预训练模型六、RNN 和 LSTM 6.1 RNN6.2 RNN 的梯度消…

2024全国大学省数学建模竞赛A题-原创参考论文(部分+第一问代码)

一问题重述 1.1 问题背景 "板凳龙"&#xff0c;又称"盘龙"&#xff0c;是浙闽地区的传统地方民俗文化活动。这种独特的表演艺术形式融合了中国传统龙舞的精髓和地方特色&#xff0c;展现了人们对美好生活的向往和对传统文化的传承。 在板凳龙表演中&am…

2024年高教社杯数学建模国赛赛题浅析——助攻快速选题

一图流——一张图读懂国赛 总体概述&#xff1a; A题偏几何与运动学模型&#xff0c;适合有几何与物理背景的队伍&#xff0c;数据处理复杂性中等。 B题侧重统计和优化&#xff0c;适合有运筹学和经济学背景的队伍&#xff0c;数据处理较为直接但涉及多步骤的决策优化。 C题…