记录一次gRpc流式操作

server/2024/10/19 6:24:39/

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

message AdMsgProto{
optional string msg=1;
optional string tag=2;
optional string topic=3;
}
2. service方法定义
service MQDataService{
rpc sendRedissonMsg(AdMsgProto)returns (Code);
rpc receiveRedissonMsg(String)returns (stream AdMsgProto);
}

服务端写法

java">package com.mykkhw.mykkhw_data_mq.service.grpc;import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {@Overridepublic void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {try {RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());// 循环处理消息while (!Thread.currentThread().isInterrupted()) {// 阻塞式获取消息,没有消息时线程会等待String message = queue.take();if(StringUtils.hasText(message)){AdMsgProto.Builder builder = AdMsgProto.newBuilder();builder.setMsg(message);...responseObserver.onNext(builder.build());}}} catch (Exception e) {Thread.currentThread().interrupt();  // 重新设置线程的中断标志}responseObserver.onCompleted();}
}//mq依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.15.0</version><scope>compile</scope>
</dependency>

客户端写法

java"> // 消息生产者private void buildAppMSg(...param) {...// 发送mq消息ClientManager.getMqDataServiceFutureStub().sendRedissonMsg(adMsg.build());}//消费者流式接收public static void receiveFacebookMsg() {try {log.info("facebook msg");// 处理服务器流式响应StreamObserver<AdMsgProto> responseObserver = new StreamObserver<AdMsgProto>() {@Overridepublic void onNext(AdMsgProto msgProto) {log.info("facebook 接收到消息: {}", msgProto.getMsg());...}@Overridepublic void onError(Throwable throwable) {log.info("Error occurred: {}", throwable.getMessage());...}@Overridepublic void onCompleted() {log.info("Stream completed.");...}};log.info("接收msg 开始");ClientManager.getMqDataServiceStub().receiveRedissonMsg(build, responseObserver);log.info("接收msg 成功");}catch (Exception e){log.info("出错了");}}~~~jedis和消息优化版:

http://www.ppmy.cn/server/128408.html

相关文章

Windows11系统下Docker环境搭建教程

目录 前言Docker简介安装docker总结 前言 本文为博主在项目环境搭建时记录的Docker安装流程&#xff0c;希望对大家能够有所帮助&#xff0c;不足之处欢迎批评指正&#x1f91d;&#x1f91d;&#x1f91d; Docker简介 Docker 就像一个“容器”平台&#xff0c;可以帮你把应用…

Stable Diffusion绘画 | 来训练属于自己的模型:炼丹参数调整--步数设置与计算

要想训练一个优质的模型&#xff0c;一定要认识和了解模型训练中&#xff0c;参数的作用和意义。 整个模型训练的过程&#xff0c;参数并不是一成不变的&#xff0c;也没有固定的模板&#xff0c; 当我们修改了模型训练里面的某个参数&#xff0c;很可能就需要连带其他一系列…

Linux云计算 |【第四阶段】RDBMS1-DAY3

主要内容&#xff1a; 子查询&#xff08;单行单列、多行单列、单行多列、多行多列&#xff09;、分页查询limit、联合查询union、插入语句、修改语句、删除语句 一、子查询 子查询就是指的在一个完整的查询语句之中&#xff0c;嵌套若干个不同功能的小查询&#xff0c;从而一…

ubuntu图形界面右上角网络图标找回解决办法

问题现象&#xff1a; ubuntu图形界面右上角网络图标消失了&#xff0c;不方便联网&#xff1a; 正常应该是下图&#xff1a; 网络寻找解决方案&#xff0c;问题未解决&#xff0c;对于某些场景可能有用&#xff0c;引用过来&#xff1a; 参考方案 方法一 修改虚拟机的网络管…

SQL 语法学习指南

目录 前言1. SQL 的基本概念1.1 SQL 的作用1.2 SQL 的特点 2. SQL 的基础语法2.1 数据查询 - SELECT 语句2.2 数据插入 - INSERT 语句2.3 数据更新 - UPDATE 语句2.4 数据删除 - DELETE 语句 3. SQL 的进阶语法3.1 聚合函数3.2 表连接 - JOIN3.3 子查询 4. SQL 学习建议4.1 多实…

华为平板与非华为电脑多屏协同及Bug处理

本文参考B站空降猫咪、鱼翅2002、知乎奔跑的小牛、夏风微微等博主。 电脑版本型号&#xff1a;拯救者Y7000&#xff0c;核显3050Ti,无集成显卡 平板版本型号&#xff1a;华为matepad pro解决办法&#xff1a; 下载空降猫咪的安装器PCManagerInstaller_20230801 CSDN下载链接 …

HTML 与 ES6 是前端开发的两大核心技术

1. HTML 核心技术 HTML&#xff08;Hypertext Markup Language&#xff09;作为前端开发的基础&#xff0c;负责页面内容和结构的定义。以下是 HTML 的一些核心特性和技术点&#xff1a; 1.1 HTML5 新特性 HTML5 带来了许多新的元素和功能&#xff0c;提高了开发效率和用户体…

CSP-J/S 复赛算法 并查集-Hash表

文章目录 前言并查集并查集是什么&#xff1f;并查集的应用举几个并查集的例子更加详细的介绍合并两个集合判断元素的关系 并查集在树中的表示方法并查集在树中的表示概念 字符串图示例初始状态合并操作示例最终结构 查找操作和路径压缩示例 并查集的工作原理判断元素是否在同一…