目录
- 前言
- 封装代码
- 测试代码
- 参考
前言
本文对 Python 的 kafka-python 包做了简单封装,方便 Kafka 初学者使用。安装方式:
pip install kafka-python
封装代码
kafka_helper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import traceback
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from typing import List


class KProducer:
    """Thin convenience wrapper around ``kafka.KafkaProducer``."""

    def __init__(self, bootstrap_servers: List, key_serializer=lambda m: json.dumps(m).encode("ascii"),
                 value_serializer=lambda m: json.dumps(m).encode("ascii"), compression_type=None):
        """
        Create the underlying KafkaProducer.

        :param bootstrap_servers: list of broker addresses
        :param key_serializer: callable turning a key into bytes (JSON by default);
            pass None to send raw bytes
        :param value_serializer: callable turning a value into bytes (JSON by default);
            pass None to send raw bytes
        :param compression_type: gzip, lz4, snappy, zstd or None
        :raises Exception: if the producer cannot be created

        NOTE(review): the send methods below never pass a key; presumably the
        default key_serializer is still applied to the missing key - confirm
        against the kafka-python version in use.
        """
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                buffer_memory=33554432,
                batch_size=1048576,
                max_request_size=1048576,
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                compression_type=compression_type,
            )
            print("connect success, kafka producer info {0}".format(bootstrap_servers))
        except Exception as e:
            raise Exception("connect kafka failed, {}.".format(e)) from e

    def sync_send(self, topic: str, data):
        """
        Send one record and wait for the result.

        :param topic: topic name
        :param data: payload to send
        :return: (partition, offset) of the stored record
        :raises Exception: if sending fails or no result arrives within 10 seconds
        """
        try:
            future = self.producer.send(topic, data)
            # Wait for the send result so the call is synchronous.
            record_metadata = future.get(timeout=10)
            partition = record_metadata.partition  # partition the record landed in
            offset = record_metadata.offset  # position inside that partition
            print("save success, partition: {}, offset: {}".format(partition, offset))
            return partition, offset
        except Exception as e:
            raise Exception("Kafka sync send failed, {}.".format(e)) from e

    def async_send(self, topic: str, data):
        """
        Send one record without waiting for the result.

        :param topic: topic name
        :param data: payload to send
        :return: None
        :raises Exception: if handing the record to the producer fails
        """
        try:
            self.producer.send(topic, data)
            print("send data:{}".format(data))
        except Exception as e:
            raise Exception("Kafka asyn send failed, {}.".format(e)) from e

    def async_callback(self, topic: str, data):
        """
        Send every item of ``data`` asynchronously and report each outcome via callbacks.

        :param topic: topic name
        :param data: iterable of payloads; each item is sent as its own record
        :return: None
        :raises Exception: if sending or flushing fails
        """
        try:
            for item in data:
                future = self.producer.send(topic, item)
                future.add_callback(self.__send_success).add_errback(self.__send_error)
            self.producer.flush()  # push out everything queued above in one go
        except Exception as e:
            raise Exception("Kafka asyn send fail, {}.".format(e)) from e

    @staticmethod
    def __send_success(record_metadata=None):
        """
        Success callback for asynchronous sends.

        Accepts the argument passed by the future; the zero-parameter original
        could not be called with it.
        """
        print("save success")

    @staticmethod
    def __send_error(exc=None):
        """
        Error callback for asynchronous sends.

        Accepts the argument passed by the future; the zero-parameter original
        could not be called with it.
        """
        print("save error")

    def close(self):
        """Close the underlying producer."""
        self.producer.close()


class KConsumer:
    """Thin convenience wrapper around ``kafka.KafkaConsumer`` bound to one topic."""

    def __init__(self, bootstrap_servers: List, topic: str, group_id: str, key_deserializer=None,
                 value_deserializer=None, auto_offset_reset="latest"):
        """
        Create the underlying KafkaConsumer and subscribe it to ``topic``.

        :param bootstrap_servers: list of broker addresses
        :param topic: topic to subscribe to
        :param group_id: consumer group id
        :param key_deserializer: optional callable applied to record keys
        :param value_deserializer: optional callable applied to record values
        :param auto_offset_reset: passed through to KafkaConsumer ("latest" by default)
        :raises Exception: if the consumer cannot be created or subscribed
        """
        self.topic = topic
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=bootstrap_servers,
                group_id=group_id,
                enable_auto_commit=False,
                auto_commit_interval_ms=1000,
                session_timeout_ms=30000,
                max_poll_records=50,
                max_poll_interval_ms=30000,
                metadata_max_age_ms=3000,
                key_deserializer=key_deserializer,
                value_deserializer=value_deserializer,
                auto_offset_reset=auto_offset_reset,
            )
            self.consumer.subscribe(topics=[self.topic])
            print("connect to kafka and subscribe topic success")
        except Exception as e:
            # format_exc() returns the traceback text; print_exc() returns None,
            # which made the message always end in "None".
            raise Exception(
                "Kafka pconsumers set connect fail, {0}, {1}".format(e, traceback.format_exc())
            ) from e

    def get_consumer(self):
        """
        Return the underlying consumer, which can be iterated for messages.

        :return: consumer
        """
        return self.consumer

    def set_topic(self, topic: str):
        """
        Switch the subscription to another topic.

        :param topic: topic name
        :return: None
        """
        self.topic = topic
        self.consumer.subscribe(topics=[self.topic])

    def get_message_by_partition_offset(self, partition, offset):
        """
        Fetch the message stored at ``offset`` of ``partition`` in the current topic.

        The consumer drops its subscription and is manually assigned to the
        partition, so it stays in manual-assignment mode afterwards.

        NOTE(review): no consumer timeout is configured in this class, so the
        iteration presumably blocks until a record is available - confirm.

        :param partition: partition number
        :param offset: offset inside the partition
        :return: the first message read from that position, or None if iteration ends
        """
        self.consumer.unsubscribe()
        # Use a separate local rather than overwriting the ``partition`` parameter.
        topic_partition = TopicPartition(self.topic, partition)
        self.consumer.assign([topic_partition])
        self.consumer.seek(topic_partition, offset=offset)
        for message in self.consumer:
            return message
        return None
测试代码
kafka_test.py
from kafka_helper import KProducer,KConsumer
import json


def sync_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one record synchronously.

    With ``json_format`` the producer's default JSON serializers are used;
    otherwise serializers are disabled and the JSON text is sent as UTF-8 bytes.
    """
    value = {
        "send_type": "sync_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode("utf-8")
    try:
        # The helper signature is (topic, data); the original call had them swapped.
        p.sync_send(topic, payload)
    finally:
        p.close()


def async_send_test(bootstrap_servers, topic, json_format=True):
    """
    Send one record asynchronously.

    With ``json_format`` the producer's default JSON serializers are used;
    otherwise serializers are disabled and the JSON text is sent as UTF-8 bytes.
    """
    value = {
        "send_type": "async_send",
        "name": "lady_killer",
        "age": 18
    }
    if json_format:
        p = KProducer(bootstrap_servers=bootstrap_servers)
        payload = value
    else:
        p = KProducer(bootstrap_servers=bootstrap_servers, key_serializer=None, value_serializer=None)
        payload = json.dumps(value).encode("utf-8")
    try:
        # The method is named async_send (not asyn_send) and takes (topic, data).
        p.async_send(topic, payload)
    finally:
        p.close()


def consumer_test(bootstrap_servers, topic):
    """Consume the topic from the earliest offset, printing each value raw and JSON-decoded."""
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test',
                  auto_offset_reset="earliest")
    for data in c.get_consumer():
        print(type(data.value), data.value)
        print(json.loads(data.value))


def get_one_msg(bootstrap_servers, topic, partition, offset):
    """Fetch and print the single message at the given partition and offset."""
    c = KConsumer(bootstrap_servers=bootstrap_servers, topic=topic, group_id='test',
                  auto_offset_reset="earliest")
    msg = c.get_message_by_partition_offset(partition, offset)
    print(msg)


if __name__ == '__main__':
    bootstrap_servers = ["kafka:9092"]
    topic = "demodata"
    # Producer tests
    sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic)
    async_send_test(bootstrap_servers=bootstrap_servers, topic=topic)
    sync_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=False)
    async_send_test(bootstrap_servers=bootstrap_servers, topic=topic, json_format=False)
    # Consumer tests
    consumer_test(bootstrap_servers=bootstrap_servers, topic=topic)
    # get_one_msg(bootstrap_servers=bootstrap_servers,topic=topic,partition=0,offset=0)
参考
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)