概述
主要功能
存储消息(按照不同消息类型进行划分)
- 消息元信息存储到mysql数据库中,主要用于获取最近消息以及获取指定时间段的消息
- 文本消息的元信息存储到ES搜索引擎中,可以进行关键字的消息搜索
- 图片、语音、文件消息都通过文件存储子服务进行存储
获取消息(向外提供接口)
- 获取最近消息
- 获取指定时间段的消息
- 搜索包含有指定关键字的消息
功能模块
- 参数解析、日志输出、服务注册、RPC服务器
- 数据管理
- mysql数据管理模块,用来存储每一条消息的元信息
- ES数据管理模块,存储文本消息的元信息及其内容
- 数据管理服务,基于文件子服务进行数据管理
基本逻辑
数据管理实现
mysql持久化管理
实现逻辑
- ODB映射代码实现数据类映射数据库中的数据存储表
- 业务接口
- 获取指定会话最近N条消息
- 获取指定会话的时间段消息
具体实现
通过ODB生成数据表
#pragma once
#include <string>
#include <cstddef>
#include <odb/nullable.hxx>
#include <odb/core.hxx>
#include <boost/date_time/posix_time/posix_time.hpp>namespace mag
{
#pragma db object table("message")
class Message {public:Message(){}Message(const std::string &mid,const std::string &ssid,const std::string &uid,const unsigned char mtype,const boost::posix_time::ptime &ctime):_message_id(mid), _session_id(ssid),_user_id(uid), _message_type(mtype),_create_time(ctime){}/// 访问和修改变量方法 std::string message_id() const { return _message_id; }void message_id(const std::string &val) { _message_id = val; }std::string session_id() const { return _session_id; }void session_id(const std::string &val) { _session_id = val; }std::string user_id() const { return _user_id; }void user_id(const std::string &val) { _user_id = val; }unsigned char message_type() const { return _message_type; }void message_type(unsigned char val) { _message_type = val; }boost::posix_time::ptime create_time() const { return _create_time; }void create_time(const boost::posix_time::ptime &val) { _create_time = val; }std::string content() const { if (!_content) return std::string();return *_content; }void content(const std::string &val) { _content = val; }std::string file_id() const { if (!_file_id) return std::string();return *_file_id; }void file_id(const std::string &val) { _file_id = val; }std::string file_name() const { if (!_file_name) return std::string();return *_file_name; }void file_name(const std::string &val) { _file_name = val; }unsigned int file_size() const { if (!_file_size) return 0;return *_file_size; }void file_size(unsigned int val) { _file_size = val; }private:friend class odb::access;#pragma db id autounsigned long _id;#pragma db type("varchar(64)") index uniquestd::string _message_id;#pragma db type("varchar(64)") indexstd::string _session_id;std::string _user_id;unsigned char _message_type;#pragma db type("TIMESTAMP")boost::posix_time::ptime _create_time;odb::nullable<std::string> _content;#pragma db type("varchar(64)")odb::nullable<std::string> _file_id;#pragma db type("varchar(128)")odb::nullable<std::string> _file_name;odb::nullable<unsigned int> _file_size;
};
}
集成mysql相关操作
/// Message对象的数据库操作#include "mysql.hpp"
#include "message.hxx"
#include "message-odb.hxx"namespace mag
{
class MessageTable{public:using ptr = std::shared_ptr<MessageTable>;MessageTable(const std::shared_ptr<odb::core::database> &db): _db(db){}~MessageTable(){}//////插入消息//bool insert(Message &msg){try {odb::transaction trans(_db->begin());_db->persist(msg);trans.commit();}catch (std::exception &e) {LOG_ERROR("新增消息失败 {}:{}!", msg.message_id(),e.what());return false;}return true;}///删除指定会话的所有消息bool remove(const std::string &ssid){try{odb::transaction trans(_db->begin());typedef odb::query<Message> query;typedef odb::result<Message> result;_db->erase_query<Message>(query::session_id == ssid);trans.commit();}catch (std::exception &e) {LOG_ERROR("删除会话所有消息失败 {}:{}!", ssid, e.what());return false;}return true;}///获取指定会话的最近消息std::vector<Message> recent(const std::string &ssid, int count) {std::vector<Message> res;try {odb::transaction trans(_db->begin());typedef odb::query<Message> query;typedef odb::result<Message> result;std::stringstream cond;cond << "session_id='" << ssid << "' ";cond << "order by create_time desc limit " << count;result r(_db->query<Message>(cond.str()));for (result::iterator i(r.begin()); i != r.end(); ++i) {res.push_back(*i);}std::reverse(res.begin(), res.end());trans.commit();}catch (std::exception &e) {LOG_ERROR("获取最近消息失败:{}-{}-{}!", ssid, count, e.what());}return res;}///获取指定时间区间内的消息std::vector<Message> range(const std::string &ssid, boost::posix_time::ptime &stime, boost::posix_time::ptime &etime) {std::vector<Message> res;try {odb::transaction trans(_db->begin());typedef odb::query<Message> query;typedef odb::result<Message> result;result r(_db->query<Message>(query::session_id == ssid && query::create_time >= stime &&query::create_time <= etime));for (result::iterator i(r.begin()); i != r.end(); ++i) {res.push_back(*i);}trans.commit();}catch (std::exception &e) {LOG_ERROR("获取区间消息失败:{}-{}:{}-{}!", ssid, boost::posix_time::to_simple_string(stime), boost::posix_time::to_simple_string(etime), e.what());}return res;}private:std::shared_ptr<odb::core::database> _db;
};
}
ES文本消息管理
通过ES避免mysql查找效率慢的弊端,主要需要实现四种操作:创建索引、数据新增、数据检索、数据删除
需要创建索引的字段
- 聊天会话ID,实现通过聊天会话ID对消息进行过滤
- 消息内容,通过中文分词索引,关键字索引文本消息内容
具体实现
POST /message/_doc
{"settings": {"analysis": {"analyzer": {"ik": {"tokenizer": "ik_max_word"}}}},"mappings": {"dynamic": true,"properties": {"chat_session_id": {"type": "keyword","analyzer": "standard"},"message_id": {"type": "keyword","enabled": false},"user_id": {"type": "keyword","enabled": false},"create_time": {"type": "long","enabled": false},"content": {"type": "text","analyzer": "ik_max_word"}}}
}///查询全部信息/
GET /message/_doc/_search?pretty
{"query":{"match_all":{}}
}
插入数据测试
POST /message/_doc/_bulk
{"index":{"_id":"1"}}
{"chat_session_id":"会话 ID1","message_id":"消息ID1","content":"吃饭了吗?","user_id":"用户ID1","create_time":12345678910}
{"index":{"_id":"2"}}
{"chat_session_id":"会话 ID1","message_id":"消息ID2","content":"吃的盖浇饭。","user_id":"用户ID2","create_time":12345678910}
{"index":{"_id":"3"}}
{"chat_session_id":"会话 ID1","message_id":"消息ID3","content":"昨天吃饭了吗?","user_id":"用户ID2","create_time":12345678910}
{"index":{"_id":"4"}}
{"chat_session_id":"会话 ID2","message_id":"消息ID4","content":"昨天吃的盖浇饭。","user_id":"用户ID2","create_time":12345678910}
ES操作封装
// 消息索引管理类class ESMessage {public:using ptr = std::shared_ptr<ESMessage>;// 构造函数,接收ES客户端实例ESMessage(const std::shared_ptr<elasticlient::Client> &es_client) : _es_client(es_client) {}// 创建消息信息索引bool createIndex() {bool ret = ESIndex(_es_client, "message").append("user_id", "keyword", "standard", false).append("message_id", "keyword", "standard", false).append("create_time", "long", "standard", false).append("chat_session_id", "keyword", "standard", true).append("content").create();if (!ret) {LOG_INFO("消息信息索引创建失败!");return false;}LOG_INFO("消息信息索引创建成功!");return true;}// 向消息索引插入或更新消息数据bool appendData(const std::string &user_id,const std::string &message_id,const long create_time,const std::string &chat_session_id,const std::string &content) {bool ret = ESInsert(_es_client, "message").append("message_id", message_id).append("create_time", create_time).append("user_id", user_id).append("chat_session_id", chat_session_id).append("content", content).insert(message_id);if (!ret) {LOG_ERROR("消息数据插入/更新失败!");return false;}LOG_INFO("消息数据新增/更新成功!");return true;}// 删除消息数据bool remove(const std::string &mid) {bool ret = ESRemove(_es_client, "message").remove(mid);if (!ret) {LOG_ERROR("消息数据删除失败!");return false;}LOG_INFO("消息数据删除成功!");return true;}// 搜索消息std::vector<mag::Message> search(const std::string &key, const std::string &ssid) {std::vector<mag::Message> res;// 搜索消息内容及会话IDJson::Value json_user = ESSearch(_es_client, "message").append_must_term("chat_session_id.keyword", ssid).append_must_match("content", key).search();// 检查返回结果是否为数组类型if (!json_user.isArray()) {LOG_ERROR("消息搜索结果为空,或者结果不是数组类型");return res;}// 遍历每个搜索结果并创建Message对象int sz = json_user.size();LOG_DEBUG("检索结果条目数量:{}", sz);for (int i = 0; i < sz; i++) {mag::Message message;message.user_id(json_user[i]["_source"]["user_id"].asString());message.message_id(json_user[i]["_source"]["message_id"].asString());// 解析时间戳boost::posix_time::ptime ctime(boost::posix_time::from_time_t(json_user[i]["_source"]["create_time"].asInt64()));message.create_time(ctime);// 设置其他字段message.session_id(json_user[i]["_source"]["chat_session_id"].asString());message.content(json_user[i]["_source"]["content"].asString());res.push_back(message);}return res;}private:std::shared_ptr<elasticlient::Client> _es_client; // Elasticsearch 客户端};
消息存储子服务
核心组件分析
- RPC服务:客户端调用服务端方法
- ES:全文搜索和消息检索
- RabbitMQ:消息队列,用于异步处理和传递消息
- etcd:服务发现和注册
根据ID与时间范围获取历史消息
/*** @brief 获取历史消息,根据会话ID和时间范围。* * @param controller RPC控制器。* @param request 请求消息,包含会话ID、起始时间和结束时间。* @param response 响应消息,将填充历史消息列表。* @param done 回调函数,用于通知RPC调用完成。*/virtual void GetHistoryMsg(::google::protobuf::RpcController* controller,const ::mag::GetHistoryMsgReq* request,::mag::GetHistoryMsgRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done); // 确保在方法结束时调用 done->Run()// 定义错误响应的Lambda函数,统一处理错误auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 提取关键要素:请求ID,会话ID,起始时间,结束时间std::string rid = request->request_id();std::string chat_ssid = request->chat_session_id();boost::posix_time::ptime stime = boost::posix_time::from_time_t(request->start_time());boost::posix_time::ptime etime = boost::posix_time::from_time_t(request->over_time());// 2. 从MySQL数据库中查询消息auto msg_lists = _mysql_message->range(chat_ssid, stime, etime);if (msg_lists.empty()) {response->set_request_id(rid);response->set_success(true); // 查询成功但无结果return;}// 3. 统计所有文件类型消息的文件ID,并从文件子服务进行批量文件下载std::unordered_set<std::string> file_id_lists;for (const auto &msg : msg_lists) {if (msg.file_id().empty()) continue; // 仅处理有文件ID的消息LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());file_id_lists.insert(msg.file_id());}std::unordered_map<std::string, std::string> file_data_lists;bool ret = _GetFile(rid, file_id_lists, file_data_lists);if (ret == false) {LOG_ERROR("{} 批量文件数据下载失败!", rid);return err_response(rid, "批量文件数据下载失败!");}// 4. 统计所有消息的发送者用户ID,从用户子服务进行批量用户信息获取std::unordered_set<std::string> user_id_lists; // 存储唯一的用户IDfor (const auto &msg : msg_lists) {user_id_lists.insert(msg.user_id());}std::unordered_map<std::string, UserInfo> user_lists;ret = _GetUser(rid, user_id_lists, user_lists);if (ret == false) {LOG_ERROR("{} 批量用户数据获取失败!", rid);return err_response(rid, "批量用户数据获取失败!");}// 5. 组织响应,填充消息列表response->set_request_id(rid);response->set_success(true);for (const auto &msg : msg_lists) {auto message_info = response->add_msg_list();message_info->set_message_id(msg.message_id());message_info->set_chat_session_id(msg.session_id());message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);switch(msg.message_type()) {case MessageType::STRING:message_info->mutable_message()->set_message_type(MessageType::STRING);message_info->mutable_message()->mutable_string_message()->set_content(msg.content());break;case MessageType::IMAGE:message_info->mutable_message()->set_message_type(MessageType::IMAGE);message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);break;case MessageType::FILE:message_info->mutable_message()->set_message_type(MessageType::FILE);message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);break;case MessageType::SPEECH:message_info->mutable_message()->set_message_type(MessageType::SPEECH);message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);break;default:LOG_ERROR("消息类型错误!!");return;}}return;}
根据会话ID获取历史消息
/*** @brief 获取最近的消息,根据会话ID和消息数量。* * @param controller RPC控制器。* @param request 请求消息,包含会话ID和要获取的消息数量。* @param response 响应消息,将填充最近消息列表。* @param done 回调函数,用于通知RPC调用完成。*/virtual void GetRecentMsg(::google::protobuf::RpcController* controller,const ::mag::GetRecentMsgReq* request,::mag::GetRecentMsgRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done); // 确保在方法结束时调用 done->Run()// 定义错误响应的Lambda函数,统一处理错误auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 提取请求中的关键要素:请求ID,会话ID,要获取的消息数量std::string rid = request->request_id();std::string chat_ssid = request->chat_session_id();int msg_count = request->msg_count();// 2. 从MySQL数据库中获取最近的消息元信息auto msg_lists = _mysql_message->recent(chat_ssid, msg_count);if (msg_lists.empty()) {response->set_request_id(rid);response->set_success(true); // 查询成功但无结果return;}// 3. 统计所有消息中文件类型消息的文件ID列表,并从文件子服务下载文件内容std::unordered_set<std::string> file_id_lists;for (const auto &msg : msg_lists) {if (msg.file_id().empty()) continue; // 仅处理有文件ID的消息LOG_DEBUG("需要下载的文件ID: {}", msg.file_id());file_id_lists.insert(msg.file_id());}std::unordered_map<std::string, std::string> file_data_lists;bool ret = _GetFile(rid, file_id_lists, file_data_lists);if (ret == false) {LOG_ERROR("{} 批量文件数据下载失败!", rid);return err_response(rid, "批量文件数据下载失败!");}// 4. 统计所有消息的发送者用户ID,并从用户子服务获取用户信息std::unordered_set<std::string> user_id_lists; // 存储唯一的用户IDfor (const auto &msg : msg_lists) {user_id_lists.insert(msg.user_id());}std::unordered_map<std::string, UserInfo> user_lists;ret = _GetUser(rid, user_id_lists, user_lists);if (ret == false) {LOG_ERROR("{} 批量用户数据获取失败!", rid);return err_response(rid, "批量用户数据获取失败!");}// 5. 组织响应,填充消息列表response->set_request_id(rid);response->set_success(true);for (const auto &msg : msg_lists) {auto message_info = response->add_msg_list();message_info->set_message_id(msg.message_id());message_info->set_chat_session_id(msg.session_id());message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);switch(msg.message_type()) {case MessageType::STRING:message_info->mutable_message()->set_message_type(MessageType::STRING);message_info->mutable_message()->mutable_string_message()->set_content(msg.content());break;case MessageType::IMAGE:message_info->mutable_message()->set_message_type(MessageType::IMAGE);message_info->mutable_message()->mutable_image_message()->set_file_id(msg.file_id());message_info->mutable_message()->mutable_image_message()->set_image_content(file_data_lists[msg.file_id()]);break;case MessageType::FILE:message_info->mutable_message()->set_message_type(MessageType::FILE);message_info->mutable_message()->mutable_file_message()->set_file_id(msg.file_id());message_info->mutable_message()->mutable_file_message()->set_file_size(msg.file_size());message_info->mutable_message()->mutable_file_message()->set_file_name(msg.file_name());message_info->mutable_message()->mutable_file_message()->set_file_contents(file_data_lists[msg.file_id()]);break;case MessageType::SPEECH:message_info->mutable_message()->set_message_type(MessageType::SPEECH);message_info->mutable_message()->mutable_speech_message()->set_file_id(msg.file_id());message_info->mutable_message()->mutable_speech_message()->set_file_contents(file_data_lists[msg.file_id()]);break;default:LOG_ERROR("消息类型错误!!");return;}}return;}
根据关键字和会话ID搜索消息
/*** @brief 基于关键字和会话ID搜索文本消息。* * @param controller RPC控制器。* @param request 请求消息,包含会话ID和搜索关键字。* @param response 响应消息,将填充匹配的消息列表。* @param done 回调函数,用于通知RPC调用完成。*/virtual void MsgSearch(::google::protobuf::RpcController* controller,const ::mag::MsgSearchReq* request,::mag::MsgSearchRsp* response,::google::protobuf::Closure* done) {brpc::ClosureGuard rpc_guard(done); // 确保在方法结束时调用 done->Run()// 定义错误响应的Lambda函数,统一处理错误auto err_response = [this, response](const std::string &rid, const std::string &errmsg) -> void {response->set_request_id(rid);response->set_success(false);response->set_errmsg(errmsg);return;};// 1. 从请求中提取关键要素:请求ID,会话ID,关键字std::string rid = request->request_id();std::string chat_ssid = request->chat_session_id();std::string skey = request->search_key();// 2. 从ES搜索引擎中进行关键字消息搜索,得到消息列表auto msg_lists = _es_message->search(skey, chat_ssid);if (msg_lists.empty()) {response->set_request_id(rid);response->set_success(true); // 搜索成功但无结果return;}// 3. 统计所有消息的发送者用户ID,并从用户子服务获取用户信息std::unordered_set<std::string> user_id_lists; // 存储唯一的用户IDfor (const auto &msg : msg_lists) {user_id_lists.insert(msg.user_id());}std::unordered_map<std::string, UserInfo> user_lists;bool ret = _GetUser(rid, user_id_lists, user_lists);if (ret == false) {LOG_ERROR("{} 批量用户数据获取失败!", rid);return err_response(rid, "批量用户数据获取失败!");}// 4. 组织响应,填充消息列表response->set_request_id(rid);response->set_success(true);for (const auto &msg : msg_lists) {auto message_info = response->add_msg_list();message_info->set_message_id(msg.message_id());message_info->set_chat_session_id(msg.session_id());message_info->set_timestamp(boost::posix_time::to_time_t(msg.create_time()));message_info->mutable_sender()->CopyFrom(user_lists[msg.user_id()]);message_info->mutable_message()->set_message_type(MessageType::STRING);message_info->mutable_message()->mutable_string_message()->set_content(msg.content());}return;}
处理消息队列的新消息
/*** @brief 处理从消息队列接收到的新消息,进行存储和相关操作。* * @param body 消息体,序列化后的数据。* @param sz 消息体大小。*/void onMessage(const char *body, size_t sz) {LOG_DEBUG("收到新消息,进行存储处理!");// 1. 取出序列化的消息内容,进行反序列化mag::MessageInfo message;bool ret = message.ParseFromArray(body, sz);if (ret == false) {LOG_ERROR("对消费到的消息进行反序列化失败!");return;}// 2. 根据不同的消息类型进行不同的处理std::string file_id, file_name, content;int64_t file_size;switch(message.message().message_type()) {// 1. 如果是一个文本类型消息,取元信息存储到ES中case MessageType::STRING:content = message.message().string_message().content();ret = _es_message->appendData(message.sender().user_id(),message.message_id(),message.timestamp(),message.chat_session_id(),content);if (ret == false) {LOG_ERROR("文本消息向存储引擎进行存储失败!");return;}break;// 2. 如果是一个图片/语音/文件消息,则取出数据存储到文件子服务中,并获取文件IDcase MessageType::IMAGE:{const auto &msg = message.message().image_message();ret = _PutFile("", msg.image_content(), msg.image_content().size(), file_id);if (ret == false) {LOG_ERROR("上传图片到文件子服务失败!");return;}}break;case MessageType::FILE:{const auto &msg = message.message().file_message();file_name = msg.file_name();file_size = msg.file_size();ret = _PutFile(file_name, msg.file_contents(), file_size, file_id);if (ret == false) {LOG_ERROR("上传文件到文件子服务失败!");return;}}break;case MessageType::SPEECH:{const auto &msg = message.message().speech_message();ret = _PutFile("", msg.file_contents(), msg.file_contents().size(), file_id);if (ret == false) {LOG_ERROR("上传语音到文件子服务失败!");return;}}break;default:LOG_ERROR("消息类型错误!");return;}// 3. 提取消息的元信息,存储到MySQL数据库中mag::Message msg(message.message_id(), message.chat_session_id(),message.sender().user_id(),message.message().message_type(),boost::posix_time::from_time_t(message.timestamp()));msg.content(content);msg.file_id(file_id);msg.file_name(file_name);msg.file_size(file_size);ret = _mysql_message->insert(msg);if (ret == false) {LOG_ERROR("向数据库插入新消息失败!");return;}}
子服务封装
int main(int argc, char *argv[])
{// 解析命令行标志google::ParseCommandLineFlags(&argc, &argv, true);// 初始化日志系统init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 开始初始化服务器LOG_INFO("服务器初始化开始...");mag::MessageServerBuilder msb;// 配置消息队列对象LOG_INFO("初始化消息队列连接,用户:{}", FLAGS_mq_user);msb.make_mq_object(FLAGS_mq_user, FLAGS_mq_pswd, FLAGS_mq_host,FLAGS_mq_msg_exchange, FLAGS_mq_msg_queue, FLAGS_mq_msg_binding_key);// 配置 Elasticsearch 对象LOG_INFO("初始化 Elasticsearch,主机:{}", FLAGS_es_host);msb.make_es_object({FLAGS_es_host});// 配置 MySQL 对象LOG_INFO("初始化 MySQL 连接,用户:{}", FLAGS_mysql_user);msb.make_mysql_object(FLAGS_mysql_user, FLAGS_mysql_pswd, FLAGS_mysql_host, FLAGS_mysql_db, FLAGS_mysql_cset, FLAGS_mysql_port, FLAGS_mysql_pool_count);// 配置服务发现对象LOG_INFO("初始化服务发现,注册中心主机:{}", FLAGS_registry_host);msb.make_discovery_object(FLAGS_registry_host, FLAGS_base_service, FLAGS_file_service, FLAGS_user_service);// 配置 RPC 服务器LOG_INFO("初始化 RPC 服务器,监听端口:{}", FLAGS_listen_port);msb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads);// 配置服务注册对象LOG_INFO("注册服务到注册中心,注册中心主机:{},服务名称:{}", FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name);msb.make_registry_object(FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);// 构建并启动服务器auto server = msb.build();LOG_INFO("服务器构建完成,正在启动...");server->start();LOG_INFO("服务器启动成功,开始提供服务。");return 0;
}