RPC 集群,gRPC 广播和组播

news/2025/1/31 6:59:40/

一、集群抽象:cluster

它是指我们在调用远程的时候,尝试解决:

1、failover:即引入重试功能,但是重试的时候会换一个新节点

2、failfast: 立刻失败,不需要重试

3、广播:将请求发送到所有的节点上

4、组播:组播和分组功能不太一样,组播是指请求发送到一组节点上,而不是只发送到一个单一节点上

注意:failover

二、 gRPC 的 Interceptor 分成好几种:

UnaryClientInterceptor: 用于拦截 gRPC unary 请求

StreamClientIntercepror 用于拦截 gRPC 的 stream 请求。

三、gRPC 广播:注册中心获取所有节点

思路:

利用拦截器捕获调用

利用注册中心来获取所有的服务实例

在拦截器内遍历所有的服务端实例

四、gRPC 限流:

利用服务端拦截器调用,进行限流逻辑

五、gRPC 广播实现

gRPC 广播利用客户端拦截器实现,步骤也非常简单,以下几步:

1、利用注册中心获取所有节点

2、利用filter 过滤节点

3、grpc.Dial 循环调用节点,发起tcp 请求

注意:reflect.TypeOf(reply).Elem() 以及 reflect.New(typ).Interface() 生成一个新的Reply 避免覆盖, filter 是nil 就是广播,非nil 就是组播

对节点进行过滤

type MyIntercaptor struct {register registry.Registermethod   stringfilter   Filter
}
​
type SetOptions func(optins *MyIntercaptor)
​
func NewMyInterceptor(register registry.Register, method string, options ...SetOptions) *MyIntercaptor {t := &MyIntercaptor{register: register,method:   method,filter: func(g1 string, ctx context.Context) bool {return true},}for _, opt := range options {opt(t)}return t
}
​
func WithMyInterSetFilter(filter Filter) SetOptions {return func(option *MyIntercaptor) {option.filter = filter}
}
​
func (m *MyIntercaptor) Intercaptor() grpc.UnaryClientInterceptor {return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {list, er := m.register.ListServices(ctx, m.method)if er != nil {return er}// 是否是广播ok, resp := IsBroadCast(ctx)defer func() {close(resp)}()if !ok {return invoker(ctx, method, req, reply, cc, opts...)}var err errgroup.Groupfor _, li := range list {if !m.filter(li.Group, ctx) {continue}if li.Addr == "" {continue}typ := reflect.TypeOf(reply).Elem()addr := li.Addr// 并发调用err.Go(func() error {dial, er := grpc.Dial(addr, grpc.WithInsecure())if er != nil {resp <- Resp{Err: er,}return nil}rep := reflect.New(typ).Interface()// 发送方法请求er = invoker(ctx, method, req, rep, dial, opts...)resp <- Resp{Err:   er,Reply: rep,}return nil})
​}return err.Wait()}
}
​
type Filter func(g1 string, ctx context.Context) bool
​
func NewFilter() Filter {return func(g1 string, ctx context.Context) bool {group, ok := ctx.Value("group").(string)return ok && group == g1}
}
​
type broadcastKey struct {
}
​
func UseBroadcastKey(ctx context.Context) (context.Context, chan Resp) {ch := make(chan Resp)return context.WithValue(ctx, broadcastKey{}, ch), ch
}
​
func IsBroadCast(ctx context.Context) (bool, chan Resp) {resp, ok := ctx.Value(broadcastKey{}).(chan Resp)return ok, resp
}
​
type Resp struct {Err   errorReply any
}


http://www.ppmy.cn/news/1260649.html

相关文章

网络安全威胁——DDoS攻击

DDoS攻击 1. 定义2. DDoS攻击类型2.1 网络层攻击2.2 传输层攻击2.3 应用层攻击 3.DDoS攻击态势特点 1. 定义 分布式拒绝服务&#xff08;DDoS&#xff09;攻击是一种常见的网络攻击形式。攻击者利用恶意程序对一个或多个目标发起攻击&#xff0c;企图通过大规模互联网流量耗尽…

算法——滑动窗口

滑动窗口大致分为两类&#xff1a;一类是窗口长度固定的&#xff0c;即left和right可以一起移动&#xff1b;另一种是窗口的长度变化&#xff08;例如前五道题&#xff09;&#xff0c;即right疯狂移动&#xff0c;left没怎么动&#xff0c;这类题需要观察单调性(即指针)等各方…

Jtti:降低香港服务器被攻击的几种策略方法

作为承载跨境电商业务的主要载体之一&#xff0c;香港服务器也承受着越来越大的安全压力。除了常规的DDOS攻击造成的服务器瘫痪之外&#xff0c;非法入侵和数据窃取也成为了任何企业都必须要面对的日常压力。服务器的防护措施千千万&#xff0c;总会有遗漏的地方&#xff0c;还…

如何在计算机格式化后恢复照片

您想了解如何从已格式化的笔记本电脑或台式机中恢复已删除的照片吗&#xff1f;这篇文章解释了如何使用最佳格式的照片恢复软件来做到这一点。您可以通过简单的步骤在格式化计算机后恢复已删除的图像。 将照片保存在笔记本电脑或 PC 硬盘上很常见。与相机存储卡和 USB 闪存驱动…

为什么对中小企业来说,数字化转型很难?

引言 数字化转型对中小企业至关重要&#xff0c;然而&#xff0c;实施这一转型却充满挑战。中小企业面临着资源、技术、文化和安全方面的种种难题&#xff0c;这些困难限制了它们在数字化领域的发展和竞争力。这种挑战不仅影响企业内部运营&#xff0c;还直接影响其与客户和市…

总结|哪些平台有大模型知识库的Web API服务

截止2023/12/6 笔者个人的调研&#xff0c;有三家有大模型知识库的web api服务&#xff1a; 平台类型文档数量文档上传并解析的结构api情况返回页码文心一言插件版多文档有问答api&#xff0c;文档上传是通过网页进行上传有&#xff0c;而且是具体的chunk id&#xff0c;需要设…

MybatisPlus批量插入(伪批量),增强为真实批量插入

项目基于优秀开源项目&#xff1a;若依 项目背景&#xff1a;项目中牵扯到数据批量导入&#xff0c;为提高性能&#xff0c;先考虑将MybatisPlus伪批量插入增强为真实批量插入 MybatisPlus源码&#xff1a; MybatisPlus支持批量插入&#xff0c;但是跟踪源码发现底层是将批量…

简单使用selenium抓取微博热搜话题存储进Excel表格中

#test.pyimport requests from selenium import webdriver import time from write import write#首先打开浏览器 drive webdriver.Chrome()#设置隐式等待&#xff1a;等待元素找到&#xff0c;如果找到元素则马上继续执行语句&#xff0c;如果找不到元素&#xff0c;会在设定…