用最少的代码模拟gRPC四种消息交换模式

news/2024/11/25 21:18:02/

我们知道,建立在HTTP2/3之上的gRPC具有四种基本的通信模式或者消息交换模式(MEP: Message Exchange Pattern),即Unary、Server Stream、Client Stream和Bidirectional Stream。本篇文章通过4个简单的实例演示它们在.NET平台上的实现原理

一、定义ProtoBuf消息

我们选择简单的“Hello World”场景进行演示:客户端请求指定一个或者多个名字,回复以“Hello, {Name}!”。为此我们在一个ASP.NET Core应用中定义了如下两个ProtoBuf消息HelloRequest和HelloReply,生成两个同名的消息类型。

syntax = "proto3";message HelloRequest {string names = 1;
}message HelloReply {string message = 1;
}

二、请求/响应的读写

gRPC框架的核心莫过于在服务端针对请求消息的读取和对响应消息的写入;以及在客户端针对请求消息的写入和对响应消息的读取。这四个核心功能被实现在如下这两个扩展方法中。如下面的代码片段所示,扩展方法WriteMessageAsync将指定的ProtoBuf消息写入PipeWriter对象中。为了确保消息能够被准确的读取,我们利用前置的四个字节存储了消息的字节数。

public static class ReadWriteExtensions
{public static ValueTask<FlushResult> WriteMessageAsync(this PipeWriter writer, IMessage message){var length = message.CalculateSize();var span = writer.GetSpan(4+length);BitConverter.GetBytes(length).CopyTo(span);message.WriteTo(span.Slice(4, length));writer.Advance(4 + length);return writer.FlushAsync();}public static async Task ReadAndProcessAsync<TMessage>(this PipeReader reader, MessageParser<TMessage> parser, Func<TMessage, Task> handler) where TMessage:IMessage<TMessage>{while(true){var result = await reader.ReadAsync();var buffer = result.Buffer;while (TryReadMessage(ref buffer, out var message)){await handler(message!);}reader.AdvanceTo(buffer.Start, buffer.End);if(result.IsCompleted){break;}}bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out TMessage? message){if(buffer.Length < 4){message = default;return false;}Span<byte> lengthBytes = stackalloc byte[4];buffer.Slice(0,4).CopyTo(lengthBytes);var length = BinaryPrimitives.ReadInt32LittleEndian(lengthBytes);if (buffer.Length < length + 4){message = default;return false;}message = parser.ParseFrom(buffer.Slice(4, length));buffer = buffer.Slice(length + 4);return true;}}
}

ReadAndProcessAsync扩展方法从指定的PipeReader对象中读取指定类型的ProtoBuf消息,并利用指定处理器(一个Func<TMessage, Task>委托)对它进行处理。由于写入时指定了消息的字节数,所以我们可以将承载消息的字节“精准地”读出来,并利用指定的MessageParser<TMessage>对其进行序列化。

三、Unary

我们知道正常的gRPC开发需要将包含一个或者多个操作的服务定义在ProtoBuf文件中,并利用它生成一个基类,我们通过继承这个基类并重写操作对应方法。对于ASP.NET Core gRPC来说,服务操作对应的方法最终会转换成对应的终结点并以路由的形式进行注册。这个过程其实并不复杂,但不是本篇文章关注的终结点。本文会直接注册四个对应的路由终结点来演示四个基本的消息交换模式。

Unary调用最为简单,就是简单的Request/Reply模式。在如下的代码中,我们注册了一个针对请求路径“/unary”的路由,对应的处理方法为如下所示的HandleUnaryCallAsync。该方法直接调用上面定义的ReadAndProcessAsync扩展方法将请求消息(HelloRequest)从请求的BodyReader中读取出来,并生成一个对应的HelloReply消息予以应答。后者利用上面的WriteMessageAsync扩展方法写入响应的BodyWriter。

using GrpcService;
using System.IO.Pipelines;
using System.Net;var app = WebApplication.Create();
app.MapPost("/unary", HandleUnaryCallAsync);
await app.StartAsync();await UnaryCallAsync();static async Task HandleUnaryCallAsync(HttpContext httpContext)
{var reader = httpContext.Request.BodyReader;var write = httpContext.Response.BodyWriter;await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello =>{var reply = new HelloReply { Message = $"Hello, {hello.Names}!" };await write.WriteMessageAsync(reply);});
}static async Task UnaryCallAsync()
{using (var httpClient = new HttpClient()){var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary"){Version = HttpVersion.Version20,VersionPolicy = HttpVersionPolicy.RequestVersionExact,Content = new MessageContent(new HelloRequest { Names = "foobar" })};var reply = await httpClient.SendAsync(request);await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply =>{Console.WriteLine(reply.Message);return Task.CompletedTask;});}
}

UnaryCallAsync模拟了客户端针对Unary服务操作的调用,具体的调用由我们熟悉的HttpClient对象完成。如代码片段所示,我们针对路由地址创建了一个HttpRequestMessage对象,并对其HTTP版本进行了设置(2.0),代表请求主体内容的HttpContent是一个MessageContent对象,具体的定义如下。MessageContent将代表ProtoBuf消息的IMessage对象作为主体内容,在重写的SerializeToStreamAsync,我们调用上面定义的WriteMessageAsync扩展方法将指定的IMessage对象写入输出流中。

public class MessageContent : HttpContent
{private readonly IMessage _message;public MessageContent(IMessage message) => _message = message;protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)=>await PipeWriter.Create(stream).WriteMessageAsync(_message);protected override bool TryComputeLength(out long length){length = -1;return false;}
}

创建的HttpRequestMessage对象利用HttpClient发送出去后,我们得到对应的HttpResponseMessage对象,并调用ReadAndProcessAsync扩展方法将主体内容读取出来并反序列化成HelloReply对象,其承载的问候消息将以如下的形式输出到控制台上。

四、Server Stream

Server Stream这种消息交换模式意味着服务端可以将内容以流的形式响应给客户端。作为模拟,客户端会携带一个名字列表(“foo,bar,baz,qux”),服务端以流的形式针对每个名字回复一个问候消息,具体的实现体现在针对请求路径“/serverstream”的路由处理方法HandleServerStreamCallAsync上。和上面一样,HandleServerStreamCallAsync方法利用我们定义的ReadAndProcessAsync方法读取作为请求的HelloRequest对象,并针对其携带的每一个名气生成一个HelloReply对象,后者最终通过我们定义的WriteMessageAsync方法予以响应。为了体验“流”的效果,我们添加了1秒的时间间隔。

using GrpcService;
using System.IO.Pipelines;
using System.Net;var app = WebApplication.Create();
app.MapPost("/unary", HandleUnaryCallAsync);
app.MapPost("/serverstream", HandleServerStreamCallAsync);
await app.StartAsync();await ServerStreamCallAsync();static async Task HandleServerStreamCallAsync(HttpContext httpContext)
{var reader = httpContext.Request.BodyReader;var write = httpContext.Response.BodyWriter;await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello =>{var names = hello.Names.Split(',');foreach (var name in names){var reply = new HelloReply { Message = $"Hello, {name}!" };await write.WriteMessageAsync(reply);await Task.Delay(1000);}});
}static async Task ServerStreamCallAsync()
{using (var httpClient = new HttpClient()){var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/serverstream"){Version = HttpVersion.Version20,VersionPolicy = HttpVersionPolicy.RequestVersionExact,Content = new MessageContent(new HelloRequest { Names = "foo,bar,baz,qux" })};var reply = await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);await PipeReader.Create(await reply.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply =>{Console.WriteLine($"[{DateTimeOffset.Now}]{reply.Message}");return Task.CompletedTask;});}
}

模拟客户端调用的ServerStreamCallAsync方法在生成一个携带多个名字的HttpRequestMessage对象,并利用HttpClient将其发送出去。由于服务端是以流的形式对请求进行响应的,所以我们在调用SendAsync方法是将HttpCompletionOption.ResponseHeadersRead枚举作为第二个参数,这样我们才能在收到响应头部之后得到代表响应消息的HttpResponseMessage对象。这样的响应将会携带4个问候消息,我们同样利用ReadAndProcessAsync方法将读取并以如下的形式输出到控制台上。

五、Client Stream

Client Stream与Server Stream正好相反,客户端会以流的形式将请求内容提交给服务端进行处理。由于我们以HttpClient来模拟客户端,所以我们只能从HttpRequestMessage上作文章。具体来说,我们需要自定义一个HttpContent类型,让它以“客户端流”的形式相对方发送内容。这个自定义的HttpContent就是如下这个ClientStreamContent<TMessage>类型。如代码片段所示,ClientStreamContent<TMessage>是对一个ClientStreamWriter<TMessage>对象的封装,客户端程序利用后者以流的形式向服务端输出TMessage对象承载的内容。对于ClientStreamWriter<TMessage>方法来说,作为输出流的Stream对象是在ClientStreamContent<TMessage>重写的SerializeToStreamAsync方法中指定的。WriteAsync方法利用我们定义的WriteMessageAsync扩展方法实现了针对ProtoBuf消息的输出。客户端通过调用Complete方法决定客户端流是否终结,ClientStreamContent<TMessage>重写的SerializeToStreamAsync通过WaitAsync进行等待。

public class ClientStreamContent<TMessage> : HttpContent where TMessage:IMessage<TMessage>
{private readonly ClientStreamWriter<TMessage> _writer;public ClientStreamContent(ClientStreamWriter<TMessage> writer)=> _writer = writer;protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) => _writer.SetOutputStream(stream).WaitAsync();protected override bool TryComputeLength(out long length) => (length = -1) != -1;
}public class ClientStreamWriter<TMessage> where TMessage: IMessage<TMessage>
{private readonly TaskCompletionSource<Stream> _streamSetSource = new();private readonly TaskCompletionSource _streamEndSuource = new();public ClientStreamWriter<TMessage> SetOutputStream(Stream outputStream){_streamSetSource.SetResult(outputStream);return this;}public async Task WriteAsync(TMessage message){var stream = await _streamSetSource.Task;await PipeWriter.Create(stream).WriteMessageAsync(message);}public void Complete()=> _streamEndSuource.SetResult();public Task WaitAsync() => _streamEndSuource.Task;
}

针对Client Stream的模拟体现在针对路径“/clientstream”的路由处理方法HandleClientStreamCallAsync。这个方法没有什么特别之处,它进行时调用ReadAndProcessAsync方法将HelloRequest消息读取出来,并将生成的问候语直接输出到本地(服务端)控制台上而已。

using GrpcService;
using System.IO.Pipelines;
using System.Net;var app = WebApplication.Create();
app.MapPost("/unary", HandleUnaryCallAsync);
app.MapPost("/serverstream", HandleServerStreamCallAsync);
app.MapPost("/clientstream", HandleClientStreamCallAsync);
await app.StartAsync();await ClientStreamCallAsync();static async Task HandleClientStreamCallAsync(HttpContext httpContext)
{var reader = httpContext.Request.BodyReader;var write = httpContext.Response.BodyWriter;await reader.ReadAndProcessAsync(HelloRequest.Parser, async hello =>{var names = hello.Names.Split(',');foreach (var name in names){Console.WriteLine($"[{DateTimeOffset.Now}]Hello, {name}!");        }});
}static async Task ClientStreamCallAsync()
{using (var httpClient = new HttpClient()){var writer = new ClientStreamWriter<HelloRequest>();var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/clientstream"){Version = HttpVersion.Version20,VersionPolicy = HttpVersionPolicy.RequestVersionExact,Content = new ClientStreamContent<HelloRequest>(writer)};_ =  httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);foreach (var name in new string[] {"foo","bar","baz","qux" }){await writer.WriteAsync(new HelloRequest { Names = name});await Task.Delay(1000);}writer.Complete();}
}

在用于模拟Client Stream调用的ClientStreamCallAsync方法中,我们首先创建了一个ClientStreamWriter<HelloRequest>对象,并利用它创建了对应的ClientStreamContent<HelloRequest>对象,后者将作为HttpRequestMessage消息的主体内容。在调用HttpClient的SendAsync方法后,我们并没有作任何等待(否则程序将卡在这里),而是利用ClientStreamWriter<HelloRequest>对象以流的形式发送了四个请求。服务端在接收到每个请求后,会将对应的问候语以如下的形式输出到控制台上。

六、Bidirectional Stream

Bidirectional Stream将连接作为真正的“双工通道”。这次我们不再注册额外的路由,而是直接利用前面模拟Unary的路由终结点来演示双向通信。在如下所示的客户端模拟方法BidirectionalStreamCallAsync中,我们采用上面的方式以流的形式发送了4个HelloRequest。

using GrpcService;
using System.IO.Pipelines;
using System.Net;var app = WebApplication.Create();
app.MapPost("/unary", HandleUnaryCallAsync);
app.MapPost("/serverstream", HandleServerStreamCallAsync);
app.MapPost("/clientstream", HandleClientStreamCallAsync);
await app.StartAsync();await BidirectionalStreamCallAsync();static async Task BidirectionalStreamCallAsync()
{using (var httpClient = new HttpClient()){var writer = new ClientStreamWriter<HelloRequest>();var request = new HttpRequestMessage(HttpMethod.Post, "http://localhost:5000/unary"){Version = HttpVersion.Version20,VersionPolicy = HttpVersionPolicy.RequestVersionExact,Content = new ClientStreamContent<HelloRequest>(writer)};var task = httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead);_ = Task.Run(async () =>{var response = await task;await PipeReader.Create(await response.Content.ReadAsStreamAsync()).ReadAndProcessAsync(HelloReply.Parser, reply =>{Console.WriteLine($"[{DateTimeOffset.Now}]{reply.Message}");return Task.CompletedTask;});});foreach (var name in new string[] { "foo", "bar", "baz", "qux" }){await writer.WriteAsync(new HelloRequest { Names = name });await Task.Delay(1000);}writer.Complete();}
}

于此同时,我们在得到表示响应消息的HttpResponseMessage后,调用ReadAndProcessAsync方法将作为响应的问候语以如下的方式输出到控制台上。

 


http://www.ppmy.cn/news/101660.html

相关文章

Linux使用PowerShell模块管理MsSql-Server

1.安装PowserShell 更新包列表 sudo apt-get update 安装依赖: sudo apt-get install -y wget apt-transport-https software-properties-common 下载 key: wget -q "https://packages.microsoft.com/config/ubuntu/$(lsb_release -rs)/packages-microsoft-prod.deb&…

MySQL — 视图、存储过程、触发器

文章目录 视图/存储过程/存储函数/触发器一、视图1.1 语法1.1.1 创建视图1.1.2 查询1.1.3 修改1.1.4 删除1.1.5 对数据的操作 1.2 检查选项1.2.1 cascaded1.2.2 local 1.3 视图的更新1.4 视图的作用1.5 案例1.5.1 案例11.5.2 案例2 二、存储过程2.1 介绍2.2 基本语法2.3 变量2.…

项目复盘四步:怎么做才有成效?这些关键点不可忽略

在项目管理中&#xff0c;及时复盘是非常重要的&#xff0c;因为只有通过反思和分析&#xff0c;才能找到差距存在的原因。 复盘分析的第一步是回顾目标 因为目标是工作开展的关键。在执行项目的过程中&#xff0c;要始终朝着所设定的目标去努力实现。计划和现实会存在偏差&…

Linux-0.11 入口函数main.c详解

Linux-0.11 入口函数main.c详解 模块简介 main.c大部分代码主要是对内核进行初始化&#xff0c;而main.c开始&#xff0c;就都是c语言编写的内核了。 函数详解 time_init static void time_init(void)该函数读取CMOS时钟信息作为系统的开机时间。 struct tm time;do {time…

选择交换机主要看哪些参数指标

交换机有几个性能指标您一定要知道哦&#xff0c;和海翎光电的小编一起温故而知新。 网络构成方式&#xff1a;接入层交换机、汇聚层交换机、核心层交换机 OST模型&#xff1a;第二层交换机、第三层交换机、第四层交换机……第七层交换机 交换机的可管理性&#xff1a;可管理…

15-Vue技术栈之创建Vue3.0工程

目录 1.使用 vue-cli 创建2.使用 vite 创建 1.使用 vue-cli 创建 官方文档&#xff1a;https://cli.vuejs.org/zh/guide/creating-a-project.html#vue-create ## 查看vue/cli版本&#xff0c;确保vue/cli版本在4.5.0以上 vue --version ## 安装或者升级你的vue/cli npm insta…

最常见JS加密保护代码的方法

当谈到JavaScript&#xff08;简称JS&#xff09;代码的保护时&#xff0c;加密是一种常见的策略。加密可以帮助保护你的JS代码&#xff0c;防止未经授权的访问、修改和复制。在本文中&#xff0c;我将向你介绍一些常用的js加密保护方法&#xff0c;并提供一些通俗易懂的示例代…

vue实现验证码登陆

我们在使用 vue进行前端开发时&#xff0c;都需要登录验证&#xff0c;而在登录的过程中&#xff0c;用户需要输入自己的用户名和密码&#xff0c;如果是输错的话还需要进行再次输入。这样不仅容易造成用户密码泄露&#xff0c;还会影响用户体验。因此在我们的系统中都会存在验…