学习笔记-微服务高级(黑马程序员)

server/2024/11/14 21:12:32/

Sentinel

  • 测试软件

    • jmeter
  • 雪崩问题

    • 微服务往往依赖于多个其它微服务,服务提供者I发生了故障,依赖于当前服务的其它服务随着时间的推移形成级联失败
  • 超时处理

    • 设定超时时间,请求超过一定时间没有响应就返回错误信息
  • 仓壁模式

    • 限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,也叫线程隔离
  • 断路器

    • 由断路器统计业务执行的异常比例,如果超出阈值则会熔断该业务,拦截访问该业务请求
  • 限流

    • 流量控制,限制业务访问的QPS,避免服务因流量的突增而故障
    • Sentinel
  • Sentinel

    • 运行
      • java -jar sentinel-dashboard-1.8.1.jar
    • 配置
      • server.port
      • sentinel.dashboard.auth.username
        • 默认用户名
      • sentinel.dashboard.auth.password
        • 默认密码
java -Dserver.port=8090 -jar sentinel-dashboard-1.8.1.jar

微服务整合

  • 依赖
<!--sentinel-->
<dependency><groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
  • 配置
server:port: 8088
spring:cloud: sentinel:transport:dashboard: localhost:8080

限流

  • 流控模式
  • 直接:对当前资源限流
  • 关联:高优先级资源触发阈值,对低优先级资源限流。
  • 链路:阈值统计时,只统计从指定资源进入当前资源的请求,是对请求来源的限流
  • 流控效果
    • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常
      • 默认的处理方式
    • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常
      • 这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
    • 排队等待:让所有的请求按照先后次序排队执行
      • 两个请求的间隔不能小于指定时长

隔离和降级

  • 线程隔离
    • 调用者在调用服务提供者时,给每个调用的请求分配独立线程池
    • 出现故障时,最多消耗这个线程池内资源,避免把调用者的所有资源耗尽
  • 熔断降级
    • 是在调用方这边加入断路器,统计对服务提供者的调用
    • 如果调用的失败比例过高,则熔断该业务,不允许访问该服务的提供者

FeignClient整合Sentinel

  • 配置
feign:sentinel:enabled: true # 开启feign对sentinel的支持
  • 实现FallbackFactory
java">@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {@Overridepublic UserClient create(Throwable throwable) {return new UserClient() {@Overridepublic User findById(Long id) {log.error("查询用户异常", throwable);return new User();}};}
}
  • FallbackFactory注册Bean:
java">@Bean
public UserClientFallbackFactory userClientFallbackFactory(){return new UserClientFallbackFactory();
}
  • UserClient接口中使用FallbackFactory
java">@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {@GetMapping("/user/{id}")User findById(@PathVariable("id") Long id);
}

线程隔离

  • 线程池隔离
    • 给每个服务调用业务分配一个线程池
    • 利用线程池本身实现隔离效果
  • 信号量隔离
    • 计数器模式,记录业务使用的线程数量
    • 达到信号量上限时,禁止新的请求

熔断降级

  • 断路器控制熔断和放行是通过状态机来完成
  • 状态机
    • closed:关闭状态
      • 断路器放行所有请求,并开始统计异常比例、慢请求比例
      • 超过阈值则切换到open状态
    • open:打开状态
      • 服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑
      • Open状态5秒后会进入half-open状态
    • half-open:半开状态
      • 放行一次请求,根据执行结果来判断接下来的操作。
      • 请求成功:则切换到closed状态
      • 请求失败:则切换到open状态
  • 器熔断策略
    • 慢调用
    • 异常比例
    • 异常数

授权规则

  • 白名单:来源(origin)在白名单内的调用者允许访问

  • 黑名单:来源(origin)在黑名单内的调用者不允许访问

  • 定义RequestOriginParser接口,返回不同的origin

java">@Component
public class HeaderOriginParser implements RequestOriginParser {@Overridepublic String parseOrigin(HttpServletRequest request) {// 1.获取请求头String origin = request.getHeader("origin");// 2.非空判断if (StringUtils.isEmpty(origin)) {origin = "blank";}return origin;}
}
  • 网关的请求添加请求头
spring:cloud:gateway:default-filters:- AddRequestHeader=origin,gateway
  • 实现BlockExceptionHandler接口,自定义异常时的返回结果
    • 处理请求被限流、降级、授权拦截时抛出BlockException
java">@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {@Overridepublic void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {String msg = "未知异常";int status = 429;if (e instanceof FlowException) {msg = "请求被限流了";} else if (e instanceof ParamFlowException) {msg = "请求被热点参数限流";} else if (e instanceof DegradeException) {msg = "请求被降级了";} else if (e instanceof AuthorityException) {msg = "没有权限访问";status = 401;}response.setContentType("application/json;charset=utf-8");response.setStatus(status);response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");}
}

规则持久化

  • 规则管理模式
    • 原始模式
      • Sentinel的默认模式,将规则保存在内存,重启服务会丢失
    • pull模式
      • 控制台将配置的规则推送到Sentinel客户端
      • 客户端会将配置规则保存在本地文件或数据库中
      • 定时去本地文件或数据库中查询,更新本地规则
    • push模式
      • 控制台将配置规则推送到远程配置中心Nacos
      • Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新

实现push模式

  • 依赖
<dependency><groupId>com.alibaba.csp</groupId><artifactId>sentinel-datasource-nacos</artifactId>
</dependency>
  • 配置
spring:cloud:sentinel:datasource:flow:nacos:server-addr: localhost:8848 # nacos地址dataId: orderservice-flow-rulesgroupId: SENTINEL_GROUPrule-type: flow # 还可以是:degrade、authority、param-flow
  • 修改sentinel-dashboard源码
    • 重新启动

分布式事务

  • CAP

    • Consistency(一致性)
    • Availability(可用性)
    • Partition tolerance (分区容错性)
  • BASE

    • Basically Available (基本可用)
    • Soft State(软状态)
    • Eventually Consistent(最终一致性)
  • 分布式事务

    • AP模式
      • 各子事务分别执行和提交,允许出现结果不一致
      • 采用弥补措施恢复数据即可,实现最终一致
    • CP模式
      • 各个子事务执行后互相等待,同时提交,同时回滚,达成强一致
      • 事务等待过程中,处于弱可用状态

Seata

  • Seata的架构
    • TC (Transaction Coordinator)
      • 事务协调者
    • TM (Transaction Manager)
      • 事务管理器
    • RM (Resource Manager)
      • 资源管理器
  • 下载
    • http://seata.io/zh-cn/blog/download.html

微服务集成

  • 依赖
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><!--版本较低,1.3.0,因此排除--><exclusion><artifactId>seata-spring-boot-starter</artifactId><groupId>io.seata</groupId></exclusion></exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><!--<version>1.4.2</version>--><version>${seata.version}</version>
</dependency>
  • 配置
seata:registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址# 参考tc服务自己的registry.conf中的配置type: nacosnacos: # tcserver-addr: 127.0.0.1:8848namespace: ""group: DEFAULT_GROUPapplication: seata-tc-server # tc服务在nacos中的服务名称cluster: SHtx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称service:vgroup-mapping: # 事务组与TC服务cluster的映射关系seata-demo: SH

XA模式

  • RM一阶段的工作:

    • 注册分支事务到TC
    • 执行分支业务sql但不提交
    • 报告执行状态到TC
  • TC二阶段的工作:

    • TC检测各分支事务执行状态
      • 如果都成功,通知所有RM提交事务
      • 如果有失败,通知所有RM回滚事务
  • RM二阶段的工作:

    • 接收TC指令,提交或回滚事务
  • 配置文件

seata: dtat-source-proxy-mode: XA
  • 添加注解
    • 全局事务的入口方法添加@GlobalTransactional注解

AT模式

  • 阶段一RM的工作:

    • 注册分支事务
    • 记录undo-log(数据快照)
    • 执行业务sql并提交
    • 报告事务状态
  • 阶段二提交时RM的工作:

    • 删除undo-log即可
    • 阶段二回滚时RM的工作:
    • 根据undo-log恢复数据到更新前
  • 配置文件

seata: dtat-source-proxy-mode: AT
  • 添加注解
    • 全局事务的入口方法添加@GlobalTransactional注解

TCC模式

  • 需要实现三个方法
    • Try:资源的检测和预留;
    • Confirm:完成资源操作业务;要求 Try 成功 Confirm 一定要能成功。
    • Cancel:预留资源释放,可以理解为try的反向操作。
java">@LocalTCC
public interface AccountTCCService {@TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,@BusinessActionContextParameter(paramName = "money")int money);boolean confirm(BusinessActionContext ctx);boolean cancel(BusinessActionContext ctx);
}

SAGA模式

  • 一阶段
    • 直接提交本地事务
  • 二阶段
    • 成功则什么都不做;失败则通过编写补偿业务来回滚

Redis

Redis持久化

  • RDB持久化

    • Redis Database Backup file
    • Redis数据备份文件,也被叫做Redis数据快照。
      • 把内存中的所有数据都记录到磁盘中
      • 当Redis实例故障重启后,从磁盘读取快照文件,恢复数据
  • RDB执行时机

    • Redis停机时会执行一次RDB
    • Redis内部有触发RDB的机制
      • 可以在redis.conf文件中找到
  • 异步持久化bgsave

    • fork主进程得到一个子进程,共享内存空间
    • 子进程读取内存数据并写入新的RDB文件
    • 用新RDB文件替换旧的RDB文件
  • AOF持久化

  • Append Only File(追加文件)

  • Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件

  • 频率

    • always
    • ererysec
    • no

Redis主从

  • 搭建主从架构

    • 搭建主从集群,实现读写分离
  • 主从数据同步原理

    • 全量同步
      • 第一次同步
    • 增量同步
      • slave重启后同步

Redis哨兵

  • Sentinel

    • 监控
      • 心跳
    • 自动故障恢复
      • 选举新master
    • 通知
      • 通知客户端新的master
  • 心跳检测

    • 主观下线
    • 客观下线

RedisTemplate哨兵模式

  • 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:redis:password: 1234 #如果Redis有密码市一定要配置密码sentinel:master: mymaster #指定master名称nodes: # 指定Redis集群信息- ip地址:27001- ip地址:27002- ip地址:27003
  • RedisTemplate配置主从读写分离
java">@Bean
public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){// REPLICA_PREFERRED 优先从slave节点读取 return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

Redis分片集群(增强存储能力)

  • 分片集群
    • 每个集群多个master,每个master保存不同数据
    • 每个master多个slave
    • master直接通过ping监测彼此
    • 客户端访问集群任意节点,最终都会被转发到正确的节点
  • 散列插槽
    • redis会把master节点映射到0~1638插槽上
    • 数据的key与插槽绑定

集群伸缩

  • 新增
    • redis-cli -a 密码 --cluster add-node ip地址:port ip地址:7001
  • 分配插槽
    • redis-cli -a 密码 --cluster reshard ip地址:port
  • 删除
    • redis-cli -a 密码 --cluster del-node ip地址:port 节点id

故障转移

  • 自动故障转移

    • 自动
  • 手动故障转移

    • cluster failover命令

RedisTemplate访问分片集群

  • 引入redis依赖
  • 配置分片集群地址
  • 配置读写分离
spring:redis:cluster:nodes:- ip:port- ip:port- ip:port- ip:port- ip:port

多级缓存

  • 多级缓存
    • 浏览器&客户端
    • nginx本地缓存
    • redis缓存
    • JVM进程缓存
    • 数据库

JVM进程缓存

  • 分布式缓存
    • redis
  • 进程本地缓存
    • HashMap
    • GuavaCache
    • Caffeine
      • Spring内部的缓存使用的就是Caffeine

Caffeine

java">// 构建cache
Cache<String,String> cache = Caffeine.newBuilder().build();
// 存储数据
cache.put("gf","xxx");
// 取值
String gf = cache.getIfPresent("gf");String defaultGF = cache.get("defaultGF",key->{return "xxx" 
});
  • 缓存驱逐策略

    • 基于容器
      • 数量上限
    • 基于时间
      • 设置有效时间
    • 基于引用
      • 不建议使用
  • 基于容器

java">Cache<String,String> cache = Caffeine.newBuilder().maximumSize(1).build();
  • 基于时间
java">Cache<String,String> cache = Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(10)).build();

Lua语法

helloworld

  • 新建
touch hello.lua
  • 添加内容
print("hello world")
  • 运行
lua hello.lua

数据类型

  • 数据类型
    • nil
      • 无效值
    • boolean
    • number
    • string
    • function
    • table
      • 数组key为索引
        • local arr = {‘xx’,‘xx’}
        • print(arr[1])
      • map
        • loacl map = {name=‘xx’,age=2}
        • map[‘name’]
        • map.name
  • 查看数据类型
    • type()
  • 循环遍历table
local arr = {'ar','xx',ss}
for index,value in ipairs(arr) doprint(index,value)
end
loacl map = {name='xx',age=2}
for index,value in ipairs(map) doprint(index,value)
end
  • 函数
function printArr(arr)for index,value in ipairs(arr) doprint(value)end
end
  • 条件控制
if(布尔表达式)
thenelseend
  • 逻辑运算
    • and
    • or
    • not

OpenResty

  • 基于nginx的高性能web平台

  • 安装

    • OpenResty
    • opm
  • 目录

    • /usr/local/openresty
  • 获取请求参数

    • 路径占位符
      • /item/111
      • local id=ngx.var[1]
    • 请求头
      • local headers = ngx.req.get_headers()
    • Get
      • local getParams = ngx.req.get_uri_args()
    • post
      • gx.req.read_body()
      • local postParams = ngx.req.get_post_args()
    • JSON
      • gx.req.read_body()
      • local jsonBody = ngx.req.get_post_data()
  • 封装http请求

    • /usr/lodal/openresty/lualib/common.lua
--封装函数,发送http请求,并解析响应
local function read http(path,params)local resp = ngx.location.capture(path,{method =ngx.HTTP GET,args =params,})if not resp then--记录错误信息,返回404ngx.log(ngx.ERR,"http not found,path:",path ,",args:",args)ngx.exit(404)endreturn resp.body
end
--将方法导出
Local _M = {read_http = read_http
}
return _M
--导入common函数库
local common=require('common')
local read_http = common.read_http
--导入cjson库
local cjson=require('cjson')-- 获取路径参数
local id =ngx.var[1]-- 查询商品信息
local itemJSON =read_http("/item/" .. id, nil)-- 查询库存信息
local stockJSON = read_http("/item/stock/"..id, nil)
-- JSON转化为lua的tablelocal item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock =stock.stock
item.sold = stock.sold
-- 把item序列化为ison 返回结果
ngx.say(cjson.encode(item))

nginx本地缓存

-- 导入共享词典,本地缓存
local item_cache = ngx.shared.item_cache-- 封装查询函数
function read_data(key, expire, path, params)-- 查询本地缓存local val = item_cache:get(key)if not val thenngx.log(ngx.ERR, "本地缓存查询失败,尝试查询Redis, key: ", key)-- 查询redisval = read_redis("127.0.0.1", 6379, key)-- 判断查询结果if not val thenngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)-- redis查询失败,去查询httpval = read_http(path, params)endend-- 查询成功,把数据写入本地缓存item_cache:set(key, val, expire)-- 返回数据return val
end
  • 设置缓存时间1800/60
-- 查询商品信息
local itemJSON = read_data("item:id:" .. id, 1800,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, 60, "/item/stock/" .. id, nil)

Redis 缓存预热

  • 冷启动

    • 服务刚刚启动时,redis中并没有缓存
    • 如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
  • 缓存预热

    • 在实际开发中,可以利用大数据统计用户访问的热点数据,项目启动时将这些热点数据提前查询并保存redis中
  • 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
  • 配置
spring:redis:host: 192.168.150.101
  • 初始化类
java"> 
@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}
}
  • openResty查询redis
    • common.lua
-- 导入redis
local redis = require('resty.redis')
-- 初始化redis
local red = redis:new()
red:set_timeouts(1000, 1000, 1000)-- 关闭redis连接的工具方法,其实是放入连接池
local function close_redis(red)local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒local pool_size = 100 --连接池大小local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)if not ok thenngx.log(ngx.ERR, "放入redis连接池失败: ", err)end
end-- 查询redis的方法 ip和port是redis地址,key是查询的key
local function read_redis(ip, port, key)-- 获取一个连接local ok, err = red:connect(ip, port)if not ok thenngx.log(ngx.ERR, "连接redis失败 : ", err)return nilend-- 查询redislocal resp, err = red:get(key)-- 查询失败处理if not resp thenngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key)end--得到的数据为空处理if resp == ngx.null thenresp = nilngx.log(ngx.ERR, "查询Redis数据为空, key = ", key)endclose_redis(red)return resp
end-- 封装函数,发送http请求,并解析响应
local function read_http(path, params)local resp = ngx.location.capture(path,{method = ngx.HTTP_GET,args = params,})if not resp then-- 记录错误信息,返回404ngx.log(ngx.ERR, "http查询失败, path: ", path , ", args: ", args)ngx.exit(404)endreturn resp.body
end
-- 将方法导出
local _M = {  read_http = read_http,read_redis = read_redis
}  
return _M
  • 先查redis,没有再查服务
    • item.lua
-- 导入common函数库
local common = require('common')
local read_http = common.read_http
local read_redis = common.read_redis
-- 导入cjson库
local cjson = require('cjson')-- 封装查询函数
function read_data(key, path, params)-- 查询本地缓存local val = read_redis("127.0.0.1", 6379, key)-- 判断查询结果if not val thenngx.log(ngx.ERR, "redis查询失败,尝试查询http, key: ", key)-- redis查询失败,去查询httpval = read_http(path, params)end-- 返回数据return val
end-- 获取路径参数
local id = ngx.var[1]-- 查询商品信息
local itemJSON = read_data("item:id:" .. id,  "/item/" .. id, nil)
-- 查询库存信息
local stockJSON = read_data("item:stock:id:" .. id, "/item/stock/" .. id, nil)-- JSON转化为lua的table
local item = cjson.decode(itemJSON)
local stock = cjson.decode(stockJSON)
-- 组合数据
item.stock = stock.stock
item.sold = stock.sold-- 把item序列化为json 返回结果
ngx.say(cjson.encode(item))

缓存同步

  • 同步策略
    • 设置有效期:给缓存设置有效期,到期后自动删除,再次查询时更新
    • 同步双写:在修改数据库的同时,直接修改缓存
    • 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
      • MQ
      • Canal
        • 自己伪装成MySQL的一个slave节点,监听master的binary log变化

Canal

  • 依赖
<dependency><groupId>top.javatool</groupId><artifactId>canal-spring-boot-starter</artifactId><version>1.2.1-RELEASE</version>
</dependency>
  • 配置
canal:destination: heima # canal的集群名字,要与安装canal时设置的名称一致server: 192.168.150.101:11111 # canal服务地址
  • domain
java">
@Data
@TableName("tb_item")
public class Item {@TableId(type = IdType.AUTO)@Idprivate Long id;//商品id@Column(name = "name")private String name;//商品名称private String title;//商品标题private Long price;//价格(分)private String image;//商品图片private String category;//分类名称private String brand;//品牌名称private String spec;//规格private Integer status;//商品状态 1-正常,2-下架private Date createTime;//创建时间private Date updateTime;//更新时间@TableField(exist = false)@Transientprivate Integer stock;@TableField(exist = false)@Transientprivate Integer sold;
  • 监听器
java">
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {@Autowiredprivate RedisHandler redisHandler;@Autowiredprivate Cache<Long, Item> itemCache;@Overridepublic void insert(Item item) {// 写数据到JVM进程缓存itemCache.put(item.getId(), item);// 写数据到redisredisHandler.saveItem(item);}@Overridepublic void update(Item before, Item after) {// 写数据到JVM进程缓存itemCache.put(after.getId(), after);// 写数据到redisredisHandler.saveItem(after);}@Overridepublic void delete(Item item) {// 删除数据到JVM进程缓存itemCache.invalidate(item.getId());// 删除数据到redisredisHandler.deleteItemById(item.getId());}
}
  • RedisHandler
java">@Component
public class RedisHandler implements InitializingBean {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate IItemService itemService;@Autowiredprivate IItemStockService stockService;private static final ObjectMapper MAPPER = new ObjectMapper();@Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息List<Item> itemList = itemService.list();// 2.放入缓存for (Item item : itemList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(item);// 2.2.存入redisredisTemplate.opsForValue().set("item:id:" + item.getId(), json);}// 3.查询商品库存信息List<ItemStock> stockList = stockService.list();// 4.放入缓存for (ItemStock stock : stockList) {// 2.1.item序列化为JSONString json = MAPPER.writeValueAsString(stock);// 2.2.存入redisredisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);}}public void saveItem(Item item) {try {String json = MAPPER.writeValueAsString(item);redisTemplate.opsForValue().set("item:id:" + item.getId(), json);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void deleteItemById(Long id) {redisTemplate.delete("item:id:" + id);}
}

RabbitMQ高级特性

消息可靠性

  • 丢失原因

    • 发送时丢失
      • 生产者发送的消息未送达exchange
      • 消息到达exchange后未到达queue
    • MQ宕机,queue将消息丢失
    • consumer接收到消息后未消费就宕机
  • 解决方案

    • 生产者确认机制
    • mq持久化
    • 消费者确认机制
    • 失败重试机制

生产者消息确认

  • publisher-confirm,发送者确认

    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执

    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因
  • 配置

spring:rabbitmq:# simple:同步等待confirm结果,直到超时# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublisher-confirm-type: correlated# 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbackpublisher-returns: truetemplate:# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息mandatory: true
  • ReturnCallback
java">@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 设置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {// 投递失败,记录日志log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString());// 如果有业务需要,可以重发消息});}
}
  • ConfirmCallback
java">// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(result -> {if(result.isAck()){// 3.1.ack,消息成功log.debug("消息发送成功, ID:{}", correlationData.getId());}else{// 3.2.nack,消息失败log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());}},ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);// 休眠一会儿,等待ack回执
Thread.sleep(2000);

消息持久化

  • 消息持久化机制

    • 交换机持久化
    • 队列持久化
    • 消息持久化
  • 交换机持久化

java">@Bean
public DirectExchange simpleExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new DirectExchange("simple.direct", true, false);
}
  • 队列持久化
java">@Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列,durable就是持久化的return QueueBuilder.durable("simple.queue").build();
}
  • 消息持久化
java">// 准备消息
Message message = MessageBuilder.withBody("hello, spring".getBytes(Standardcharsets.UTF_8))// 默认就是持久化的.setDeliveryMode(MessageDeliveryMode.PERSISTENT)    // durable 持久.build();
// 发送消息
rabbitTemplate.convertAndSend(  "simple.queue", message);

消费者确认

  • SpringAMQP三种确认模式:
    • manual
      • 手动ack,需要在业务代码结束后,调用api发送ack。
    • auto
      • 自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    • none
      • 关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:rabbitmq:listener:simple:acknowledge-mode: none # 关闭ack none/auto/manual

消费失败重试机制

  • 本地重试
    • 利用Spring的retry机制,在消费者出现异常时利用本地重试
spring:rabbitmq:listener:simple:retry:enabled: true # 开启消费者失败重试initial-interval: 1000 # 初识的失败等待时长为1秒multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts: 3 # 最大重试次数stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
  • 失败策略
    • MessageRecovery接口
      • RejectAndDontRequeueRecoverer
        • 重试耗尽后,直接reject,丢弃消息
        • 默认就是这种方式
      • ImmediateRequeueMessageRecoverer
        • 重试耗尽后,返回nack,消息重新入队
      • RepublishMessageRecoverer
        • 重试耗尽后,将失败消息投递到指定的交换机
java">@Configuration
public class ErrorMessageConfig {@Bean// 交换机public DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Bean// 队列public Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}@Bean// RepublishMessageRecoverer,关联队列和交换机public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");}
}

死信交换机

  • 死信(dead letter)
    • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
    • 消息是一个过期消息,超时无人消费
    • 要投递的队列消息满了,无法投递
  • 死信交换机
    • 配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中
java">// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化.deadLetterExchange("dl.direct") // 指定死信交换机.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

TTL

  • ttl

    • 队列设置超时时间,配置x-message-ttl属性
  • 死信交换机、死信队列

java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.ttl.queue", durable = "true"),exchange = @Exchange(name = "dl.ttl.direct"),key = "ttl"
))
public void listenDlQueue(String msg){log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
}
  • 声明一个队列,并且指定TTL
java">@Bean
public Queue ttlQueue(){return QueueBuilder.durable("ttl.queue") // 指定队列名称,并持久化.ttl(10000) // 设置队列的超时时间,10秒.deadLetterExchange("dl.ttl.direct") // 指定死信交换机.build();
}
  • 将ttl与交换机绑定
java">@Bean
public DirectExchange ttlExchange(){return new DirectExchange("ttl.direct");
}
@Bean
public Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
  • 发送消息时,设定TTL
java">@Test
public void testTTLMsg() {// 创建消息Message message = MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000").build();// 消息ID,需要封装到CorrelationData中CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 发送消息rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);log.debug("发送消息成功");
}

延迟队列

  • 插件

    • DelayExchange
  • 流程

    • 接收消息
    • 判断消息是否具备x-delay属性
    • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
    • 返回routing not found结果给消息发送者
    • x-delay时间到期后,重新投递消息到指定队列
  • 声明延迟交换机

java">@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
java">@Slf4j
@Configuration
public class DelayExchangeConfig {@Beanpublic DirectExchange delayExchange(){return ExchangeBuilder.directExchange("delay.direct") // 指定交换机类型和名称.delayed() // 设置delay的属性为true.durable(true) // 持久化.build();}@Beanpublic Queue delayedQueue(){return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding(){return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
  • 发送延迟消息
java">// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}
});

惰性队列

  • 消息堆积

    • 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限
    • 之后发送的消息就会成为死信,可能会被丢弃
  • Lazy Queues

    • 接收到消息后直接存入磁盘而非内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储
  • 通过命令行将一个运行中的队列修改为惰性队列

rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues  
java">@Bean
public Queue lazyQueue(){return QueueBuilder.durable("lazy.queue").lazy() // 开启Lazy模式.build();
}
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue",durable = "true",arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String msg){log.info("接收到 lazy.queue的消息:{}", msg);
}

集群

  • 普通集群/叫标准集群(classic cluster)
    • 会在集群的各个节点间共享部分数据
      • 包括:交换机、队列元信息
      • 不包含:队列中的消息
    • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
    • 队列所在节点宕机,队列中的消息就会丢失
  • 镜像集群/主从模式
    • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
    • 一个队列的主节点可能是另一个队列的镜像节点
    • 所有操作都是主节点完成,然后同步给镜像节点
    • 主宕机后,镜像节点会替代成新的主
  • 仲裁队列
    • 与镜像队列一样,都是主从模式,支持主从数据同步
    • 使用非常简单,没有复杂的配置
    • 主从同步基于Raft协议,强一致

仲裁队列

  • 创建仲裁队列
java">@Bean
public Queue quorumQueue() {return QueueBuilder.durable("quorum.queue") // 持久化.quorum() // 仲裁队列.build();
}
  • 配置
spring:rabbitmq:addresses: ip:port, ip:port, ip:portusername: xxpassword: xxvirtual-host: /

http://www.ppmy.cn/server/15149.html

相关文章

SpringBoot Redis使用篇

引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId…

nacos外接mysql的docker部署方式

文章目录 引言I 安装nacos(m1版本)1.1 镜像启动1.2 查看docker容器日志1.3 开启鉴权II 外接mysql的docker部署方式2.1 复制mysql-schema.sql2.2 导入mysql-schema.sqlIII 配置远程用户3.1 创建数据库远程用户3.2 案例: 创建nacos用户,用于nacos配置3.3 查看远程用户是否有密码…

python爬虫 - 爬取html中的script数据(股票行情信息 - 雪球网 )

文章目录 1. 分析页面内容数据格式2. 使用re.findall方法&#xff0c;爬取股票行情&#xff08;返回信息异常&#xff09;3. 使用re.findall方法&#xff0c;爬取股票行情&#xff08;正常&#xff09;4. 使用re.search 方法&#xff0c;爬取股票行情&#xff08;返回信息异常&…

(C++) 内类生成智能指针shared_from_this介绍

文章目录 &#x1f601;介绍&#x1f914;类外操作&#x1f605;错误操作&#x1f602;正确操作 &#x1f914;类内操作&#x1f62e;std::enable_shared_from_this<>&#x1f62e;奇异递归模板 CRTP&#xff08;Curiously Recurring Template Pattern&#xff09;&#…

考研日常记录(upd 24.4.24)

由于实在太无聊了 &#xff0c; 所以记录以下考研备考日常 &#xff0c; 增加一点成就感 &#xff0c; 获得一点前进动力。 文章目录 2024.4.18 周四课程情况&#xff1a;时间规划&#xff1a; 2024.4.19 周五课程情况&#xff1a;时间规划&#xff1a; 2024.4.20 周六2024.4.2…

mybatis快速入门-注解版

mybatis 使用注解&#xff0c;简化 xml 配置&#xff0c;汲及到动态 sql 或是多表查询&#xff0c;还是使用 xml 映射文件配置编写。(企业工作中&#xff0c;几乎全是 xml 配置&#xff0c;xml 的 sql 使用注解方式少,而类引用注解方式)。 注解 Select()&#xff1a;查询Inse…

尾矿库安全监测:仪器埋设与维护的关键要求

尾矿库作为矿业生产的重要组成部分&#xff0c;其安全运营对于保障人员生命安全和环境保护具有至关重要的意义。为了确保尾矿库的安全运行&#xff0c;及时发现潜在的安全隐患&#xff0c;必须采取有效的安全监测措施。本文将重点探讨尾矿库安全监测仪器的埋设及维护要求。 一、…

Centos7 的 Open Stack T 版搭建流程 --- (二)配置 SQL 数据库

配置 SQL 数据库 文章目录 配置 SQL 数据库&#xff08;1&#xff09;安装 MariaDB、MariaDB服务器 和 Python 2 PyMySQLcontroller &#xff08;2&#xff09;配置数据库文件controller &#xff08;1&#xff09;安装 MariaDB、MariaDB服务器 和 Python 2 PyMySQL controlle…