vue+springboot+websocket实时聊天通讯功能

server/2025/1/15 20:59:31/

前言

在我的前一篇文章里

vue+springboot实现聊天功能

🎈🎈🎈🎈🎈🎈🎈

实现了最最基础的聊天功能,可以通过聊天互相给对方发送信息

🎈🎈🎈🎈🎈🎈🎈

那么上次的文章是通过定时任务定时的刷新聊天数据,实现监听聊天记录的,那么这里来做点不一样的东西

🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸🌸
接下来我会通过websocket,通过教学实例,教大家如何实现如下功能点

🌰 实时获取聊天数据
🌰 实时更新在线人员
🌰 实时更新未读消息数量

websocket_18">websocket

websocket_20">什么是websocket

WebSocket是一种在单个TCP连接上进行全双工通信的协议。它允许在客户端和服务器之间建立持久连接,从而实现实时的双向通信。相比传统的HTTP请求-响应模式,WebSocket提供了更高效、更实时的通信方式。

简单的说,就是通过服务端主动向客户端发送信息,也就是后端主动给前端发信息

websocket_25">websocket的优点

实时性: WebSocket允许服务器主动向客户端推送数据,而不需要客户端首先发送请求。这使得实时通信变得更加容易实现,例如在线聊天、实时游戏等。

减少延迟: 由于WebSocket建立了持久连接,避免了HTTP的每次请求-响应的开销,因此可以减少通信的延迟,使得应用程序的响应速度更快。

减少网络流量: WebSocket的头部信息相比于HTTP较小,且不需要在每次通信时重复发送,因此可以减少网络流量。

更少的资源消耗: WebSocket的连接一旦建立就可以持续存在,不需要像HTTP一样频繁地建立和关闭连接,这可以减少服务器和客户端的资源消耗。

更好的跨域支持: WebSocket协议支持跨域通信,使得在不同域名下的服务器和客户端之间进行实时通信更加方便。

🍒🍒🍒🍒🍒🍒🍒🍒🍒🍒

项目演示

项目展示图

在这里插入图片描述

同样在讲解如何实现之前,和大家演示一下我项目中实际的效果是什么样

在用户区,默认会抓取注册用户特定数量的用户数据,按照在线状态,是否和自己聊天进行展示顺序,如果有查人员会过滤只查特定用户名的数据,否则按照默认抓取

在这里插入图片描述
当输入用户名:
在这里插入图片描述

当有用户登录或者登出的时候更新人员在线状态

初始时候:
在这里插入图片描述
假如用户11111进行登出:
在这里插入图片描述
在这里插入图片描述
可以看到,该用户在线状态为关闭

然后再进行登录11111
在这里插入图片描述
在这里插入图片描述

假如我对用户进行发送聊天信息,在未打开聊天框的情况下:
在这里插入图片描述
发送信息之后:
在这里插入图片描述
会立刻同步信息

此时打开聊天框之后:
已读数量重置:
在这里插入图片描述

📌📌📌📌📌📌📌📌📌📌

以上为我项目中实现上述功能的演示,接下来我将讲解如何实现

前端

那么正式开始之前,我的表同前面的文章所写,就两张表,一张用户表,一张记录信息表

websocketjs_82">websocket.js

这里写一个获取websocket的工具类

// WebSocket地址
const url = 'ws://127.0.0.1:8082/sso/webSocket/'// WebSocket实例
let ws// 重连定时器实例
let reconnectTimer// WebSocket重连开关
let isReconnecting = false// WebSocket对象
const websocket = {// WebSocket建立连接Init (username) {// 判断浏览器是否支持WebSocketif (!('WebSocket' in window)) {console.log('浏览器不支持WebSocket')return}// 创建WebSocket实例ws = new WebSocket(url + username)// 监听WebSocket连接ws.onopen = () => {console.log('WebSocket连接成功')}// 监听WebSocket连接错误信息ws.onerror = (e) => {console.log('WebSocket重连开关', isReconnecting)console.log('WebSocket数据传输发生错误', e)// 打开重连reconnect()}// 监听WebSocket接收消息ws.onmessage = (e) => {console.log('WebSocket接收后端消息:', e)// 心跳消息不做处理if (e.data === 'ok') {return}// 调用回调函数处理接收到的消息if (websocket.onMessageCallback) {websocket.onMessageCallback(e.data)}}},// WebSocket连接关闭方法Close () {// 关闭断开重连机制isReconnecting = truews.close()// ElMessage.error('WebSocket断开连接')},// WebSocket发送信息方法Send (data) {// 处理发送数据JSON字符串const msg = JSON.stringify(data)// 发送消息给后端ws.send(msg)},// 暴露WebSocket实例,其他地方调用就调用这个getWebSocket () {return ws},// 新增回调函数用于处理收到的消息onMessageCallback: null,// 设置消息处理回调函数setMessageCallback (callback) {this.onMessageCallback = callback}
}// 监听窗口关闭事件,当窗口关闭时-每一个页面关闭都会触发-扩张需求业务
window.onbeforeunload = function () {// 在窗口关闭时关闭 WebSocket 连接websocket.Close()console.log('WebSocket窗口关闭事件触发')
}// 浏览器刷新重新连接
// 刷新页面后需要重连-并且是在登录之后
if (performance.getEntriesByType('navigation')[0].type === 'reload') {console.log('WebSocket浏览器刷新了')// 延迟一定时间再执行 WebSocket 初始化,确保页面完全加载后再进行连接setTimeout(() => {console.log('WebSocket执行刷新后重连...')// 刷新后重连// 获取登录用户idlet userId = ''websocket.Init(userId)}, 200) // 适当调整延迟时间
}// 重连方法
function reconnect () {// 判断是否主动关闭连接if (isReconnecting) {return}// 重连定时器-每次WebSocket错误方法onerror触发它都会触发reconnectTimer && clearTimeout(reconnectTimer)reconnectTimer = setTimeout(function () {console.log('WebSocket执行断线重连...')// 获取登录用户idlet userId = ''websocket.Init(userId)isReconnecting = false}, 4000)
}// 暴露对象
export default websocket

界面

<template><div class="chat-container"><!-- Left side: User list --><div class="left-side"><!-- Search input (moved outside) --><div class="search-wrapper"><el-input v-model="searchUserName" placeholder="回车搜索用户" class="search-input" @keydown.enter.native="searchUserForForm"></el-input></div><!-- User list (with scroll) --><el-scrollbar class="user-list-scroll"><el-row><el-col :span="24" v-for="form in messageForm" :key="form.sendUser.userId" @click.native="chooseUser(form.sendUser)" class="user-item" v-if="messageForm.length !== 0"><div class="user-avatar-wrapper"><div :class="{ 'online-dot': form.isOnline }"></div><!-- Element UI Badge for showing unread messages --><el-badge :value="form.noReadMessageLength" class="message-badge" v-if="form.noReadMessageLength > 0"><img :src="form.sendUser.avatar" class="user-avatar"></el-badge><img :src="form.sendUser.avatar" class="user-avatar" v-else></div><div class="user-details"><div class="user-name">{{ form.sendUser.userName }}</div><div class="user-last-message">{{ form.lastMessage }}&nbsp;</div></div></el-col></el-row></el-scrollbar></div><!-- Right side: Chat box --><div class="right-side"><!-- Chat header --><div class="chat-header"><span v-if="currentUser">{{ currentUser.userName }}</span></div><!-- Chat messages --><el-scrollbar class="chat-messages" ref="messageContainer"><div class="messageBox" v-for="message in messages" :key="message.handle" :class="{ ownMessage: message.sendUser === loginUser.userId, otherMessage: message.sendUser !== loginUser.userId }"><div><img :src="message.sendUser === loginUser.userId ? loginUser.avatar : currentUser.avatar" alt="" style="border: 1px solid #70c1fa;"></div><div class="messageContent">{{ message.content }}</div><div class="messageTime">{{ message.createTime.replace('T', ' ') }}</div></div></el-scrollbar><div class="chat-input"><el-input v-model="newMessage.content" placeholder="请输入聊天内容" autosize class="message-input" @keydown.enter.native="sendMessage"></el-input><el-button type="primary" @click.native="sendMessage" class="send-button">发送</el-button></div></div></div>
</template>

逻辑

<script>
// 获取用户信息和聊天信息后端接口,根据自己的实际项目修改
import {findMessageBySendUserAndReceiveUser, searchUserForForm} from '../api/message'
// 发送信息给指定userId的websocket
import {sendMessageTo} from '../api/webSocketApi'
// 根据userId获取用户信息
import {searchUserByUserId} from '../api/user'
// 刚刚写的websocket
import websocket from '../utils/webSocket'export default {data () {return {currentUser: null, // 当前聊天的人loginUser: null,messages: [],messageForm: [], // 聊天所有信息newMessage: {handle: '',sendUser: '',receiveUser: '',content: '',is_read: '0',createTime: ''},searchUserName: ''}},methods: {async fetchMessages (userId) {if (!userId) {this.searchUserForForm()return}if (this.loginUser.userId == null) {this.$message.error('登录用户编号获取失败,请重新登录!')return}findMessageBySendUserAndReceiveUser(userId, this.loginUser.userId ).then(async res => {await this.searchUserForForm()this.messages = res.value// 将聊天记录总下拉到最下方this.$nextTick(() => {this.scrollToBottom()})})},sendMessage () {if (!this.newMessage.content.trim()) {this.$message.warning('请输入聊天内容')return}this.newMessage.content = this.newMessage.content.trim()if (this.loginUser.userId == null) {this.$message.error('登录用户编号获取失败,请重新登录!')return}if (this.loginUser.userId  === this.currentUser.userId) {this.$message.error('不能给自己发送信息!')return}this.newMessage.sendUser = this.loginUser.userId this.newMessage.receiveUser = this.currentUser.userIdthis.sendWebSocketMessage(this.newMessage)sendMessageTo(this.newMessage).then(res => {if (res.header.code !== 0) {this.$message.error(res.header.message)return}this.chooseUser(this.currentUser)})},// 消息过多的时候滚动到最新消息位置scrollToBottom () {// 使用 $refs 来获取对消息容器的引用const container = this.$refs.messageContainer.$refs.wrap// 滚动到底部container.scrollTop = container.scrollHeight},checkAvatar (avatar) {if (avatar && avatar !== undefined) {return avatar}return ''},chooseUser (user) {this.currentUser = userthis.fetchMessages(user.userId)},// websocketasync connectWebSocket (userId) {await new Promise((resolve) => {websocket.Init(userId)// 在 WebSocket 连接成功时调用 resolvewebsocket.getWebSocket().onopen = () => {console.log('WebSocket连接成功')resolve()}})},// 发送信息sendWebSocketMessage (message) {websocket.getWebSocket().onmessage = (event) => {// 处理消息,这里你可以根据实际需求更新页面上的数据console.log('收到的消息WebSocket2:', event)// 更新收到的消息// receivedMessage.value = event.dataif (this.currentUser) {this.fetchMessages(this.currentUser.userId)} else {this.fetchMessages()}}},handleMessage (message) {// 内容进行相应的处理const parsedMessage = JSON.parse(message)console.log('收到信息:', parsedMessage)if (this.currentUser) {this.fetchMessages(this.currentUser.userId)} else {this.fetchMessages()}},// 获取所有信息searchUserForForm () {if (this.loginUser !== null) {searchUserForForm(this.loginUser.userId, this.searchUserName).then(res => {if (res.header.code !== 0) {this.$message.error(res.header.message)return}this.messageForm = res.value})}},// 初始化websocketasync connectWebSocket (userId) {await new Promise((resolve) => {websocket.Init(userId)// 在 WebSocket 连接成功时调用 resolvewebsocket.getWebSocket().onopen = () => {console.log('WebSocket连接成功')resolve()}})},},mounted () {websocket.setMessageCallback((res) => {this.handleMessage(res)})},created () {},beforeCreate () {// 获取登录用户userId,请根据自己实际项目获取let userId = ''this.connectWebSocket(userId)if (userId) {searchUserByUserId(userId).then(res => {if (res.header.code === 0) {if (res.value) {this.loginUser = res.value} else {this.loginUser.userId = userId}}}).then(() => {this.searchUserForForm()})}}
}
</script>

样式


<style scoped>
.chat-container {display: flex;height: 100%;background: linear-gradient(to bottom right, #FFFFFF, #ECEFF1);
}.left-side {position: relative; /* Position relative for absolute positioning */flex: 1;padding: 20px;border-right: 1px solid #eaeaea;border-radius: 10px;box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}.search-input {position: absolute;top: 20px;left: 20px;width: calc(100% - 40px);max-width: 300px;
}.user-list-scroll {top: 40px;height: calc(100% - 40px);overflow-y: auto;
}.user-avatar-wrapper {position: relative;display: inline-block;
}.user-avatar {width: 40px;height: 40px;border-radius: 50%;margin-right: 10px;border: 1px solid #74ffd2;
}.user-name {font-weight: 800;white-space: nowrap; /* 不换行 */overflow: hidden; /* 溢出隐藏 */padding-left: 15px;text-overflow: ellipsis; /* 超出显示省略号 */text-align: left; /* 添加左对齐属性 */
}.user-last-message {color: #a19f9f;font-size: 14px;white-space: nowrap;overflow: hidden;padding-left: 15px;text-overflow: ellipsis;text-align: left; /* 添加左对齐属性 */
}.right-side {flex: 3;display: flex;flex-direction: column;
}.chat-header {padding: 20px;border-bottom: 1px solid #eaeaea;font-size: 1.2em;color: #37474F;
}.chat-messages {flex: 1;overflow-y: auto;padding: 20px;
}.chat-input {padding: 20px;display: flex;align-items: center;
}.message-input {flex: 1;margin-right: 10px;
}.send-button {flex-shrink: 0;
}.user-item {display: flex;align-items: center;padding: 10px;border-bottom: 1px solid #f0f0f2;
}.user-item:hover {background-color: #E0E0E0;cursor: pointer;transition: background-color 0.3s ease;
}.user-details {flex-grow: 1; /* 填充剩余空间 */
}.messageBox {display: flex;align-items: flex-start; /* 将头像和文本第一行对齐 */margin-bottom: 10px;
}.messageBox img {width: 40px; /* 调整头像大小 */height: 40px;border-radius: 50%;margin-right: 10px;margin-left: 10px;
}.messageContent {max-width: 70%; /* 调整发送信息宽度 */padding: 10px;border-radius: 8px;background-color: #f0f0f0;text-align: left; /* 文本左对齐 */word-wrap: break-word; /* 当文本过长时自动换行 */
}.messageTime {font-size: 12px;color: #999;margin-left: 10px;margin-top: 5px; /* 将发送时间与文本分隔开 */
}.ownMessage {flex-direction: row-reverse;align-items: flex-end; /* 将发送时间放置在最下方的贴右位置 */
}.otherMessage {flex-direction: row;align-items: flex-end; /* 将发送时间放置在最下方的贴左位置 */
}.online-dot {position: absolute;top: 0;left: 0;z-index: 1;width: 10px;height: 10px;background-color: #01c201;border-radius: 50%;
}
.message-badge .el-badge__content {position: absolute;bottom: 5px; /* Adjust to your desired position */left: 5px; /* Adjust to your desired position */background-color: #f56c6c; /* Red background for visibility */color: white; /* White text color */
}
</style>

✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨

以上主逻辑就OK了 ,其他的如axios调用后端接口的逻辑,等等省略

后端

首先我们现安装依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>2.7.0</version> <!-- 可以根据你的Spring Boot版本调整这个版本号 -->
</dependency>

如上你的为springboot项目的话是可以省略版本的,自动查找对应版本

实体

对应我的用户实体为:

@Data
public class User {private String userId;private String avatar;private String userName;private String password;private String salt;private String email;private String phone;private String sex;private Integer age;private Integer status;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime updateTime;
}

消息实体:

@Getter
@Setter
@ToStringpublic class Message {private String handle;private String sendUser;private String receiveUser;private String content;private String isRead;private LocalDateTime createTime;
}

对应为了实现该功能而搭建的实体

import com.pearl.entitys.dataBaseTable.User;
import java.util.ArrayList;
import java.util.List;
import lombok.Data;@Data
public class MessageForm {// 发送用户和接收用户完整聊天消息列表private List<Message> messages = new ArrayList<>();// 未读消息数量private Integer noReadMessageLength;// 在线标识private Boolean isOnline;// 发送信息用户private User sendUser;// 接收信息用户,偏向于赋值登录人员用户信息private User receiveUser;// 最新一条聊天记录private String lastMessage;
}

websocket_720">websocket

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSONObject;
import com.pearl.entitys.beans.webSocket.WebSocket;
import com.pearl.entitys.dataBaseTable.Message;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;/*** @program: tools* @Description: 通过这个类进行连接WebSocket的,默认发信息就进入onMessage解析*/
@Component
@ServerEndpoint(value = "/webSocket/{userId}")
@Slf4j
public class WebSocketUtil {/*** 登录连接数 应该也是线程安全的*/private static int loginCount = 0;/*** user 线程安全的*/private static final Map<String, WebSocket> userMap = new ConcurrentHashMap<String, WebSocket>();/*** @Description: 收到消息触发事件,这个消息是连接人发送的消息* @Param [messageInfo, session]* @Return: void { "userId": "test2", "message": "你收到了嘛?这是用户test发的消息!" }**/@OnMessagepublic void onMessage(String messageInfo, Session session)throws IOException, InterruptedException {if (StringUtils.isBlank(messageInfo)) {return;}// 当前用户String userIdTo = session.getPathParameters().get("userId");// JSON数据log.info("onMessage:{}", messageInfo);System.out.println("接收信息:" + messageInfo + "," + userIdTo);
//    Map map = JSON.parseObject(messageInfo, Map.class);
//    // 接收人
//    String userId = (String) map.get("userId");
//    // 消息内容
//    String message = (String) map.get("message");// 发送给指定用户sendMessageTo(messageInfo, userIdTo);log.info(DateUtil.now() + " | " + userIdTo + " 私人消息-> " + messageInfo, userIdTo);}/*** @Description: 打开连接触发事件* @Param [account, session, config]* @Return: void**/@OnOpenpublic void onOpen(@PathParam("userId") String userId, Session session) throws IOException {WebSocket webSocket = new WebSocket();webSocket.setUserId(userId);webSocket.setSession(session);boolean containsKey = userMap.containsKey(userId);if (!containsKey) {// 添加登录用户数量addLoginCount();userMap.put(userId, webSocket);}log.info("打开连接触发事件!已连接用户: " + userId);log.info("当前在线人数: " + loginCount);Message message = new Message();message.setContent(userId + " 已连接");sendMessageAll(JSONObject.toJSONString(message));}/*** @Description: 关闭连接触发事件* @Param [session, closeReason]* @Return: void**/@OnClosepublic void onClose(@PathParam("userId") String userId, Session session,CloseReason closeReason) throws IOException {boolean containsKey = userMap.containsKey(userId);if (containsKey) {// 删除map中用户userMap.remove(userId);// 减少断开连接的用户reduceLoginCount();}log.info("关闭连接触发事件!已断开用户: " + userId);log.info("当前在线人数: " + loginCount);Message message = new Message();message.setContent(userId + " 已断开");sendMessageAll(JSONObject.toJSONString(message));}/*** @Description: 传输消息错误触发事件* @Param [error :错误]* @Return: void**/@OnErrorpublic void onError(Throwable error) {log.info("onError:{}", error.getMessage());}/*** @Description: 发送指定用户信息* @Param [message:信息, userId:用户]* @Return: void**/public void sendMessageTo(String message, String userId) throws IOException {for (WebSocket user : userMap.values()) {if (user.getUserId().equals(userId)) {System.out.println("用户:" + userId + "收到信息:" + message);user.getSession().getAsyncRemote().sendText(message);}}}/*** @Description: 发给所有人* @Param [message:信息]* @Return: void**/public void sendMessageAll(String message) throws IOException {for (WebSocket item : userMap.values()) {item.getSession().getAsyncRemote().sendText(message);}}/*** @Description: 连接登录数增加* @Param []* @Return: void**/public static synchronized void addLoginCount() {loginCount++;}/*** @Description: 连接登录数减少* @Param []* @Return: void**/public static synchronized void reduceLoginCount() {loginCount--;}/*** @Description: 获取用户* @Param []* @Return: java.util.Map<java.lang.String, com.cn.webSocket.WebSocket>**/public synchronized Map<String, WebSocket> getUsers() {return userMap;}}

消息controller

import com.pearl.responseEntity.Response;
import com.pearl.service.MessageService;
import com.pearl.utils.db.PrimeDB;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/message")
public class MessageController {
// 我的数据库@Autowiredprivate PrimeDB primeDB;
//逻辑层@Resourceprivate MessageService messageService;// 查找未读数量@GetMapping("/findNoReadMessageLength")public Response findNoReadMessage(@RequestParam("userId") String userId) {try (Connection conn = primeDB.create()) {Integer total = messageService.findNoReadMessageLength(conn, userId);return new Response(0, total, "查询成功!");} catch (Exception e) {return new Response(1, e.getMessage());}}// 查找两个人的聊天记录@GetMapping("/findMessageBySendUserAndReceiveUser")public Response<List<Message>> findMessageBySendUserAndReceiveUser(@RequestParam("sendUserId") String sendUserId,@RequestParam("receiveUserId") String receiveUserId) {try (Connection conn = primeDB.create()) {return new Response<>(0,messageService.findMessageBySendUserAndReceiveUser(conn, sendUserId, receiveUserId),"查找成功!");} catch (Exception e) {return new Response<>(1, e.getMessage());}}// 发送信息@PostMapping("/sendMessage")public Response sendMessage(@RequestBody Message message) {try (Connection conn = primeDB.create()) {messageService.sendMessage(conn, message);return new Response(0, "发送成功!");} catch (Exception e) {return new Response(1, e.getMessage());}}// 查找我的用户信息数据@GetMapping("searchUserForForm")public Response<List<MessageForm>> searchUserForForm(@RequestParam("loginUserId") String loginUserId,@RequestParam("searchUserName") String searchUserName) {try (Connection conn = primeDB.create()) {List<MessageForm> messages = messageService.searchUserForForm(conn,loginUserId, searchUserName);return new Response<>(0, messages, "查找成功!");} catch (Exception e) {return new Response<>(1, e.getMessage());}}
}

消息逻辑

import com.pearl.db.MessageDao;
import com.pearl.db.UserDao;
import com.pearl.entitys.beans.MessageForm;
import com.pearl.entitys.beans.webSocket.WebSocket;
import com.pearl.entitys.dataBaseTable.Message;
import com.pearl.entitys.dataBaseTable.User;
import com.pearl.utils.AssertUtils;
import com.pearl.utils.WebSocketUtil;
import com.pearl.utils.Where;
import java.sql.Connection;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageService {@Autowiredprivate WebSocketUtil webSocketUtil;// 限制聊天记录数量final Integer limitMessagesLength = 6000;// 限制用户数量final Integer limitUserLength = 300;// 获取未读的接收信息public Integer findNoReadMessageLength(Connection conn, String userId) throws Exception {try {Integer totol = 0;AssertUtils.isError(StringUtils.isEmpty(userId), "用户编号不能为空!");UserDao userDao = new UserDao(conn);MessageDao messageDao = new MessageDao(conn);User user = userDao.selectbyUserId(userId);AssertUtils.isError(null == user, "用户编号:" + userId + "不存在!");// 为防止发送人特别多导致信息未获取,这里多设置一些拿信息数据List<Message> messages = messageDao.selectByReceiveUserLimitLength(userId, limitMessagesLength);Map<String, List<Message>> messageBySendUserMap = messages.stream().collect(Collectors.groupingBy(Message::getSendUser));for (String sendUser : messageBySendUserMap.keySet()) {List<Message> receiveMessageList = messageBySendUserMap.get(sendUser);Integer noReadSize = receiveMessageList.stream().filter(o -> "0".equals(o.getIsRead())).collect(Collectors.toList()).size();totol += noReadSize;}return totol;} catch (Exception e) {throw new Exception(e.getMessage());}}// 发送信息的逻辑public void sendMessage(Connection conn, Message message) throws Exception {try {AssertUtils.isError(StringUtils.isEmpty(message.getSendUser()), "发送用户不能为空!");AssertUtils.isError(StringUtils.isEmpty(message.getReceiveUser()), "接收用户不能为空!");AssertUtils.isError(StringUtils.isEmpty(message.getContent()), "发送信息不能为空!");UserDao userDao = new UserDao(conn);MessageDao messageDao = new MessageDao(conn);User sendUser = userDao.selectbyUserId(message.getSendUser());AssertUtils.isError(null == sendUser, "发送用户不存在,发送信息失败!");AssertUtils.isError(sendUser.getStatus() != 1,"发送用户:" + message.getSendUser() + "状态已冻结,无法发送信息!");User receiveUser = userDao.selectbyUserId(message.getReceiveUser());AssertUtils.isError(null == receiveUser, "接收用户不存在,发送信息失败!");AssertUtils.isError(receiveUser.getStatus() != 1,"接收用户:" + message.getReceiveUser() + "状态已冻结,无法接收信息!");message.setHandle(UUID.randomUUID().toString());message.setIsRead("0");message.setCreateTime(LocalDateTime.now());messageDao.insert(message);} catch (Exception e) {throw new Exception(e.getMessage());}}// 获取两个人的聊天记录public List<Message> findMessageBySendUserAndReceiveUser(Connection conn, String sendUserId,String receiveUserId) throws Exception {try {AssertUtils.isError(StringUtils.isEmpty(sendUserId), "发送用户为空!");AssertUtils.isError(StringUtils.isEmpty(receiveUserId), "接收用户为空!");UserDao userDao = new UserDao(conn);User sendUser = userDao.selectbyUserId(sendUserId);AssertUtils.isError(null == sendUser, "发送用户不存在,发送信息失败!");User receiveUser = userDao.selectbyUserId(receiveUserId);AssertUtils.isError(null == receiveUser, "接收用户不存在,发送信息失败!");MessageDao messageDao = new MessageDao(conn);// 获取对方发送的信息List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(sendUserId,receiveUserId, limitMessagesLength);// 获取发送给对方的信息List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(receiveUserId,sendUserId, limitMessagesLength);List<Message> allMessageList = new ArrayList<>();allMessageList.addAll(receiveMessageList);allMessageList.addAll(sendMessageList);List<Message> sortedMessageList = allMessageList.stream().sorted(Comparator.comparing(Message::getCreateTime)).collect(Collectors.toList());// 设置已读List<Message> noReadMessageList = receiveMessageList.stream().filter(o -> "0".equals(o.getIsRead())).peek(message -> message.setIsRead("1")).collect(Collectors.toList());if (noReadMessageList.size() > 0) {messageDao.update(noReadMessageList);}return sortedMessageList;} catch (Exception e) {throw new Exception(e.getMessage());}}// 获取所有数据public List<MessageForm> findAllMessageForm(Connection conn, String userId) throws Exception {try {Map<String, WebSocket> users = webSocketUtil.getUsers();Set<String> ids = users.keySet();AssertUtils.isError(StringUtils.isEmpty(userId), "用户编号不能为空!");List<MessageForm> messageFormList = new ArrayList<>();UserDao userDao = new UserDao(conn);MessageDao messageDao = new MessageDao(conn);User loginUser = userDao.selectbyUserId(userId);AssertUtils.isError(null == loginUser, "用户编号:" + userId + "不存在!");messageFormList.addAll(findAllMessageChatDataWithLoginUserId(conn, userId));// 判断ids是否在messageFormList的sendUser的Id中,不是则获取新的数据到messageFormListfor (String id : ids) {if (!messageFormList.stream().map(o -> o.getSendUser().getUserId()).collect(Collectors.toList()).contains(id)) {MessageForm messageForm = new MessageForm();User sendUserData = userDao.selectbyUserId(id);if (null == sendUserData) {continue;}List<Message> allMessageList = findBothMessages(userId, id,limitMessagesLength, messageDao);messageForm.setMessages(allMessageList);messageForm.setSendUser(sendUserData);messageForm.setReceiveUser(loginUser);messageForm.setIsOnline(true);messageForm.setNoReadMessageLength(0);messageForm.setLastMessage("");messageFormList.add(messageForm);}}// 获取所有messageFormList的sendUser的userId,List<String> sendUserIds = messageFormList.stream().map(MessageForm::getSendUser).map(User::getUserId).collect(Collectors.toList());// 如果获取到的用户少于100个,则补齐剩余数量的用户数据,这里补齐的是从没和自己聊天过的用户if (sendUserIds.size() < limitUserLength) {Integer leaveCount = limitUserLength - sendUserIds.size();Where where = new Where();where.notIn("user_id", sendUserIds.toArray());where.limit(leaveCount);List<User> userList = userDao.selectByWhere(where);if (null != userList && userList.size() > 0) {for (User user : userList) {MessageForm messageForm = new MessageForm();messageForm.setSendUser(user);messageForm.setReceiveUser(loginUser);messageForm.setIsOnline(false);messageForm.setNoReadMessageLength(0);messageForm.setLastMessage("");messageFormList.add(messageForm);}}}// 按照在线状态为true,有聊天记录的优先展示messageFormList.sort((o1, o2) -> {if (o1.getIsOnline() && o2.getIsOnline()) {return o2.getMessages().size() - o1.getMessages().size();} else if (o1.getIsOnline()) {return -1;} else if (o2.getIsOnline()) {return 1;} else {return o2.getMessages().size() - o1.getMessages().size();}});return messageFormList;} catch (Exception e) {throw new Exception(e.getMessage());}}// 获取登录用户所有聊过天的记录数据public List<MessageForm> findAllMessageChatDataWithLoginUserId(Connection conn, String userId)throws Exception {try {Map<String, WebSocket> users = webSocketUtil.getUsers();Set<String> ids = users.keySet();AssertUtils.isError(StringUtils.isEmpty(userId), "用户编号不能为空!");List<MessageForm> messageFormList = new ArrayList<>();UserDao userDao = new UserDao(conn);MessageDao messageDao = new MessageDao(conn);User loginUser = userDao.selectbyUserId(userId);AssertUtils.isError(null == loginUser, "用户编号:" + userId + "不存在!");// 获取所有有发送信息给自己聊天的用户List<String> allSendUsers = messageDao.selectByReceiveUser(userId).stream().map(Message::getSendUser).distinct().collect(Collectors.toList());for (String sendUser : allSendUsers) {// 发送人的用户信息MessageForm messageForm = new MessageForm();User sendUserData = userDao.selectbyUserId(sendUser);if (null == sendUserData) {continue;}// 获取对方发送的信息List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(sendUser,userId, limitMessagesLength);// 获取发送给对方的信息List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(userId,sendUser, limitMessagesLength);List<Message> allMessageList = new ArrayList<>();allMessageList.addAll(receiveMessageList);allMessageList.addAll(sendMessageList);List<Message> sortedMessageList = allMessageList.stream().sorted(Comparator.comparing(Message::getCreateTime)).collect(Collectors.toList());// 赋值最新消息messageForm.setLastMessage(sortedMessageList.size() > 0 ? sortedMessageList.get(sortedMessageList.size() - 1).getContent() : "");// 赋值聊天记录messageForm.setMessages(sortedMessageList);// 赋值未读消息messageForm.setNoReadMessageLength(receiveMessageList.stream().filter(o -> "0".equals(o.getIsRead())).collect(Collectors.toList()).size());// 赋值发送人messageForm.setSendUser(sendUserData);// 赋值接收人messageForm.setReceiveUser(loginUser);// 赋值是否在线messageForm.setIsOnline(ids.contains(sendUser));messageFormList.add(messageForm);}// 获取只有自己发送信息给别人的记录的用户List<String> allSendToUsers = messageDao.selectBySendUser(userId).stream().map(Message::getReceiveUser).distinct().collect(Collectors.toList());for (String receiveUser : allSendToUsers) {// 判断messageFormList的sendUser的userId是否包含receiveUser,有则说明已经存在了if (messageFormList.stream().anyMatch(o -> o.getSendUser().getUserId().equals(receiveUser))) {continue;}MessageForm messageForm = new MessageForm();User receiveUserData = userDao.selectbyUserId(receiveUser);if (null == receiveUserData) {continue;}messageForm.setReceiveUser(loginUser);messageForm.setSendUser(receiveUserData);messageForm.setLastMessage("");messageForm.setNoReadMessageLength(0);List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(userId,receiveUser, limitMessagesLength);// 按照CreateTime排序从小到大messageForm.setMessages(sendMessageList.stream().sorted(Comparator.comparing(Message::getCreateTime)).collect(Collectors.toList()));messageForm.setIsOnline(ids.contains(receiveUser));messageFormList.add(messageForm);}return messageFormList;} catch (Exception e) {throw new Exception(e.getMessage());}}// 用户区查到的数据,有用户名就查用户名对应用户数据和聊天记录,没有默认查300个用户(配置)public List<MessageForm> searchUserForForm(Connection conn, String userId, String username)throws Exception {try {List<MessageForm> messageFormList = new ArrayList<>();AssertUtils.isError(StringUtils.isEmpty(userId), "登录用户不能为空!");UserDao userDao = new UserDao(conn);MessageDao messageDao = new MessageDao(conn);User loginUser = userDao.selectbyUserId(userId);AssertUtils.isError(null == loginUser, "登录用户不存在!");if (StringUtils.isNotEmpty(username)) {List<User> userList = userDao.selectByUserName(username);Map<String, WebSocket> users = webSocketUtil.getUsers();Set<String> ids = users.keySet();for (User user : userList) {User sendUserData = userDao.selectbyUserId(user.getUserId());if (null == sendUserData) {continue;}MessageForm messageForm = new MessageForm();messageForm.setSendUser(sendUserData);messageForm.setReceiveUser(loginUser);messageForm.setIsOnline(ids.contains(user.getUserId()));// 获取对方发送的信息List<Message> receiveMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(sendUserData.getUserId(), loginUser.getUserId(), limitMessagesLength);// 获取发送给对方的信息List<Message> sendMessageList = messageDao.selectBySendUserAndReceiveUserLimitLength(loginUser.getUserId(), sendUserData.getUserId(), limitMessagesLength);List<Message> allMessages = new ArrayList<>();allMessages.addAll(receiveMessageList);allMessages.addAll(sendMessageList);allMessages.sort((o1, o2) -> {if (o1.getCreateTime().isBefore(o2.getCreateTime())) {return -1;} else if (o1.getCreateTime().isAfter(o2.getCreateTime())) {return 1;} else {return 0;}});if (allMessages.size() > 0) {messageForm.setLastMessage(allMessages.get(allMessages.size() - 1).getContent());} else {messageForm.setLastMessage("");}messageForm.setMessages(allMessages);// 获取allMessages中isRead为0的数量messageForm.setNoReadMessageLength((int) allMessages.stream().filter(message -> "0".equals(message.getIsRead())).count());messageFormList.add(messageForm);}} else {messageFormList.addAll(findAllMessageForm(conn, userId));}return messageFormList;} catch (Exception e) {throw new Exception(e.getMessage());}}
}

结语

以上为我用websocket实现聊天通讯的功能逻辑


http://www.ppmy.cn/server/11304.html

相关文章

Linux RHCE练习之远程连接服务实战

Linux RHCE练习之远程连接服务实战 要求 主机一 主机名&#xff1a;server.example.comip: 172.25.254.100建立用户timinglee&#xff0c;其密码为timinglee 主机二 主机名&#xff1a;client.example.comip: 172.25.254.200 实现 主机一实现 [rootserver100 ~]# hostn…

Pytorch 之torch.nn初探 卷积--Convolution Layers

任务描述 本关任务&#xff1a; 本关提供了一个Variable 类型的变量input&#xff0c;按照要求创建一 Conv1d变量conv&#xff0c;对input应用卷积操作并赋值给变量 output&#xff0c;并输出output 的大小。 相关知识 卷积的本质就是用卷积核的参数来提取原始数据的特征&a…

Go 之为什么 rune 是 int32 的别名而不是 uint32 的别名

我对这个问题其实也是一直有疑问的&#xff0c;毕竟像 byte 都是 uint8 的别名。然后找了一些问答资料&#xff0c;不知道还没有没其他更好的解释。 范围足够 在 Unicode 字符集中&#xff0c;一个字符的码点范围是从 U0000 到 U10FFFF&#xff0c;共计 1114112 个码点&#…

代码随想录算法训练营第三十四天|1005.K次取反后最大化的数组和,134. 加油站,135. 分发糖果

目录 1005.K次取反后最大化的数组和思路代码 134. 加油站思路代码 135. 分发糖果思路代码 1005.K次取反后最大化的数组和 题目链接&#xff1a;1005.K次取反后最大化的数组和 文档讲解&#xff1a;代码随想录 视频讲解&#xff1a;贪心算法&#xff0c;这不就是常识&#xff1f…

比特币中的符文是什么?

比特币中的符文是什么&#xff1f; 比特币符文是存在于比特币区块链上的独特的、可替代的代币。它们旨在代表具有独特特征和元数据的可替代资产。 Ordinals 协议的创建者 Casey Rodamor 最近放弃了一项替代 BRC-20 可替代代币协议的提案&#xff0c;该替代方案被称为 Runes。 破…

Visual Studio C++ 示例

Visual Studio C 示例 项目2023/06/163 个参与者 反馈 本文内容 GitHub 上的存档 C 示例ATL 示例CLR 和语言示例 - Windows 窗体COM 事件示例 显示另外 13 个 Visual Studio C 示例可在 Web 上找到。 Microsoft 已生成许多 C 示例&#xff0c;这些示例演示了跨多种技术的…

介绍TCP窗口

在TCP通信中&#xff0c;TCP窗口是用于控制发送方发送数据的速率的机制之一。TCP窗口大小会根据网络情况和接收方的处理能力进行动态调整&#xff0c;以最大化网络吞吐量并减少拥塞和丢包的风险。 当发送方以较快速度发送TCP数据包时&#xff0c;TCP窗口大小可能会自动调整&am…

吴恩达深度学习笔记:深度学习的 实践层面 (Practical aspects of Deep Learning)1.6-1.8

目录 第一门课&#xff1a;第二门课 改善深层神经网络&#xff1a;超参数调试、正 则 化 以 及 优 化 (Improving Deep Neural Networks:Hyperparameter tuning, Regularization and Optimization)第一周&#xff1a;深度学习的 实践层面 (Practical aspects of Deep Learning)…