python-kafka客户端封装

news/2025/2/12 8:10:41/

目录

  • 前言
  • 封装代码
  • 测试代码
  • 参考


前言

本文对python的kafka包做简单封装,方便kafka初学者使用。包安装:

pip install kafka-python

封装代码

kafka_helper.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from typing import Listclass KProducer:def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):try:self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers,buffer_memory=33554432,batch_size=1048576,max_request_size=1048576,key_serializer=key_serializer,value_serializer=value_serializer,compression_type=compression_type  # 压缩消息发送 gzip lz4 snappy zstd)print("connect success, kafka producer info {0}".format(bootstrap_servers))except Exception as e:raise Exception("connect kafka failed, {}.".format(e))def sync_send(self, topic: str, data):"""同步发送数据:param data:  发送数据:param topic: 主题:return: partition, offset"""try:future = self.producer.send(topic, data)record_metadata = future.get(timeout=10)  # 同步确认消费partition = record_metadata.partition  # 数据所在的分区offset = record_metadata.offset  # 数据所在分区的位置print("save success, partition: {}, offset: {}".format(partition, offset))return partition, offsetexcept Exception as e:raise Exception("Kafka sync send failed, {}.".format(e))def async_send(self, topic: str, data):"""异步发送数据:param data:  发送数据:param topic: 主题:return: None"""try:self.producer.send(topic, data)print("send data:{}".format(data))except Exception as e:raise Exception("Kafka asyn send failed, {}.".format(e))def async_callback(self, topic: str, data):"""异步发送数据 + 发送状态处理:param data:发送数据:param topic: 主题:return: None"""try:for item in data:self.producer.send(topic, item).add_callback(self.__send_success).add_errback(self.__send_error)self.producer.flush()  # 批量提交except Exception as e:raise Exception("Kafka asyn send fail, {}.".format(e))@staticmethoddef __send_success():"""异步发送成功回调函数"""print("save success")return@staticmethoddef __send_error():"""异步发送错误回调函数"""print("save error")returndef close(self):self.producer.close()class KConsumer:def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,value_deserializer=None, auto_offset_reset="latest"):self.topic = topictry:self.consumer = KafkaConsumer(self.topic,bootstrap_servers=bootstrap_servers,group_id=group_id,enable_auto_commit=False,auto_commit_interval_ms=1000,session_timeout_ms=30000,max_poll_records=50,max_poll_interval_ms=30000,metadata_max_age_ms=3000,key_deserializer=key_deserializer,value_deserializer=value_deserializer,auto_offset_reset=auto_offset_reset)self.consumer.subscribe(topics=[self.topic])print("connect to kafka and subscribe topic success")except Exception as e:raise Exception("Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.print_exc()))def get_consumer(self):"""返会可迭代consumer:return: consumer"""return self.consumerdef set_topic(self, topic: str):"""订阅主题:param topic: 主题:return: None"""self.topic = topicself.consumer.subscribe(topics=[self.topic])def get_message_by_partition_offset(self, partition, offset):"""通过partition、offset获取一个消息:param partition: 分区:param offset: 游标、下标、序号:return: message,消息"""self.consumer.unsubscribe()partition = TopicPartition(self.topic, partition)self.consumer.assign([partition])self.consumer.seek(partition, offset=offset)for message in self.consumer:return message

测试代码

kafka_test.py

from kafka_helper import KProducer,KConsumer
import jsondef sync_send_test(bootstrap_servers,topic,json_format=True):value = {"send_type": "sync_send","name": "lady_killer","age": 18}if json_format:p = KProducer(bootstrap_servers=bootstrap_servers)p.sync_send(value,topic)else:p = KProducer(bootstrap_servers=bootstrap_servers,key_serializer=None,value_serializer=None)v = bytes('{}'.format(json.dumps(value)), 'utf-8')p.sync_send(v,topic)p.close()def async_send_test(bootstrap_servers,topic,json_format=True):value = {"send_type": "async_send","name":"lady_killer","age":18}if json_format:p = KProducer(bootstrap_servers=bootstrap_servers)p.asyn_send(value,topic)else:p = KProducer(bootstrap_servers=bootstrap_servers,key_serializer=None,value_serializer=None)v = bytes('{}'.format(json.dumps(value)), 'utf-8')p.asyn_send(v,topic)p.close()def consumer_test(bootstrap_servers,topic):c = KConsumer(bootstrap_servers=bootstrap_servers,topic=topic,group_id='test',auto_offset_reset="earliest")for data in c.get_consumer():print(type(data.value),data.value)print(json.loads(data.value))def get_one_msg(bootstrap_servers,topic,partition,offset):c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test', auto_offset_reset="earliest")msg = c.get_message_by_partition_offset(partition,offset)print(msg)if __name__ == '__main__':bootstrap_servers = ["kafka:9092"]topic = "demodata"# 测试生产sync_send_test(bootstrap_servers=bootstrap_servers,topic=topic)async_send_test(bootstrap_servers=bootstrap_servers,topic=topic)sync_send_test(bootstrap_servers=bootstrap_servers,topic=topic,json_format=False)async_send_test(bootstrap_servers=bootstrap_servers,topic=topic,json_format=False)# 测试消费consumer_test(bootstrap_servers=bootstrap_servers,topic=topic)# get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)

参考

Kafka入门,这一篇就够了(安装,topic,生产者,消费者)


http://www.ppmy.cn/news/1102000.html

相关文章

手写Spring:第4章-基于Cglib实现含构造函数的类实例化策略

文章目录 一、目标:含构造函数的类实例化二、设计:含构造函数的类实例化三、实现:含构造函数的类实例化3.1 工程结构3.2 含构造函数的类实例化类图3.3 类实例化策略3.3.1 定义实例化策略接口3.3.2 JDK实例化3.3.3 Cglib实例化 3.4 抽象类定义…

微信小程序——生命周期

在微信小程序中,可以通过生命周期函数来执行相应的代码操作。以下是一些常见的生命周期代码操作示例: 在 onLoad 生命周期中进行数据初始化和网络请求: onLoad: function(options) {// 数据初始化this.setData({name: John,age: 25});// 网…

ArcGIS API for JavaScript 4.x 实现动态脉冲效果

1. 设计思路 主要通过定时刷新,每一次的脉冲渲染圈不停的放大,并且透明度缩小,直到达到一定的大小再退回0。 2. 实现代码 import MapView from "arcgis/core/views/MapView"; import GraphicsLayer from "arcgis/core/laye…

浅述C++模板——函数模板及类模板

前言 模板作为 C 的一大特色,对于泛型编程有着重要的作用。同时,对于大规模类似的函数或是类型不确定的类,模板都起了至关重要的作用。 一、模板 在开始学习模板之前,我们首先需要了解模板。先看下面一个例子: #in…

跨站请求伪造

CSRF是什么? 跨站请求伪造(Cross Site Request Forgery,CSRF)是一种攻击,它强制浏览器客户端用户在当前对其进行身份验证后的Web 应用程序上执行非本意操作的攻击,攻击的重点在于更改状态的请求,而不是盗取数据&#x…

Datastage部署与使用

Datastage部署与使用 - 码农教程 https://www.cnblogs.com/lanston/category/739553.html Streamsets定时拉取接口数据同步到HBase集群_streamsets api_webmote的博客-CSDN博客 【SDC】StreamSets实战之路-28-实战篇- 使用StreamSets实时采集指定数据目录文件并写入库Kudu_菜…

Spark【Spark SQL(三)DataSet】

DataSet DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数…

堆的实现方式——优先级队列

大顶堆与小顶堆 根结点大 大顶堆 根节点小 小顶堆 堆每次pop 都是根元素 所有小顶堆最终保留大元素 大顶堆最终保留小元素 优先级队列 其实就是一个披着队列外衣的堆,因为优先级队列对外接口只是从队头取元素,从队尾添加元素,再无其他取元…