Sentinel
-
测试软件
- jmeter
-
雪崩问题
-
超时处理
- 设定超时时间,请求超过一定时间没有响应就返回错误信息
-
仓壁模式
- 限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,也叫线程隔离
-
断路器
- 由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务请求
-
限流
- 流量控制,限制业务访问的QPS,避免服务因流量的突增而故障
- Sentinel
-
Sentinel
- 运行
- java -jar sentinel-dashboard-1.8.1.jar
- 配置
- server.port
- sentinel.dashboard.auth.username
- 默认用户名
- sentinel.dashboard.auth.password
- 默认密码
- 运行
java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar
微服务整合
- 依赖
<!--sentinel-->
<dependency><groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
- 配置
server:port: 8088
spring:cloud: sentinel:transport:dashboard: localhost:8080
限流
- 流控模式
- 直接:对当前资源限流
- 关联:高优先级资源触发阈值,对低优先级资源限流。
- 链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
- 流控效果
- 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
- 默认的处理方式
- warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常
- 这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
- 排队等待:让所有的请求按照先后次序排队执行
- 两个请求的间隔不能小于指定时长
- 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
隔离和降级
- 线程隔离
- 调用者在调用服务提供者时,给每个调用的请求分配独立线程池
- 出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽
- 熔断降级
- 是在调用方这边加入断路器,统计对服务提供者的调用
- 如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者
FeignClient整合Sentinel
- 配置
feign:sentinel:enabled: true # 开启feign对sentinel的支持
- 实现FallbackFactory
java">@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {@Overridepublic UserClient create(Throwable throwable) {return new UserClient() {@Overridepublic User findById(Long id) {log.error("查询用户异常", throwable);return new User();}};}
}
- FallbackFactory注册Bean:
java">@Bean
public UserClientFallbackFactory userClientFallbackFactory(){return new UserClientFallbackFactory();
}
- UserClient接口中使用FallbackFactory
java">@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {@GetMapping("/user/{id}")User findById(@PathVariable("id") Long id);
}
线程隔离
- 线程池隔离
- 给每个服务调用业务分配一个线程池
- 利用线程池本身实现隔离效果
- 信号量隔离
- 计数器模式,记录业务使用的线程数量
- 达到信号量上限时,禁止新的请求
熔断降级
- 断路器控制熔断和放行是通过状态机来完成
- 状态机
- closed:关闭状态
- 断路器放行所有请求,并开始统计异常比例、慢请求比例
- 超过阈值则切换到open状态
- open:打开状态
- 服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑
- Open状态5秒后会进入half-open状态
- half-open:半开状态
- 放行一次请求,根据执行结果来判断接下来的操作。
- 请求成功:则切换到closed状态
- 请求失败:则切换到open状态
- closed:关闭状态
- 器熔断策略
- 慢调用
- 异常比例
- 异常数
授权规则
-
白名单:来源(origin)在白名单内的调用者允许访问
-
黑名单:来源(origin)在黑名单内的调用者不允许访问
-
定义RequestOriginParser接口,返回不同的origin
java">@Component
public class HeaderOriginParser implements RequestOriginParser {@Overridepublic String parseOrigin(HttpServletRequest request) {// 1.获取请求头String origin = request.getHeader("origin");// 2.非空判断if (StringUtils.isEmpty(origin)) {origin = "blank";}return origin;}
}
- 网关的请求添加请求头
spring:cloud:gateway:default-filters:- AddRequestHeader=origin,gateway
- 实现BlockExceptionHandler接口,自定义异常时的返回结果
- 处理请求被限流、降级、授权拦截时抛出BlockException
java">@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {@Overridepublic void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {String msg = "未知异常";int status = 429;if (e instanceof FlowException) {msg = "请求被限流了";} else if (e instanceof ParamFlowException) {msg = "请求被热点参数限流";} else if (e instanceof DegradeException) {msg = "请求被降级了";} else if (e instanceof AuthorityException) {msg = "没有权限访问";status = 401;}response.setContentType("application/json;charset=utf-8");response.setStatus(status);response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");}
}
规则持久化
- 规则管理模式
- 原始模式
- Sentinel的默认模式,将规则保存在内存,重启服务会丢失
- pull模式
- 控制台将配置的规则推送到Sentinel客户端
- 客户端会将配置规则保存在本地文件或数据库中
- 定时去本地文件或数据库中查询,更新本地规则
- push模式
- 控制台将配置规则推送到远程配置中心Nacos
- Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新
- 原始模式
实现push模式
- 依赖
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
- 配置
spring:cloud:sentinel:datasource:flow:nacos:server-addr: localhost:8848 # nacos地址dataId: orderservice-flow-rulesgroupId: SENTINEL_GROUPrule-type: flow # 还可以是:degrade、authority、param-flow
- 修改sentinel-dashboard源码
- 重新启动
分布式事务
-
CAP
- Consistency(一致性)
- Availability(可用性)
- Partition tolerance (分区容错性)
-
BASE
- Basically Available (基本可用)
- Soft State(软状态)
- Eventually Consistent(最终一致性)
-
分布式事务
- AP模式
- 各子事务分别执行和提交,允许出现结果不一致
- 采用弥补措施恢复数据即可,实现最终一致
- CP模式
- 各个子事务执行后互相等待,同时提交,同时回滚,达成强一致
- 事务等待过程中,处于弱可用状态
- AP模式
Seata
- Seata的架构
- TC (Transaction Coordinator)
- 事务协调者
- TM (Transaction Manager)
- 事务管理器
- RM (Resource Manager)
- 资源管理器
- TC (Transaction Coordinator)
- 下载
- http://seata.io/zh-cn/blog/download.html
微服务集成
- 依赖
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><!--版本较低,1.3.0,因此排除--><exclusion><artifactId>seata-spring-boot-starter</artifactId><groupId>io.seata</groupId></exclusion></exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><!--<version>1.4.2</version>--><version>${seata.version}</version>
</dependency>
- 配置
seata:registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址# 参考tc服务自己的registry.conf中的配置type: nacosnacos: # tcserver-addr: 127.0.0.1:8848namespace: ""group: DEFAULT_GROUPapplication: seata-tc-server # tc服务在nacos中的服务名称cluster: SHtx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称service:vgroup-mapping: # 事务组与TC服务cluster的映射关系seata-demo: SH
XA模式
-
RM一阶段的工作:
- 注册分支事务到TC
- 执行分支业务sql但不提交
- 报告执行状态到TC
-
TC二阶段的工作:
- TC检测各分支事务执行状态
- 如果都成功,通知所有RM提交事务
- 如果有失败,通知所有RM回滚事务
- TC检测各分支事务执行状态
-
RM二阶段的工作:
- 接收TC指令,提交或回滚事务
-
配置文件
seata: dtat-source-proxy-mode: XA
- 添加注解
- 全局事务的入口方法添加@GlobalTransactional注解
AT模式
-
阶段一RM的工作:
- 注册分支事务
- 记录undo-log(数据快照)
- 执行业务sql并提交
- 报告事务状态
-
阶段二提交时RM的工作:
- 删除undo-log即可
- 阶段二回滚时RM的工作:
- 根据undo-log恢复数据到更新前
-
配置文件
seata: dtat-source-proxy-mode: AT
- 添加注解
- 全局事务的入口方法添加@GlobalTransactional注解
TCC模式
- 需要实现三个方法
- Try:资源的检测和预留;
- Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
- Cancel:预留资源释放,可以理解为try的反向操作。
java">@LocalTCC
public interface AccountTCCService {@TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,@BusinessActionContextParameter(paramName = "money")int money);boolean confirm(BusinessActionContext ctx);boolean cancel(BusinessActionContext ctx);
}
SAGA模式
- 一阶段
- 直接提交本地事务
- 二阶段
- 成功则什么都不做;失败则通过编写补偿业务来回滚
Redis
Redis持久化
-
RDB持久化
- Redis Database Backup file
- Redis数据备份文件,也被叫做Redis数据快照。
- 把内存中的所有数据都记录到磁盘中
- 当Redis实例故障重启后,从磁盘读取快照文件,恢复数据
-
RDB执行时机
- Redis停机时会执行一次RDB
- Redis内部有触发RDB的机制
- 可以在redis.conf文件中找到
-
异步持久化bgsave
- fork主进程得到一个子进程,共享内存空间
- 子进程读取内存数据并写入新的RDB文件
- 用新RDB文件替换旧的RDB文件
-
AOF持久化
-
Append Only File(追加文件)
-
Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件
-
频率
- always
- ererysec
- no
Redis主从
-
搭建主从架构
- 搭建主从集群,实现读写分离
-
主从数据同步原理
- 全量同步
- 第一次同步
- 增量同步
- slave重启后同步
- 全量同步
Redis哨兵
-
Sentinel
- 监控
- 心跳
- 自动故障恢复
- 选举新master
- 通知
- 通知客户端新的master
- 监控
-
心跳检测
- 主观下线
- 客观下线
RedisTemplate哨兵模式
- 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:redis:password: 1234 #如果Redis有密码市一定要配置密码sentinel:master: mymaster #指定master名称nodes: # 指定Redis集群信息- ip地址:27001- ip地址:27002- ip地址:27003
- RedisTemplate配置主从读写分离
java">@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){// REPLICA_PREFERRED 优先从slave节点读取 return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}
Redis分片集群(增强存储能力)
- 分片集群
- 每个集群多个master,每个master保存不同数据
- 每个master多个slave
- master直接通过ping监测彼此
- 客户端访问集群任意节点,最终都会被转发到正确的节点
- 散列插槽
- redis会把master节点映射到0~1638插槽上
- 数据的key与插槽绑定
集群伸缩
- 新增
- redis-cli -a 密码 --cluster add-node ip地址:port ip地址:7001
- 分配插槽
- redis-cli -a 密码 --cluster reshard ip地址:port
- 删除
- redis-cli -a 密码 --cluster del-node ip地址:port 节点id
故障转移
-
自动故障转移
- 自动
-
手动故障转移
- cluster failover命令
RedisTemplate访问分片集群
- 引入redis依赖
- 配置分片集群地址
- 配置读写分离
spring:redis:cluster:nodes:- ip:port- ip:port- ip:port- ip:port- ip:port
多级缓存
- 多级缓存
- 浏览器&客户端
- nginx本地缓存
- redis缓存
- JVM进程缓存
- 数据库
JVM进程缓存
- 分布式缓存
- redis
- 进程本地缓存
- HashMap
- GuavaCache
- Caffeine
- Spring内部的缓存使用的就是Caffeine
Caffeine
java">// 构建cache
Cache<String,String> cache = Caffeine.newBuilder().build();
// 存储数据
cache.put("gf","xxx");
// 取值
String gf = cache.getIfPresent("gf");String defaultGF = cache.get("defaultGF",key->{return "xxx"
});
-
缓存驱逐策略
- 基于容器
- 数量上限
- 基于时间
- 设置有效时间
- 基于引用
- 不建议使用
- 基于容器
-
基于容器
java">Cache<String,String> cache = Caffeine.newBuilder().maximumSize(1).build();
- 基于时间
java">Cache<String,String> cache = Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(10)).build();
Lua语法
helloworld
- 新建
touch hello.lua
- 添加内容
print("hello world")
- 运行
lua hello.lua
数据类型
- 数据类型
- nil
- 无效值
- boolean
- number
- string
- function
- table
- 数组key为索引
- local arr = {‘xx’,‘xx’}
- print(arr[1])
- map
- loacl map = {name=‘xx’,age=2}
- map[‘name’]
- map.name
- 数组key为索引
- nil
- 查看数据类型
- type()
- 循环遍历table
local arr = {'ar','xx',ss}
for index,value in ipairs(arr) doprint(index,value)
end
loacl map = {name='xx',age=2}
for index,value in ipairs(map) doprint(index,value)
end
- 函数
function printArr(arr)for index,value in ipairs(arr) doprint(value)end
end
- 条件控制
if(布尔表达式)
thenelseend
- 逻辑运算
- and
- or
- not
OpenResty
-
基于nginx的高性能web平台
-
安装
- OpenResty
- opm
-
目录
- /usr/local/openresty
-
获取请求参数
- 路径占位符
- /item/111
- local id=ngx.var[1]
- 请求头
- local headers = ngx.req.get_headers()
- Get
- local getParams = ngx.req.get_uri_args()
- post
- gx.req.read_body()
- local postParams = ngx.req.get_post_args()
- JSON
- gx.req.read_body()
- local jsonBody = ngx.req.get_post_data()
- 路径占位符
-
封装http请求
- /usr/lodal/openresty/lualib/common.lua
--封装函数,发送http请求,并解析响应
local function read http(path,params)local resp = ngx.location.capture(path,{method =ngx.HTTP GET,args =params,})if not resp then--记录错误信息,返回404ngx.log(ngx.ERR,"http not found,path:",path ,",args:",args)ngx.exit(404)endreturn resp.body
end
--将方法导出
Local _M = {read_http = read_http
}
return _M
--导入common函数库
local common=require('common')
local read_http = common.read_http
--导入cjson库
local cjson=require('cjson')-- 获取路径参数
local id =ngx.var[1]-- 查询商品信息
local itemJSON =read_http("/item/" .. id, nil)-- 查询库存信息
local stockJSON = read_http("/item/stock/"..id, nil)
-- JSON转化为lua的tablelocal item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock =stock.stock
item.sold = stock.sold
-- 把item序列化为ison 返回结果
ngx.say(cjson.encode(item))
nginx本地缓存
-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache-- 封装查询函数
function read_data(key, expire, path, params)-- 查询本地缓存local val = item_cache:get(key)if not val thenngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)-- 查询redisval = read_redis("127.0.0.1", 6379, key)-- 判断查询结果if not val thenngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)-- redis查询失败,去查询httpval = read_http(path, params)endend-- 查询成功,把数据写入本地缓存item_cache:set(key, val, expire)-- 返回数据return val
end
- 设置缓存时间1800/60
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)
Redis 缓存预热
-
冷启动
- 服务刚刚启动时,redis中并没有缓存
- 如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
-
缓存预热
- 在实际开发中,可以利用大数据统计用户访问的热点数据,项目启动时将这些热点数据提前查询并保存redis中
-
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
- 配置
spring:redis:host: 192.168.150.101
- 初始化类
java">
@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}
}
- openResty查询redis
- common.lua
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒local pool_size = 100 --连接池大小local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)if not ok thenngx.log(ngx.ERR, "放入redis连接池失败: ", err)end
end-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)-- 获取一个连接local ok, err = red:connect(ip, port)if not ok thenngx.log(ngx.ERR, "连接redis失败 : ", err)return nilend-- 查询redislocal resp, err = red:get(key)-- 查询失败处理if not resp thenngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)end--得到的数据为空处理if resp == ngx.null thenresp = nilngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)endclose_redis(red)return resp
end-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)local resp = ngx.location.capture(path,{method = ngx.HTTP_GET,args = params,})if not resp then-- 记录错误信息,返回404ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)ngx.exit(404)endreturn resp.body
end
-- 将方法导出
local _M = { read_http = read_http,read_redis = read_redis
}
return _M
- 先查redis,没有再查服务
- item.lua
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')-- 封装查询函数
function read_data(key, path, params)-- 查询本地缓存local val = read_redis("127.0.0.1", 6379, key)-- 判断查询结果if not val thenngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)-- redis查询失败,去查询httpval = read_http(path, params)end-- 返回数据return val
end-- 获取路径参数
local id = ngx.var[1]-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))
缓存同步
- 同步策略
- 设置有效期:给缓存设置有效期,到期后自动删除,再次查询时更新
- 同步双写:在修改数据库的同时,直接修改缓存
- 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- MQ
- Canal
- 自己伪装成MySQL的一个slave节点,监听master的binary log变化
Canal
- 依赖
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version>
</dependency>
- 配置
canal:destination: heima # canal的集群名字,要与安装canal时设置的名称一致server: 192.168.150.101:11111 # canal服务地址
- domain
java">
@Data
@TableName("tb_item")
public class Item {@TableId(type = IdType.AUTO)@Idprivate Long id;//商品id@Column(name = "name")private String name;//商品名称private String title;//商品标题private Long price;//价格(分)private String image;//商品图片private String category;//分类名称private String brand;//品牌名称private String spec;//规格private Integer status;//商品状态 1-正常,2-下架private Date createTime;//创建时间private Date updateTime;//更新时间@TableField(exist = false)@Transientprivate Integer stock;@TableField(exist = false)@Transientprivate Integer sold;
- 监听器
java">
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {@Autowiredprivate RedisHandler redisHandler;@Autowiredprivate Cache<Long, Item> itemCache;@Overridepublic void insert(Item item) {// 写数据到JVM进程缓存itemCache.put(item.getId(), item);// 写数据到redisredisHandler.saveItem(item);}@Overridepublic void update(Item before, Item after) {// 写数据到JVM进程缓存itemCache.put(after.getId(), after);// 写数据到redisredisHandler.saveItem(after);}@Overridepublic void delete(Item item) {// 删除数据到JVM进程缓存itemCache.invalidate(item.getId());// 删除数据到redisredisHandler.deleteItemById(item.getId());}
}
- RedisHandler
java">@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}public void saveItem(Item item) {try {String json = MAPPER.writeValueAsString(item);redisTemplate.opsForValue().set("item:id:" + item.getId(), json);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void deleteItemById(Long id) {redisTemplate.delete("item:id:" + id);}
}
RabbitMQ高级特性
消息可靠性
-
丢失原因
- 发送时丢失
- 生产者发送的消息未送达exchange
- 消息到达exchange后未到达queue
- MQ宕机,queue将消息丢失
- consumer接收到消息后未消费就宕机
- 发送时丢失
-
解决方案
- 生产者确认机制
- mq持久化
- 消费者确认机制
- 失败重试机制
生产者消息确认
-
publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
-
publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因
-
配置
spring:rabbitmq:# simple:同步等待confirm结果,直到超时# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublisher-confirm-type: correlated# 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbackpublisher-returns: truetemplate:# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息mandatory: true
- ReturnCallback
java">@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}
- ConfirmCallback
java">// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执
Thread.sleep(2000);
消息持久化
-
消息持久化机制
- 交换机持久化
- 队列持久化
- 消息持久化
-
交换机持久化
java">@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}
- 队列持久化
java">@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
- 消息持久化
java">// 准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(Standardcharsets.UTF_8))// 默认就是持久化的.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // durable 持久.build();
// 发送消息
rabbitTemplate.convertAndSend( "simple.queue", message);
消费者确认
- SpringAMQP三种确认模式:
- manual
- 手动ack,需要在业务代码结束后,调用api发送ack。
- auto
- 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none
- 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- manual
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack none/auto/manual
消费失败重试机制
- 本地重试
- 利用Spring的retry机制,在消费者出现异常时利用本地重试
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
- 失败策略
- MessageRecovery接口
- RejectAndDontRequeueRecoverer
- 重试耗尽后,直接reject,丢弃消息
- 默认就是这种方式
- ImmediateRequeueMessageRecoverer
- 重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer
- 重试耗尽后,将失败消息投递到指定的交换机
- RejectAndDontRequeueRecoverer
- MessageRecovery接口
java">@Configuration
public class ErrorMessageConfig {@Bean// 交换机public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Bean// 队列public Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Bean// RepublishMessageRecoverer,关联队列和交换机public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}
死信交换机
- 死信(dead letter)
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
- 死信交换机
- 配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
java">// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化.deadLetterExchange("dl.direct") // 指定死信交换机.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}
TTL
-
ttl
- 队列设置超时时间,配置x-message-ttl属性
-
死信交换机、死信队列
java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.ttl.queue", durable = "true"),exchange = @Exchange(name = "dl.ttl.direct"),key = "ttl"
))
public void listenDlQueue(String msg){log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
- 声明一个队列,并且指定TTL
java">@Bean
public Queue ttlQueue(){return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化.ttl(10000) // 设置队列的超时时间,10秒.deadLetterExchange("dl.ttl.direct") // 指定死信交换机.build();
}
- 将ttl与交换机绑定
java">@Bean
public DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
- 发送消息时,设定TTL
java">@Test
public void testTTLMsg() {// 创建消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000").build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);log.debug("发送消息成功");
}
延迟队列
-
插件
- DelayExchange
-
流程
- 接收消息
- 判断消息是否具备x-delay属性
- 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay时间到期后,重新投递消息到指定队列
-
声明延迟交换机
java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
java">@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
- 发送延迟消息
java">// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}
});
惰性队列
-
消息堆积
- 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限
- 之后发送的消息就会成为死信,可能会被丢弃
-
Lazy Queues
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
-
通过命令行将一个运行中的队列修改为惰性队列
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
java">@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}
集群
- 普通集群/叫标准集群(classic cluster)
- 会在集群的各个节点间共享部分数据
- 包括:交换机、队列元信息
- 不包含:队列中的消息
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
- 会在集群的各个节点间共享部分数据
- 镜像集群/主从模式
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主
- 仲裁队列
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
仲裁队列
- 创建仲裁队列
java">@Bean
public Queue quorumQueue() {return QueueBuilder.durable("quorum.queue") // 持久化.quorum() // 仲裁队列.build();
}
- 配置
spring:rabbitmq:addresses: ip:port, ip:port, ip:portusername: xxpassword: xxvirtual-host: /