Python脚本消费多个Kafka topic

devtools/2024/11/25 7:51:45/

在Python中消费多个Kafka topic,可以使用kafka-python库,这是一个流行的Kafka客户端库。以下是一个详细的代码示例,展示如何创建一个Kafka消费者,并同时消费多个Kafka topic。

1.环境准备

(1)安装Kafka和Zookeeper:确保Kafka和Zookeeper已经安装并运行。

(2)安装kafka-python:通过pip安装kafka-python库。

bash复制代码pip install kafka-python

2.示例代码

以下是一个完整的Python脚本,展示了如何创建一个Kafka消费者并消费多个topic。

python">from kafka import KafkaConsumer
import json
import logging# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)# Kafka配置
bootstrap_servers = 'localhost:9092'  # 替换为你的Kafka服务器地址
group_id = 'multi-topic-consumer-group'
topics = ['topic1', 'topic2', 'topic3']  # 替换为你要消费的topic# 消费者配置
consumer_config = {'bootstrap_servers': bootstrap_servers,'group_id': group_id,'auto_offset_reset': 'earliest',  # 从最早的offset开始消费'enable_auto_commit': True,'auto_commit_interval_ms': 5000,'value_deserializer': lambda x: json.loads(x.decode('utf-8'))  # 假设消息是JSON格式
}# 创建Kafka消费者
consumer = KafkaConsumer(**consumer_config)# 订阅多个topic
consumer.subscribe(topics)try:# 无限循环,持续消费消息while True:for message in consumer:topic = message.topicpartition = message.partitionoffset = message.offsetkey = message.keyvalue = message.value# 打印消费到的消息logger.info(f"Consumed message from topic: {topic}, partition: {partition}, offset: {offset}, key: {key}, value: {value}")# 你可以在这里添加处理消息的逻辑# process_message(topic, partition, offset, key, value)except KeyboardInterrupt:# 捕获Ctrl+C,优雅关闭消费者logger.info("Caught KeyboardInterrupt, closing consumer.")consumer.close()except Exception as e:# 捕获其他异常,记录日志并关闭消费者logger.error(f"An error occurred: {e}", exc_info=True)consumer.close()

3.代码解释

(1)日志配置:使用Python的logging模块配置日志,方便调试和记录消费过程中的信息。

(2)Kafka配置:设置Kafka服务器的地址、消费者组ID和要消费的topic列表。

(3)消费者配置:配置消费者参数,包括自动重置offset、自动提交offset的时间间隔和消息反序列化方式(这里假设消息是JSON格式)。

(4)创建消费者:使用配置创建Kafka消费者实例。

(5)订阅topic:通过consumer.subscribe方法订阅多个topic。

(6)消费消息:在无限循环中消费消息,并打印消息的详细信息(topic、partition、offset、key和value)。

(7)异常处理:捕获KeyboardInterrupt(Ctrl+C)以优雅地关闭消费者,并捕获其他异常并记录日志。

4.运行脚本

确保Kafka和Zookeeper正在运行,并且你已经在Kafka中创建了相应的topic(topic1topic2topic3)。然后运行脚本:

bash复制代码python kafka_multi_topic_consumer.py

这个脚本将开始消费指定的topic,并在控制台上打印出每条消息的详细信息。你可以根据需要修改脚本中的处理逻辑,比如将消息存储到数据库或发送到其他服务。

5.参考价值和实际意义

这个示例代码展示了如何在Python中使用kafka-python库消费多个Kafka topic,适用于需要处理来自不同topic的数据流的场景。例如,在实时数据处理系统中,不同的topic可能代表不同类型的数据流,通过消费多个topic,可以实现数据的整合和处理。此外,该示例还展示了基本的异常处理和日志记录,有助于在生产环境中进行调试和监控。


http://www.ppmy.cn/devtools/136790.html

相关文章

MT8768/MTK8768安卓核心板性能参数_联发科安卓智能模块开发方案

MT8768安卓核心板 是一款采用台积电12nm FinFET制程工艺的智能手机芯片。MT8768核心板不仅提供所有高级功能和出色体验,同时确保智能终端具备长电池寿命。该芯片提供了一个1600x720高清(20:9比例)分辨率显示屏,排除了清晰度和功耗之间的平衡问题。该芯片…

网络云计算】2024第47周-每日【2024/11/21】周考-实操题-RAID6实操解析1

文章目录 1、RAID6配置指南(大致步骤)2、注意事项3、截图和视频 网络云计算】2024第47周-每日【2024/11/21】周考-实操题-RAID6实操 RAID6是一种在存储系统中实现数据冗余和容错的技术,其最多可以容忍两块磁盘同时损坏而不造成数据丢失。RAID…

设计模式之 命令模式

命令模式(Command Pattern)是行为型设计模式之一,它将请求(或命令)封装成一个对象,从而使用户能够将请求发送者与请求接收者解耦。通过命令模式,调用操作的对象与执行操作的对象不直接关联&…

解决 Gradle 报错:`Plugin with id ‘maven‘ not found` 在 SDK 开发中的问题

在 SDK 开发过程中,使用 Gradle 构建和发布 SDK 是常见的任务。在将 SDK 发布为 AAR 或 JAR 包时,你可能会使用 apply plugin: maven 来发布到本地或远程的 Maven 仓库。但是,随着 Gradle 版本的更新,特别是从 Gradle 7 版本开始&…

用源码编译虚幻引擎,并打包到安卓平台

用源码编译虚幻引擎,并打包到安卓平台 前往我的博客,获取更优的阅读体验 作业内容: 源码编译UE5.4构建C项目,简单设置打包到安卓平台 编译虚幻 5 前置内容 这里需要将 Epic 账号和 Github 账号绑定,然后加入 Epic 邀请的组织&#xff0c…

springboot 整合 rabbitMQ (延迟队列)

前言: 延迟队列是一个内部有序的数据结构,其主要功能体现在其延时特性上。这种队列存储的元素都设定了特定的处理时间,意味着它们需要在规定的时间点或者延迟之后才能被取出并进行相应的处理。简而言之,延时队列被设计用于存放那…

输入/输出管理 III(磁盘和固态硬盘)

一、磁盘 【总结】: 磁盘(Disk)是由表面涂有磁性物质的物理盘片,通过一个称为磁头的导体线圈从磁盘存取数据。在读/写操作期间,磁头固定,磁盘在下面高速旋转。如下图所示: 磁盘盘面…

WireShark

1. WireShark安装 去官网Wireshark Download下载安装包 之后按照安装向导进行安装。安装完成后,用户可以在桌面或应用程序中找到Wireshark的图标,双击打开软件 2. wireshark过滤语法 2.1过滤协议 http 2.2 过滤指定ip 匹配指定的IP地址数据包&…