RabbitMQ 入门教程

news/2024/9/18 20:51:21/ 标签: ruby, 开发语言, 后端

RabbitMQ 入门教程

1. 引言

RabbitMQ 是一个开源的消息代理和队列服务器,实现高级消息队列协议 (AMQP)。它能帮助开发者实现应用程序间的解耦、异步处理、流量削峰等需求。

2. 安装与配置

2.1 安装RabbitMQ

2.1.1 Ubuntu

```bash

sudo apt-get update

sudo apt-get install rabbitmq-server

```

2.1.2 Windows

1. 下载安装包: [RabbitMQ Download](https://www.rabbitmq.com/download.html)

2. 运行安装向导。

2.2 启动服务

```bash

sudo service rabbitmq-server start

```

2.3 配置管理插件

```bash

sudo rabbitmq-plugins enable rabbitmq_management

```

2.4 访问管理界面

- 浏览器访问: `http://localhost:15672/`

- 默认用户名/密码: `guest/guest`

3. Python 开发环境准备

3.1 安装pika库

```bash

pip install pika

```

4. Hello World 示例

4.1 发送端 - send.py

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

```

4.2 接收端 - receive.py

```python

import pika

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

```

5. 工作队列

5.1 创建工作队列

```python

import pika

import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = "A message"

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2, # make message persistent

))

print(" [x] Sent %r" % message)

connection.close()

```

5.2 消费者

```python

import pika

import time

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

time.sleep(body.count(b'.'))

print(" [x] Done")

ch.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='task_queue',

on_message_callback=callback)

channel.start_consuming()

```

6. 发布/订阅模式

6.1 发布者

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = "Log message"

channel.basic_publish(exchange='logs',

routing_key='',

body=message)

print(" [x] Sent %r" % message)

connection.close()

```

6.2 订阅者

```python

import pika

import sys

def callback(ch, method, properties, body):

print(" [x] %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

channel.start_consuming()

```

7. 路由模式

7.1 发布者

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = "info"

message = "Info message"

channel.basic_publish(exchange='direct_logs',

routing_key=severity,

body=message)

print(" [x] Sent %r:%r" % (severity, message))

connection.close()

```

7.2 订阅者

```python

import pika

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severities = ["info", "warning"]

result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

for severity in severities:

channel.queue_bind(exchange='direct_logs',

queue=queue_name,

routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

channel.start_consuming()

```

8. 主题模式

8.1 发布者

```python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = "kern.critical"

message = "Kernel critical message"

channel.basic_publish(exchange='topic_logs',

routing_key=routing_key,

body=message)

print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()

```

8.2 订阅者

```python

import pika

def callback(ch, method, properties, body):

print(" [x] %r:%r" % (method.routing_key, body))

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare(queue='', exclusive=True)

queue_name = result.method.queue

binding_keys = ["kern.*", "*.critical"]

for binding_key in binding_keys:

channel.queue_bind(exchange='topic_logs',

queue=queue_name,

routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

channel.start_consuming()

```


http://www.ppmy.cn/news/1518740.html

相关文章

动态IP池在数据抓取中的应用与优势

随着互联网技术的快速发展,数据抓取(Web Scraping)已经成为获取互联网信息的重要手段。然而,在进行大规模数据抓取时,往往会遇到反爬虫机制、IP封禁等问题。动态IP池作为一种解决方案,可以有效地绕过这些障…

告别手动记录,音频转文字软件助力会议记录新高度

如果你突然被领导指派去参与一场会议,身边没有纸笔要怎么记录转达会议内容呢?我往往会采用手机的录音功能来记录会议内容会后再进行整理。这次我们就来探索音频转文字工具怎么提升我们的工作效率。 1.365在线转文字 链接传送:https://www.p…

微服务优缺点以及如何拆分

微服务优点 1,降低代码逻辑复杂度。 单个微服务模块相当于一个项目,开发人员只用关心这个模块的逻辑即可。 2,技术栈更加灵活 不同的微服务可以使用合适的语言架构实现,然后把服务注册到一个注册中心即可相互调用。 3,按需伸缩 当…

人工智能工作级开发者认证 HCCDP – AI 真题2 答案

1.GBDT通过bagging的防范可以对样本和特征都进行采集。答案:FALSE 原因:GBDT可以对样本采集,不能对特征采集 2.深度学习是机器学习的一个分支。答案:true 3.softmax激活函数的作用是减少及时量和防止梯度消失。答案false 4.在建筑施工现场,基于定制化的图像识别目标检测系统,…

Node.js 安装与使用及连接 MongoDB 的详细教程

下面我将详细讲解如何安装 Node.js、介绍 Node.js 的脚手架工具、使用 Express 脚手架创建项目,以及如何安装和连接 MongoDB。 一、Node.js 安装 下载 Node.js: 访问 Node.js 官方网站。 根据你的操作系统选择最新的 LTS(长期支持版&#x…

从自动驾驶看无人驾驶叉车的技术落地和应用

摘 要 | 介绍无人驾驶叉车在自动驾驶技术中的应用,分析其关键技术,如环境感知、定位、路径规划等,并讨论机器学习算法和强化学习算法的应用以提高无人叉车的运行效率和准确性。无人叉车在封闭结构化环境、机器学习、有效数据集等方…

参加 帆软 BI 上海城市 课堂(08-30培训)

参加 帆软 BI 城市 课堂(0830): 由于目前是自由职业,也想学习一下新的知识 。所以参加本次的培训,总的来说还是比较专业。 培训在 上海 帆软的总部 环球港进行。时间是 13:30~17:00 老师很专业。学习中 课…

关于前端布局的基础知识

float 横向布局 float 实现横向布局,需要向横着布局的元素添加float 其值left right 存在问题 如果使用float 所在父级五高度,会导致下方的元素上移 top的高度被吞了 解决方法: 给父级元素设置高度:不推荐,需要给父级…

LeetCode第65题 有效数字 结合设计模式:状态模式

思路:有限状态机,结合Java的设计模式:状态模式。 单纯用状态机会有大量的if-else,非常不好看,思路不清晰 用设计模式则非常清楚。但是设计模式是为了给人理解的,对机器而言也可能稍微影响性能; …

IO练习--随机点名

随机点名器1 需求: 有一个文件里面存储了班级同学的信息,每一个信息占一行。 格式为:张三-男-23 要求通过程序实现随机点名器。 运行效果: 第一次运行程序:随机同学姓名1(只显示名字) 第二次运行程序:随机同学姓名2(只显示名字) 第三次运行程序:随机同学姓名3(只显…

【精选】基于Hadoop的用户网站浏览分析的设计与实现(全网最新定制,独一无二)

博主介绍: ✌我是阿龙,一名专注于Java技术领域的程序员,全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师,我在计算机毕业设计开发方面积累了丰富的经验。同时,我也是掘金、华为云、阿里云、InfoQ等平台…

2.10鼠标事件

目录 实验原理 实验代码 运行结果 文章参考 实验原理 在 OpenCV 中存在鼠标的操作,比如左键单击、双击等。对于 OpenCV 来讲,用户的鼠标操作被认为发生了一个鼠标事件,需要对这个鼠标事件进行处理,这就是事件的响应。下面我们…

软件设计原则之接口隔离原则

接口隔离原则(Interface Segregation Principle, ISP)是面向对象设计中的一个重要原则,它属于SOLID原则之一。这个原则强调客户端(即接口的调用者)不应该被迫依赖于它们不使用的方法。换句话说,一个类对另一…

centos安装docker并配置加速器

docker安装与卸载: 1、检查当前是否安装docker yum list installed | grep docker2、卸载docker 根据yum list installed | grep docker查询出来的内容,逐个进行删除 yum remove docker.x86 64 -y3、启动与关闭docker 4、删除/etc/docker文件夹 如果…

理解HTTP请求方法:GET、POST、PUT 等

在现代Web开发中,理解不同HTTP请求方法的用途及其特点是至关重要的。每种请求方法都承担着特定的角色,在客户端和服务器之间的通信中发挥着关键作用。包括GET、POST、PUT,以及一些不太常用的方法,如HEAD、DELETE、OPTIONS、TRACE和…

MyBatis之XML配置文件(二)

六、动态SQL拼接 &#xff2d;yBatis提供了if 、foreach、choose等标签动态拼接sql语句&#xff0c;下面介绍这些标签的使用 1、if标签 if标签通常用于WHERE语句中&#xff0c;通过判断参数值决定是否使用某个查询条件。 <select id"selectStudentListLikeName"…

VUE-组件间通信(三)全局事件总线

一、作用&#xff1a;任意组件间通信 二、实现 1、创建全局事件总线 new Vue({render: h > h(App),beforeCreate(){//创建全局事件总线Vue.prototype.$busthis} }).$mount(#app) 2、学生组件 触发事件 <template><div class"studentInfo"><h…

【openwrt-21.02】T750 openwrt-21.02 pptp拨号失败问题分析及解决方案

Openwrt版本 NAME="OpenWrt" VERSION="21.02-SNAPSHOT" ID="openwrt" ID_LIKE="lede openwrt" PRETTY_NAME="OpenWrt 21.02-SNAPSHOT" VERSION_ID="21.02-snapshot" HOME_URL="https://openwrt.org/" …

Python知识点:如何使用SQLAlchemy进行ORM(对象关系映射)

使用SQLAlchemy进行ORM&#xff08;对象关系映射&#xff09;涉及几个关键步骤。以下是一个详细的指南&#xff0c;帮助你理解并使用SQLAlchemy进行ORM。 1. 安装SQLAlchemy 首先&#xff0c;你需要安装SQLAlchemy。你可以使用pip来安装&#xff1a; pip install sqlalchemy…

【python】数据分析统计

逐行读取’\t’分割的txt 对其中的每个数值都转为六位小数的str 再存入dict 存到excel pip install pandas pip install openpyxl # 用于写入Excel文件import pandas as pd # 假设txt文件的路径是data.txt file_path data.txt # 用于存储数据的字典&#xff0c;假设每…