TeamTalk消息服务器(未读计数)

devtools/2024/10/21 4:09:33/

信令和协议设计

enum MessageCmdID {// ...... 省略无关逻辑 CID_MSG_UNREAD_CNT_REQUEST = 775,CID_MSG_UNREAD_CNT_RESPONSE = 776,// ...... 省略无关逻辑 
};message IMUnreadMsgCntReq{//cmd id:		0x0307required uint32 user_id = 1;optional bytes attach_data = 20;	
}message IMUnreadMsgCntRsp{//cmd id:		0x0308required uint32 user_id = 1;required uint32 total_cnt = 2; // 多个人的未读消息repeated IM.BaseDefine.UnreadInfo unreadinfo_list = 3;optional bytes attach_data = 20;
}message UnreadInfo{required uint32 session_id = 1; // 会话IDrequired SessionType session_type = 2; // 会话类型required uint32 unread_cnt = 3; // 未读消息数量required uint32 latest_msg_id = 4; // 最新的消息IDrequired bytes latest_msg_data = 5; // 最新的消息required MsgType latest_msg_type = 6;  // 消息类型required uint32 latest_msg_from_user_id = 7;  //发送的用户id
}

流程图:

请添加图片描述

代码分析

msg_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 CMsgConn::_HandleClientUnreadMsgCntRequest 函数

void CMsgConn::HandlePdu(CImPdu* pPdu)
{// ...... 省略无关逻辑 switch (pPdu->GetCommandId()) {// ...... 省略无关逻辑          case CID_MSG_UNREAD_CNT_REQUEST:_HandleClientUnreadMsgCntRequest(pPdu );break;              // ...... 省略无关逻辑 }
}void CMsgConn::_HandleClientUnreadMsgCntRequest(CImPdu* pPdu)
{log("HandleClientUnreadMsgCntReq, from_id=%u ", GetUserId());IM::Message::IMUnreadMsgCntReq msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));CDBServConn* pDBConn = get_db_serv_conn_for_login();if (pDBConn) {CDbAttachData attach(ATTACH_TYPE_HANDLE, m_handle, 0);msg.set_user_id(GetUserId());msg.set_attach_data(attach.GetBuffer(), attach.GetLength());pPdu->SetPBMsg(&msg);pDBConn->SendPdu(pPdu);}
}

db_proxy_server收到CID_MSG_UNREAD_CNT_REQUEST后调用 DB_PROXY::getUnreadMsgCounter函数

值得注意的是,返回的未读消息里面包含每个会话的未读消息个数,消息类型,最后一条消息。

m_handler_map.insert(make_pair(uint32_t(CID_MSG_UNREAD_CNT_REQUEST), DB_PROXY::getUnreadMsgCounter));void getUnreadMsgCounter(CImPdu* pPdu, uint32_t conn_uuid)
{IM::Message::IMUnreadMsgCntReq msg;IM::Message::IMUnreadMsgCntRsp msgResp;if(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength())){CImPdu* pPduResp = new CImPdu;uint32_t nUserId = msg.user_id();list<IM::BaseDefine::UnreadInfo> lsUnreadCount;uint32_t nTotalCnt = 0;// 从redis获取未读消息数量 和 从mysql获取最后一条未读消息CMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);CGroupMessageModel::getInstance()->getUnreadMsgCount(nUserId, nTotalCnt, lsUnreadCount);msgResp.set_user_id(nUserId);msgResp.set_total_cnt(nTotalCnt);for(auto it= lsUnreadCount.begin(); it!=lsUnreadCount.end(); ++it){IM::BaseDefine::UnreadInfo* pInfo = msgResp.add_unreadinfo_list();pInfo->set_session_id(it->session_id());pInfo->set_session_type(it->session_type());pInfo->set_unread_cnt(it->unread_cnt());pInfo->set_latest_msg_id(it->latest_msg_id());pInfo->set_latest_msg_data(it->latest_msg_data());pInfo->set_latest_msg_type(it->latest_msg_type());pInfo->set_latest_msg_from_user_id(it->latest_msg_from_user_id());}log("userId=%d, unreadCnt=%u, totalCount=%u", nUserId, msgResp.unreadinfo_list_size(), nTotalCnt);msgResp.set_attach_data(msg.attach_data());pPduResp->SetPBMsg(&msgResp);pPduResp->SetSeqNum(pPdu->GetSeqNum());pPduResp->SetServiceId(IM::BaseDefine::SID_MSG);pPduResp->SetCommandId(IM::BaseDefine::CID_MSG_UNREAD_CNT_RESPONSE);CProxyConn::AddResponsePdu(conn_uuid, pPduResp);}else{log("parse pb failed");}
}
void CMessageModel::getUnreadMsgCount(uint32_t nUserId, uint32_t &nTotalCnt, list<IM::BaseDefine::UnreadInfo>& lsUnreadCount)
{// redisCacheManager* pCacheManager = CacheManager::getInstance();CacheConn* pCacheConn = pCacheManager->GetCacheConn("unread");if (pCacheConn){map<string, string> mapUnread;string strKey = "unread_" + int2string(nUserId);bool bRet = pCacheConn->hgetAll(strKey, mapUnread);pCacheManager->RelCacheConn(pCacheConn);if(bRet){IM::BaseDefine::UnreadInfo cUnreadInfo;for (auto it = mapUnread.begin(); it != mapUnread.end(); it++) {cUnreadInfo.set_session_id(atoi(it->first.c_str()));cUnreadInfo.set_unread_cnt(atoi(it->second.c_str()));cUnreadInfo.set_session_type(IM::BaseDefine::SESSION_TYPE_SINGLE);uint32_t nMsgId = 0;string strMsgData;IM::BaseDefine::MsgType nMsgType;// 从mysql获取最后一条未读消息 mysqlgetLastMsg(cUnreadInfo.session_id(), nUserId, nMsgId, strMsgData, nMsgType); if(IM::BaseDefine::MsgType_IsValid(nMsgType)){cUnreadInfo.set_latest_msg_id(nMsgId);cUnreadInfo.set_latest_msg_data(strMsgData);cUnreadInfo.set_latest_msg_type(nMsgType);cUnreadInfo.set_latest_msg_from_user_id(cUnreadInfo.session_id());lsUnreadCount.push_back(cUnreadInfo);nTotalCnt += cUnreadInfo.unread_cnt();}else{log("invalid msgType. userId=%u, peerId=%u, msgType=%u", nUserId, cUnreadInfo.session_id(), nMsgType);}}}else{log("hgetall %s failed!", strKey.c_str());}}else{log("no cache connection for unread");}
}
void CMessageModel::getLastMsg(uint32_t nFromId, uint32_t nToId, uint32_t& nMsgId, string& strMsgData, IM::BaseDefine::MsgType& nMsgType, uint32_t nStatus)
{uint32_t nRelateId = CRelationModel::getInstance()->getRelationId(nFromId, nToId, false);if (nRelateId != INVALID_VALUE){CDBManager* pDBManager = CDBManager::getInstance();// 读从库CDBConn* pDBConn = pDBManager->GetDBConn("teamtalk_slave");if (pDBConn){string strTableName = "IMMessage_" + int2string(nRelateId % 8);string strSql = "select msgId,type,content from " + strTableName + " force index (idx_relateId_status_created) where relateId= " + int2string(nRelateId) + " and status = 0 order by created desc, id desc limit 1";CResultSet* pResultSet = pDBConn->ExecuteQuery(strSql.c_str());if (pResultSet){while (pResultSet->Next()){nMsgId = pResultSet->GetInt("msgId");nMsgType = IM::BaseDefine::MsgType(pResultSet->GetInt("type"));if (nMsgType == IM::BaseDefine::MSG_TYPE_SINGLE_AUDIO){// "[语音]"加密后的字符串strMsgData = strAudioEnc;}else{strMsgData = pResultSet->GetString("content");}}delete pResultSet;}else{log("no result set: %s", strSql.c_str());}pDBManager->RelDBConn(pDBConn);}else{log("no db connection_slave");}}else{log("no relation between %lu and %lu", nFromId, nToId);}
}

db_proxy_server回复信令CID_MSG_UNREAD_CNT_RESPONSE给msg_server,调用CDBServConn::_HandleUnreadMsgCountResponse

void CDBServConn::_HandleUnreadMsgCountResponse(CImPdu* pPdu)
{IM::Message::IMUnreadMsgCntRsp msg;CHECK_PB_PARSE_MSG(msg.ParseFromArray(pPdu->GetBodyData(), pPdu->GetBodyLength()));uint32_t user_id = msg.user_id();uint32_t total_cnt = msg.total_cnt();uint32_t user_unread_cnt = msg.unreadinfo_list_size();CDbAttachData attach_data((uchar_t*)msg.attach_data().c_str(), msg.attach_data().length());uint32_t handle = attach_data.GetHandle();log("HandleUnreadMsgCntResp, userId=%u, total_cnt=%u, user_unread_cnt=%u.", user_id,total_cnt, user_unread_cnt);CMsgConn* pMsgConn = CImUserManager::GetInstance()->GetMsgConnByHandle(user_id, handle);if (pMsgConn && pMsgConn->IsOpen()) {msg.clear_attach_data();pPdu->SetPBMsg(&msg);pMsgConn->SendPdu(pPdu);}
}

http://www.ppmy.cn/devtools/103953.html

相关文章

【ACM出版,EIScopus快检索-高录用】2024年数字经济与计算机科学国际学术会议(DECS2024,9月20-22)

欢迎参加2024年数字经济与计算机科学国际学术会议&#xff08;DECS2024&#xff09;&#xff0c;本次大会得到了马来西亚理工大学、北京科技大学经济管理学院、南京信息工程大学、马来西亚敦胡先翁大学的大力支持&#xff01; 旨在汇聚全球在数字经济与计算机科学领域内的研究者…

Web-gpt

AJAX AJAX&#xff08;Asynchronous JavaScript and XML&#xff0c;异步JavaScript和XML&#xff09;是一种用于创建动态网页应用的技术。它允许网页在不重新加载整个页面的情况下&#xff0c;异步地从服务器请求数据&#xff0c;并将这些数据更新到网页上。这提高了用户体验…

三天速成数学建模国赛国奖全攻略

这里写目录标题 国赛考点&#x1f5d2;️&#x1f5d2;️01 国赛是如何评奖的&#xff1f;02 国赛历年题型和模型算法1&#xff09;国赛赛题特点2&#xff09;历年国赛赛题类型 建模手三天快速提升计划✨✨01 第一天&#xff1a;模型分类及国赛常见模型的用法了解1&#xff09;…

MagiskBoot编译解包打包boot.img

版权归作者所有&#xff0c;如有转发&#xff0c;请注明文章出处&#xff1a;https://cyrus-studio.github.io/blog/ 编译环境准备 1. Windows下启用开发者模式&#xff0c;因为需要 symbolic link 支持 2. 安装 python3.8&#xff0c;并配置PATH环境变量 # 查看python版本信…

华为HCIA考试大纲

数据通信与网络基础 ● 数据通信网络基础 ■ 数据通信基础概念 ■ 信息传递的过程 ■ 网络设备及基本功能 ■ 网络类型及拓扑类型 ■ 网络工程 ■ 网络工程师 ● 网络参考模型 ■ 数据及…

解锁.NET安全奥秘:敏感数据加密与哈希的深度揭秘

在 .NET 应用中保护敏感数据&#xff1a;加密与哈希的深入探讨 随着数字化时代的不断发展&#xff0c;数据安全已经成为企业和开发者面临的首要挑战之一。在 .NET 应用程序中&#xff0c;保护敏感数据不被未授权访问、篡改或泄露至关重要。为此&#xff0c;加密与哈希技术被广…

大数据5v特性、集群、分布式

目录 数据分析六部曲 大数据的特点 &#xff08;5v特征&#xff09; 分布式与集群的区别 常用的分布式方案 数据分析六部曲 明确分析目的和思路&#xff1a;确保分析框架的体系化和逻辑性&#xff0c;简单来说就是先分析什么&#xff0c;后分析什么&#xff0c;使得各个分析…

Java项目中的分库分表实践指南

摘要 随着互联网应用的快速发展&#xff0c;单一数据库实例越来越难以满足高并发和大数据量的需求。分库分表是一种有效的解决方案&#xff0c;它通过将数据分散存储到不同的数据库或表中来提高系统的扩展性和性能。本文将详细介绍Java项目中实现分库分表的策略、步骤和最佳实…