记录一次gRpc流式操作

news/2024/10/4 13:09:13/

使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)

gRpc协议类定义

message AdMsgProto{
optional string msg=1;
optional string tag=2;
optional string topic=3;
}
2. service方法定义
service MQDataService{
rpc sendRedissonMsg(AdMsgProto)returns (Code);
rpc receiveRedissonMsg(String)returns (stream AdMsgProto);
}

服务端写法

java">package com.mykkhw.mykkhw_data_mq.service.grpc;import com.mykkhw.mykkhw_data_protocols.Base.ReqDataProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultProto;
import com.mykkhw.mykkhw_data_protocols.Base.ResultType;
import com.mykkhw.mykkhw_data_protocols.MQ.TiktokMsgProto;
import com.mykkhw.mykkhw_data_protocols.MQService.MQDataServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.redisson.api.RBlockingQueue;
import org.springframework.util.StringUtils;public class MqRpcService extends MQDataServiceGrpc.MQDataServiceImplBase {@Overridepublic void sendRedissonMsg(AdMsgProto request, StreamObserver<ResultProto> responseObserver) {RpcServicePools.mqProducer.sendMsg(request.getMsg(), request.getTag(), request.getTopic());ResultProto.Builder builder = ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}@Overridepublic void receiveRedissonMsg(ReqDataProto request, StreamObserver<AdMsgProto> responseObserver) {try {RBlockingQueue<String> queue = RpcServicePools.redisson.getBlockingQueue(request.getName());// 循环处理消息while (!Thread.currentThread().isInterrupted()) {// 阻塞式获取消息,没有消息时线程会等待String message = queue.take();if(StringUtils.hasText(message)){AdMsgProto.Builder builder = AdMsgProto.newBuilder();builder.setMsg(message);...responseObserver.onNext(builder.build());}}} catch (Exception e) {Thread.currentThread().interrupt();  // 重新设置线程的中断标志}responseObserver.onCompleted();}
}//mq依赖
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.15.0</version><scope>compile</scope>
</dependency>

客户端写法

java"> // 消息生产者private void buildAppMSg(...param) {...// 发送mq消息ClientManager.getMqDataServiceFutureStub().sendRedissonMsg(adMsg.build());}//消费者流式接收public static void receiveFacebookMsg() {try {log.info("facebook msg");// 处理服务器流式响应StreamObserver<AdMsgProto> responseObserver = new StreamObserver<AdMsgProto>() {@Overridepublic void onNext(AdMsgProto msgProto) {log.info("facebook 接收到消息: {}", msgProto.getMsg());...}@Overridepublic void onError(Throwable throwable) {log.info("Error occurred: {}", throwable.getMessage());...}@Overridepublic void onCompleted() {log.info("Stream completed.");...}};log.info("接收msg 开始");ClientManager.getMqDataServiceStub().receiveRedissonMsg(build, responseObserver);log.info("接收msg 成功");}catch (Exception e){log.info("出错了");}}~~~jedis和消息优化版:

http://www.ppmy.cn/news/1534424.html

相关文章

【AI知识点】词频-逆文档频率(TF-IDF)

词频-逆文档频率&#xff08;TF-IDF&#xff0c;Term Frequency-Inverse Document Frequency&#xff09;是一种用来衡量一个词在某个文档中的重要性&#xff0c;同时结合该词在整个文档集中的出现频率。它的核心思想是&#xff1a;在特定文档中出现频率高且在其他文档中较少出…

深入探讨分布式数据库:技术架构、应用案例与性能优化

1. 引言 在大数据时代&#xff0c;数据量呈指数级增长&#xff0c;传统的单机数据库面临诸多挑战&#xff0c;如性能瓶颈、可扩展性不足和单点故障等问题。分布式数据库应运而生&#xff0c;通过将数据分散存储在多个节点上&#xff0c;提供高可用性和可扩展性&#xff0c;成为…

电商技术选型:Spring Boot在线商城系统

4 系统设计 ONLY在线商城系统的设计方案比如功能框架的设计&#xff0c;比如数据库的设计的好坏也就决定了该系统在开发层面是否高效&#xff0c;以及在系统维护层面是否容易维护和升级&#xff0c;因为在系统实现阶段是需要考虑用户的所有需求&#xff0c;要是在设计阶段没有经…

论文推荐 |【Agent】自动化Agent设计系统

论文标题&#xff1a; Automated Design of Agentic Systems 论文地址&#xff1a; https://arxiv.org/abs/2408.08435 GitHub地址&#xff1a; https://github.com/ShengranHu/ADAS 自动化代理设计在性能和通用性方面显著超越了手动方法。 • 引入了自动化代理系统设计&am…

Redis: Sorted Set 底层算法的简单分析

概述 我们先看下 Shorted Set 有序集合的内部数据结构所谓有序集合&#xff0c;比如有个容器&#xff0c;容器里边都已经排好序了&#xff0c;那无非就是快速的查找和插入不管你是查找还是插入&#xff0c;肯定要确定那个位置最简单的办法就是从最开头开始&#xff0c;挨个比较…

QCamera6.7笔记

1.QCamera​ .h文件 #include <QtWidgets/QMainWindow> #include "ui_QCamera_test1.h" #include <QCamera> #include <QtMultimedia> #include <QtMultimediaWidgets> #include<QMediaCaptureSession> #include <QMediaDevices&…

Linux下驱动开发实例

驱动开发 驱动与硬件的分离 在传统的嵌入式系统开发中&#xff0c;硬件信息往往是直接硬编码在驱动代码中的。这样做的问题是&#xff0c;当硬件发生变化时&#xff0c;比如增加或更换设备&#xff0c;就需要修改驱动程序的代码&#xff0c;这会导致维护成本非常高。因此&…

Flume面试整理-Flume是什么?

Apache Flume 是一个分布式的、可靠的、高可用的数据收集和传输系统,专为从各种数据源(如日志文件、网络流)收集、聚合和传输大量数据而设计。它主要用于在大数据生态系统中,特别是Hadoop环境中,将数据从多个分散的来源实时地传输到一个集中的存储系统(如HDFS、HBase等)…