SpringBoot使用SSE进行实时通知前端
- 说明
- maven依赖
- SSE工具类代码
- Controller测试代码
- 测试结果如下:
- 注意
- 将超时时间由原来的0改为默认的30秒,会报错。
- 将springboot降为低版本如1.4.2.RELEASE。
说明
项目有个需求是要实时通知前端,告诉前端这个任务加载好了。然后想了2个方案,一种是用websocket进行长连接,一种是使用SSE(Sever Send Event),是HTTP协议中的一种,Content-Type为text/event-stream,能够保持长连接。
websocket是前端既能向后端发送消息,后端也能向前端发送消息。
SSE是只能后端向前端发送消息。
因为只需要后端通知,所以我这里选择了使用SSE实现。
这里先做个笔记,怕以后忘记怎么使用。
maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.project</groupId><artifactId>test</artifactId><version>0.0.1-SNAPSHOT</version><name>test</name><description>test</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--web依赖,内嵌入tomcat,SSE依赖于该jar包,只要有该依赖就能使用SSE--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--lombok依赖,用来对象省略写set、get方法--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.22</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
SSE工具类代码
package com.etone.project.utils;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;@Slf4j
public class SseEmitterServer {/*** 当前连接数*/private static AtomicInteger count = new AtomicInteger(0);private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();public static SseEmitter connect(String userId){//设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常SseEmitter sseemitter = new SseEmitter(0L);//注册回调sseemitter.onCompletion(completionCallBack(userId));//这个onError在springbooot低版本没有这个方法,公司springboot1.4.2版本,没有这个方法,可以进行注释。sseemitter.onError(errorCallBack(userId));sseemitter.onTimeout(timeoutCallBack(userId));sseEmitterMap.put(userId,sseemitter);//数量+1count.getAndIncrement();log.info("create new sse connect ,current user:{}",userId);return sseemitter;}/*** 给指定用户发消息*/public static void sendMessage(String userId, String message){if(sseEmitterMap.containsKey(userId)){try{sseEmitterMap.get(userId).send(message);}catch (IOException e){log.error("user id:{}, send message error:{}",userId,e.getMessage());e.printStackTrace();}}}/*** 想多人发送消息,组播*/public static void groupSendMessage(String groupId, String message){if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){sseEmitterMap.forEach((k,v) -> {try{if(k.startsWith(groupId)){v.send(message, MediaType.APPLICATION_JSON);}}catch (IOException e){log.error("user id:{}, send message error:{}",groupId,message);removeUser(k);}});}}public static void batchSendMessage(String message) {sseEmitterMap.forEach((k,v)->{try{v.send(message,MediaType.APPLICATION_JSON);}catch (IOException e){log.error("user id:{}, send message error:{}",k,e.getMessage());removeUser(k);}});}/*** 群发消息*/public static void batchSendMessage(String message, Set<String> userIds){userIds.forEach(userid->sendMessage(userid,message));}//移除用户public static void removeUser(String userid){sseEmitterMap.remove(userid);//数量-1count.getAndDecrement();log.info("remove user id:{}",userid);}public static List<String> getIds(){return new ArrayList<>(sseEmitterMap.keySet());}public static int getUserCount(){return count.intValue();}private static Runnable completionCallBack(String userId) {return () -> {log.info("结束连接,{}",userId);removeUser(userId);};}private static Runnable timeoutCallBack(String userId){return ()->{log.info("连接超时,{}",userId);removeUser(userId);};}private static Consumer<Throwable> errorCallBack(String userId){return throwable -> {log.error("连接异常,{}",userId);removeUser(userId);};}
}
Controller测试代码
package com.project.test.controller;import com.hjl.test.util.SseEmitterServer;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.util.HashMap;
import java.util.Map;@RestController
@RequestMapping(value = "/test")
public class TestController {//sse连接接口@GetMapping (value = "/sse/connect/{id}")public SseEmitter connect(@PathVariable String id){return SseEmitterServer.connect(id);}//sse向指定用户发送消息接口@GetMapping (value = "/sse/send/{id}")public Map<String,Object> send(@PathVariable String id,@RequestParam(value = "message", required = false) String message){Map<String,Object> returnMap = new HashMap<>();//向指定用户发送信息SseEmitterServer.sendMessage(id,message);returnMap.put("message","向id为"+id+"的用户发送:"+message+"成功!");returnMap.put("status","200");returnMap.put("result",null);return returnMap;}//sse向所有已连接用户发送消息接口@GetMapping (value = "/sse/batchSend")public Map<String,Object> batchSend(@RequestParam(value = "message", required = false) String message){Map<String,Object> returnMap = new HashMap<>();//向指定用户发送信息SseEmitterServer.batchSendMessage(message);returnMap.put("message",message+"消息发送成功!");returnMap.put("status","200");returnMap.put("result",null);return returnMap;}//sse关闭接口@GetMapping (value = "/sse/close/{id}")public Map<String,Object> close(@PathVariable String id){Map<String,Object> returnMap = new HashMap<>();//移除idSseEmitterServer.removeUser(id);System.out.println("当前连接用户id:"+SseEmitterServer.getIds());returnMap.put("message","连接关闭成功!");returnMap.put("status","200");returnMap.put("result",null);return returnMap;}}
测试结果如下:
这里测试SSE连接,就像正常接口那样请求就行。
本地调用接口/sse/connect/1如下:
这里我连接2个用户,用来模拟向指定用户id发送信息和批量向已连接的用户发送消。
后端服务打印如下:
本地调用接口/sse/send/1如下:
用户1的结果如下,发现它收到了消息:
用户2没有收到结果,如下:
本地调用接口/sse/batchSend如下:
批量向所有已经连接的用户发送消息。
用户1结果如下,发现接收到了消息:
用户2结果如下,发现也接收到了消息:
测试结果都符合预期。
点击postman的close按钮,关闭连接:
发现前端连接虽然关闭了,但是后端实际还在连接中,根本没有移除用户的提示:
所以这里还需要自己手动写关闭接口测试。
本地调用接口/sse/close/1如下:
可以看到把用户id为1的给移除了,只剩用户2还在连接中。
这里所有测试完成,结果符合预期。
注意
将超时时间由原来的0改为默认的30秒,会报错。
测试结果如下:
这里直接出现了一个异常:org.springframework.web.context.request.async.AsyncRequestTimeoutException
甚至连接都断开了。
将springboot降为低版本如1.4.2.RELEASE。
使用postman进行测试的时候,发现它不是一直在请求中:如下:
将Springboot降为1.4.2.RELEASE
springboot的1.4.2.RELEASE版本没有onError方法,需要注释掉。
postman测试如下:
低版本测试的时候发现它有一个这个连接可以直接看到,而使用springboot版本2.x版本就发现它一直处于发送请求的状态,什么时候后端向前端发送了消息,它就显示这个。
springboot的1.4.2.RELEASE版本结果:
springboot的2.7.3版本结果:
这里先将这种情况先记录下来先,等后面有时间再研究。怎么高版本就不能向低版本那样返回这个连接信息呢?所以SpringBoot高版本使用SSE连接的时候一直处于Sending request这种情况,这种情况是正常的吗?有没有大佬告知下,谢谢。