概述
实现逻辑
服务器搭建流程分析
- 基于gflags模块进行参数解析
- 初始化日志模块
- 搭建RPC服务器,实现各个接口功能
- 向服务中心进行服务注册
具体实现
Protobuf接口
message SpeechRecognitionReq {string request_id = 1; //请求IDbytes speech_content = 2; //语音数据optional string user_id = 3; //用户IDoptional string session_id = 4; //登录会话ID -- 网关进行身份鉴权
}message SpeechRecognitionRsp {string request_id = 1; //请求IDbool success = 2; //请求处理结果标志optional string errmsg = 3; //失败原因optional string recognition_result = 4; //识别后的文字数据
}//语音识别Rpc服务及接口的定义,传入语音识别的请求,然后返回语音识别的响应
service SpeechService {rpc SpeechRecognition(SpeechRecognitionReq) returns (SpeechRecognitionRsp);
}
speech_server.hpp
服务端的头文件中主要构建了三个类
- SpeechServiceImpl 主要用于实际的语音识别逻辑,具体是通过调用ASRClient来进行语音识别
- SpeechServer 主要启动和管理RPC服务,提供start方法让服务进入运行状态
- SpeechServerBuilder 通过构建者,逐步配置语言识别服务所需要的对象,其中包括客户端、服务注册和RPC服务器
SpeechServiceImpl
核心就是SpeechRecognition方法
- 取出请求中的语音数据
- 调用ASRClient来进行语音识别
- 识别成功就返回结果,识别失败就将错误信息写入响应
class SpeechServiceImpl : public mag::SpeechService {public:// 构造函数,传入 ASRClient 对象,初始化语音识别客户端SpeechServiceImpl(const ASRClient::ptr &asr_client): _asr_client(asr_client) {}// 析构函数,释放资源~SpeechServiceImpl() {}// SpeechRecognition 方法是实际的服务实现,处理语音识别请求// controller:用于控制 RPC 调用的执行// request:客户端请求的数据,包括语音内容// response:服务端返回的响应数据// done:调用完成后的回调,用于通知客户端void SpeechRecognition(google::protobuf::RpcController* controller,const ::mag::SpeechRecognitionReq* request,::mag::SpeechRecognitionRsp* response,::google::protobuf::Closure* done) {LOG_DEBUG("收到语音转文字请求!");// 使用 brpc::ClosureGuard 确保 RPC 调用完成后会自动执行 done 回调brpc::ClosureGuard rpc_guard(done);// 1. 提取请求中的语音内容// 2. 调用语音识别客户端进行语音识别,获取识别结果std::string err;std::string res = _asr_client->recognize(request->speech_content(), err);// 3. 如果识别失败,设置响应失败信息if (res.empty()) {LOG_ERROR("{} 语音识别失败!", request->request_id());response->set_request_id(request->request_id());response->set_success(false);response->set_errmsg("语音识别失败:" + err);return;}// 4. 如果识别成功,设置响应内容response->set_request_id(request->request_id());response->set_success(true);response->set_recognition_result(res);}private:ASRClient::ptr _asr_client; // 语音识别客户端};
SpeechServer
主要就是提供接口来启动服务器
class SpeechServer {public:using ptr = std::shared_ptr<SpeechServer>;// 构造函数,初始化语音识别客户端、服务注册客户端和 RPC 服务器SpeechServer(const ASRClient::ptr asr_client,const Registry::ptr ®_client,const std::shared_ptr<brpc::Server> &server): _asr_client(asr_client), _reg_client(reg_client), _rpc_server(server) {}// 析构函数,释放资源~SpeechServer() {}// 启动 RPC 服务器,进入事件循环,直到请求停止void start() {_rpc_server->RunUntilAskedToQuit();}private:ASRClient::ptr _asr_client; // 语音识别客户端Registry::ptr _reg_client; // 服务注册客户端std::shared_ptr<brpc::Server> _rpc_server; // RPC 服务器};
SpeechServerBuilder
通过brpc框架,构建一个RPC的语音识别服务器,该处使用了构建者模式从而实现灵活构建
class SpeechServerBuilder {public:// 构造语音识别客户端对象// app_id、api_key、secret_key:用于初始化语音识别客户端的参数void make_asr_object(const std::string &app_id,const std::string &api_key,const std::string &secret_key) {_asr_client = std::make_shared<ASRClient>(app_id, api_key, secret_key);}// 构造服务注册客户端对象// reg_host:注册中心的地址// service_name:服务名称// access_host:服务访问地址void make_reg_object(const std::string ®_host,const std::string &service_name,const std::string &access_host) {_reg_client = std::make_shared<Registry>(reg_host);_reg_client->registry(service_name, access_host);}// 构造 RPC 服务器对象并启动服务器// port:服务端口,timeout:超时时间,num_threads:线程数void make_rpc_server(uint16_t port, int32_t timeout, uint8_t num_threads) {// 确保语音识别模块已经初始化if (!_asr_client) {LOG_ERROR("还未初始化语音识别模块!");abort();}// 创建 brpc::Server 对象_rpc_server = std::make_shared<brpc::Server>();// 创建 SpeechServiceImpl 对象,并将其添加到 RPC 服务器// SpeechServiceImpl 类实现了语音识别服务的具体业务逻辑SpeechServiceImpl *speech_service = new SpeechServiceImpl(_asr_client);int ret = _rpc_server->AddService(speech_service,brpc::ServiceOwnership::SERVER_OWNS_SERVICE);if (ret == -1) {LOG_ERROR("添加Rpc服务失败!");abort();}// 配置 RPC 服务器的选项brpc::ServerOptions options;options.idle_timeout_sec = timeout; // 设置空闲超时时间options.num_threads = num_threads; // 设置工作线程数// 启动服务器,监听指定端口ret = _rpc_server->Start(port, &options);if (ret == -1) {LOG_ERROR("服务启动失败!");abort();}}// 构建并返回 SpeechServer 对象// 返回一个包含语音识别客户端、服务注册客户端和 RPC 服务器的完整服务对象SpeechServer::ptr build() {if (!_asr_client) {LOG_ERROR("还未初始化语音识别模块!");abort();}if (!_reg_client) {LOG_ERROR("还未初始化服务注册模块!");abort();}if (!_rpc_server) {LOG_ERROR("还未初始化RPC服务器模块!");abort();}// 返回构建好的 SpeechServer 对象SpeechServer::ptr server = std::make_shared<SpeechServer>(_asr_client, _reg_client, _rpc_server);return server;}private:ASRClient::ptr _asr_client; // 语音识别客户端Registry::ptr _reg_client; // 服务注册客户端std::shared_ptr<brpc::Server> _rpc_server; // RPC 服务器};
speech_server.cc
int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);mag::SpeechServerBuilder ssb;ssb.make_asr_object(FLAGS_app_id, FLAGS_api_key, FLAGS_secret_key);ssb.make_rpc_server(FLAGS_listen_port, FLAGS_rpc_timeout, FLAGS_rpc_threads);ssb.make_reg_object(FLAGS_registry_host, FLAGS_base_service + FLAGS_instance_name, FLAGS_access_host);auto server = ssb.build();server->start();return 0;
}
test.cc
- 通过etcd服务发现机制找到语音识别服务的RPC通信信道
- 读取语音文件的内容
- 构造语音识别请求并通过RPC调用发送给语音服务
- 接收并处理语音识别结果,输出识别成功或者失败的消息
int main(int argc, char *argv[])
{// 解析命令行参数google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1. 构造 Rpc 信道管理对象,用于选择合适的 RPC 信道auto sm = std::make_shared<mag::ServiceManager>();// 注册服务发现时回调函数sm->declared(FLAGS_speech_service);auto put_cb = std::bind(&mag::ServiceManager::onServiceOnline, sm.get(), std::placeholders::_1, std::placeholders::_2);auto del_cb = std::bind(&mag::ServiceManager::onServiceOffline, sm.get(), std::placeholders::_1, std::placeholders::_2);// 2. 构造服务发现对象,连接到 Etcd 服务注册中心mag::Discovery::ptr dclient = std::make_shared<mag::Discovery>(FLAGS_etcd_host, FLAGS_base_service, put_cb, del_cb);// 3. 通过 Rpc 信道管理对象,获取提供语音识别服务的信道auto channel = sm->choose(FLAGS_speech_service);if (!channel) {// 如果没有获取到可用的信道,等待 1 秒后退出std::this_thread::sleep_for(std::chrono::seconds(1));return -1;}// 4. 读取语音文件数据std::string file_content;aip::get_file_content("16k.pcm", &file_content); // 从文件中读取 PCM 格式的语音数据std::cout << "语音文件大小: " << file_content.size() << " 字节" << std::endl;// 5. 构造并发起语音识别的 RPC 调用mag::SpeechService_Stub stub(channel.get()); // 创建服务存根对象mag::SpeechRecognitionReq req; // 创建语音识别请求对象req.set_speech_content(file_content); // 设置请求中的语音数据req.set_request_id("111111"); // 设置请求 IDbrpc::Controller *cntl = new brpc::Controller(); // 创建 RPC 控制器,用于管理请求的生命周期mag::SpeechRecognitionRsp *rsp = new mag::SpeechRecognitionRsp(); // 创建响应对象,用于接收服务端返回的数据// 发起 RPC 调用,调用语音识别服务stub.SpeechRecognition(cntl, &req, rsp, nullptr);// 检查 RPC 调用是否失败if (cntl->Failed()) {std::cout << "Rpc 调用失败:" << cntl->ErrorText() << std::endl;delete cntl;delete rsp;std::this_thread::sleep_for(std::chrono::seconds(1)); // 等待一秒后退出return -1;}// 检查语音识别的结果是否成功if (!rsp->success()) {std::cout << "语音识别失败: " << rsp->errmsg() << std::endl;delete cntl;delete rsp;return -1;}// 输出识别结果std::cout << "收到响应, 请求 ID: " << rsp->request_id() << std::endl;std::cout << "语音识别结果: " << rsp->recognition_result() << std::endl;// 清理资源delete cntl;delete rsp;return 0;
}
CMake
# 1. 添加cmake版本说明
cmake_minimum_required(VERSION 3.1.3)# 2. 声明工程名称
project(speech_server)set(target "speech_server")
set(test_client "speech_client")# 3. 检测并生成ODB框架代码
set(proto_path ${CMAKE_CURRENT_SOURCE_DIR}/../proto)
set(proto_files speech.proto)set(proto_srcs "")
set(proto_cxx "")
set(proto_srcs "")
foreach(proto_file ${proto_files})string(REPLACE ".proto" ".pb.cc" proto_cc ${proto_file})string(REPLACE ".proto" ".pb.h" proto_hh ${proto_file})# 如果没有生成,则预定义生成指令 -- 用于在构建项目之间先生成框架代码if (NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})add_custom_command(PRE_BUILDCOMMAND protocARGS --cpp_out=${CMAKE_CURRENT_BINARY_DIR} -I${proto_path} --experimental_allow_proto3_optional ${proto_path}/${proto_file}DEPENDS ${proto_path}/${proto_file}OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc}COMMENT "生成Protobuf框架代码文件:" ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})endif()list(APPEND proto_srcs ${CMAKE_CURRENT_BINARY_DIR}/${proto_cc})
endforeach()# 4. 获取源码目录下的所有源码文件
set(src_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/source src_files)# 5. 声明目标及依赖
add_executable(${target} ${src_files} ${proto_srcs})set(test_files "")
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR}/test test_files)
add_executable(${test_client} ${test_files} ${proto_srcs})
target_link_libraries(${test_client} -lgflags -lspdlog -lfmt -lbrpc -lssl -lcrypto -lprotobuf -lleveldb -letcd-cpp-api -lcpprest -lcurl /usr/lib/x86_64-linux-gnu/libjsoncpp.so.19)# 7. 设置需要连接的库
find_package(gflags REQUIRED) # 确保 gflags 被正确找到
target_link_libraries(${target} gflags spdlog fmt brpc ssl crypto protobuf leveldb etcd-cpp-api cpprest -lcurl /usr/lib/x86_64-linux-gnu/libjsoncpp.so.19)# 6. 设置头文件默认搜索路径
include_directories(${CMAKE_CURRENT_BINARY_DIR})
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../common)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../third/include)# 8. 设置安装路径
INSTALL(TARGETS ${target} ${test_client} RUNTIME DESTINATION bin)
测试
问题总结
aip-cpp-sdk 问题
经排查,该库函数中std::transform使用不正确导致的
因为toupper和tolower是C库中的函数,接收的是一个字符,而不是一个字符串,所以最终导致类型推导失败
inline std::string to_upper(std::string src){//11-30修改std::transform(src.begin(), src.end(), src.begin(), [](unsigned char c) { return std::toupper(c); });// std::transform(src.begin(), src.end(), src.begin(), toupper);return src;}inline std::string to_lower(std::string src){//11-30修改// std::transform(src.begin(), src.end(), src.begin(), tolower);std::transform(src.begin(), src.end(), src.begin(), [](unsigned char c) { return std::tolower(c); });return src;}
客户端调用RPC服务失败
root@hcss-ecs-b4a9:/home/chatServer/chatServer/src/server/speech/build# ./speech_client
[default-logger][21:38:01][414488][debug ][/home/chatServer/chatServer/src/server/speech/../common/channel.hpp:118] /service-127.0.0.1:9090 服务上线了,但是当前并不关心!
[default-logger][21:38:01][414488][debug ][/home/chatServer/chatServer/src/server/speech/../common/channel.hpp:139] /service/speech_service-127.0.0.1:10001 服务上线新节点,进行添加管理!
0
Rpc调用失败:[E1014]Got EOF of Socket{id=0fd=10 addr=127.0.0.1:10001:39996} (0x0x55753db88940) [R1][E112]Not connected to 127.0.0.1:10001 yet, server_id=0 [R2][E112]Not connected to 127.0.0.1:10001 yet, server_id=0 [R3][E112]Not connected to 127.0.0.1:10001 yet, server_id=0
I1130 21:38:01.654227 414538 4294969856 /home/chatServer/chatServer/Test/brpc/brpc/src/brpc/socket.cpp:2570] Checking Socket{id=0 addr=127.0.0.1:10001} (0x55753db88940)
[warn] watcher does't exit normally
生命周期问题,通过智能指针或者堆上建立即可,后期统一解决