Python发送带key的kafka消息

embedded/2024/12/24 3:44:58/

在Python中发送带有键(key)的Kafka消息,通常会使用`confluent-kafka`或`kafka-python`这样的库。这里我将分别展示如何使用这两个库来实现这个功能。

 

### 使用 `confluent-kafka`

 

首先,确保你已经安装了`confluent-kafka`库。如果没有安装,可以使用pip进行安装:

```bash

pip install confluent-kafka

```

 

然后,你可以使用以下代码来发送带有键的消息:

```python

from confluent_kafka import Producer

 

def delivery_report(err, msg):

    """ Called once for each message produced to indicate delivery result.

        Triggered by poll() or flush(). """

    if err is not None:

        print(f'Message delivery failed: {err}')

    else:

        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

 

# 配置生产者

conf = {'bootstrap.servers': 'localhost:9092'}

 

# 创建生产者实例

producer = Producer(conf)

 

# 消息的键和值

message_key = 'my_key'

message_value = 'Hello, Kafka!'

 

# 发送消息

producer.produce('my_topic', key=message_key, value=message_value, callback=delivery_report)

 

# 触发所有消息的回调函数

producer.flush()

```

 

### 使用 `kafka-python`

 

同样地,确保你已经安装了`kafka-python`库。如果未安装,可以通过pip安装:

```bash

pip install kafka-python

```

 

接下来,使用以下代码来发送带有键的消息:

```python

from kafka import KafkaProducer

 

# 创建生产者实例

producer = KafkaProducer(bootstrap_servers='localhost:9092',

                         key_serializer=str.encode,

                         value_serializer=str.encode)

 

# 消息的键和值

message_key = 'my_key'

message_value = 'Hello, Kafka!'

 

# 发送消息

producer.send('my_topic', key=message_key, value=message_value)

 

# 确保所有消息都已发送

producer.flush()

 

# 关闭生产者

producer.close()

```

 

在这两个例子中,我们创建了一个Kafka生产者,并指定了一个本地运行的Kafka服务器地址(`localhost:9092`)。然后,我们定义了要发送的消息的键和值,并调用了相应的方法来发送消息。对于`confluent-kafka`,我们还设置了一个回调函数来处理消息的交付结果。

 

请根据你的实际环境调整配置,例如Kafka服务器的地址等。希望这能帮助到你!如果有任何其他问题,请随时提问。


http://www.ppmy.cn/embedded/148238.html

相关文章

陪诊小程序搭建,打造一站式陪诊服务

当下,陪诊市场正在持续火热发展,在全国医疗行业中,陪诊师成为了一个重要的就医方式。陪诊师的出现在快节奏生活下显得尤为重要,为不少没有时间陪老人去医院的家庭以及对医院不熟悉的提供了便利,满足了众多患者及其家属…

梳理你的思路(从OOP到架构设计)_介绍GoF设计模式

目录 GoF的由来 GoF的种类 GoF的由来 裁缝有样式、围棋有棋谱、烹饪有食谱、武功有招式、战争有兵法, ..... 皆是专家和高手的经验心得,通称为:模式(Pattern)。模式告诉您理想的方案像什么、有那些特性﹔ 同时也告诉您些规则,让…

python实现基于RPC协议的接口自动化测试

01 什么是RPC RPC(Remote Procedure Call)远程过程调用协议是一个用于建立适当框架的协议。从本质上讲,它使一台机器上的程序能够调用另一台机器上的子程序,而不会意识到它是远程的。 RPC 是一种软件通信协议,一个程…

ubuntu22.04 nginx配置下载目录,亲测成功

安装nginx ubuntu最简单,apt安装即可 apt install nginx 配置文件 文件都在目录下 /etc/nginx/添加内容 修改/ etc/nginx/sites-available/default , 注意这里不是nginx.conf,直接修改nginx.conf不奏效 location /downloads { …

Docker快速安装Tomcat

安装docker的教程,参考文章: Linux安装Docker-CSDN博客 在linux中安装Tomcat,步骤如下: 1.从远程仓库中拉取Tomcat镜像 docker pull tomcat 如果拉取很慢,通过更换下载镜像的地址便可解决,不过镜像地址可能…

【集合】Java 8 - Stream API 17种常用操作与案例详解

文章目录 Java8 Stream API 17种常用操作与案例详解1. collect():将流中的元素收集到集合中2. filter():根据条件过滤流中的元素3. map():元素映射为另一个值4. forEach():对流中的元素执行操作5. flatMap():将流中的元…

websocket的心跳检测和断线重连

心跳检测和断线重连可以通过WebSocket的事件和属性来实现。以下是一个简单的JavaScript示例,使用WebSocket API实现心跳检测和断线重连的功能: let ws;function connectWebSocket() {ws new WebSocket(ws://your-websocket-server-url);ws.onopen fun…

智能座舱进阶-应用框架层-Handler分析

首先明确, handler是为了解决单进程内的线程之间的通信问题的。我也需要理解Android系统中进程和线程的概念, APP启动后,会有三四个线程启动起来,其中,有一条mainUITread的线程,专门用来处理UI事件&#xf…