需求:图像识别中,注册的样本多了会影响计算速度,成为性能瓶颈,其中一个优化方法就是使用多线程。例如,注册了3000个特征,每个特征4096个float,可以把这3000个特征的比对放到4个线程中进行计算,然后再把结果进行合并。实现思路:
1. 根据系统性能和需求配置线程池的大小,创建线程池,将比较任务平均分配到各个线程
2. 工作线程启动后在一个condition_variable上wait,注意:锁的范围不能太大了,否则多个线程会变成串行
3. 调用者调用识别接口,接口更新目标特征并通知各个工作线程,然后在另外一个condition_variable上wait,直到完成计数器的值等于线程数
4. 工作线程完成后将计数器加一,并且通知调用线程
5. 调用线程收集到所有线程的结果后再对结果进行合并返回
后续:
1. 代码进行优化,更优雅的实现
测试结果:
| 线程数 | 时间 |
| --- | --- |
| 1 | 71362ms |
| 2 | 36292ms |
| 4 | 19420ms |
| 8 | 18465ms |
| 16 | 18433ms |
| 32 | 18842ms |
| 64 | 19324ms |
| 128 | 19388ms |
| 256 | 21853ms |
| 512 | 26150ms |
| 1024 | 35593ms |
代码如下:
#include <algorithm>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
using namespace std;
using namespace chrono;
// Returns the Euclidean length (L2 norm) of `vec`: the square root of the sum
// of its squared components. An empty vector yields 0.
double get_mold(const std::vector<double> &vec)
{
    double squared_total = 0.0;
    for (const double component : vec)
    {
        squared_total += component * component;
    }
    return std::sqrt(squared_total);
}
// Returns the cosine similarity of `base` and `target`:
//     dot(base, target) / (|base| * |target|)
//
// Despite the name, the value is a similarity (1 for identical direction,
// 0 for orthogonal vectors), not a distance.
//
// Edge cases:
//  - Vectors of different length are compared as if the shorter one were
//    padded with zeros: the dot product covers the common prefix only, each
//    norm covers the whole vector. This never reads past the end of either
//    argument.
//  - If either vector has zero length (empty or all zeros) the similarity is
//    undefined; 0.0 is returned instead of dividing by zero.
double cosine_distance(const std::vector<double> &base, const std::vector<double> &target)
{
    const std::size_t base_len = base.size();
    const std::size_t target_len = target.size();
    const std::size_t shared_len = base_len < target_len ? base_len : target_len;

    double dot = 0.0;
    double base_sq = 0.0;
    double target_sq = 0.0;

    // One pass over the common prefix accumulates the dot product and both
    // squared norms, so each element is loaded once instead of three times.
    for (std::size_t i = 0; i < shared_len; ++i)
    {
        const double b = base[i];
        const double t = target[i];
        dot += b * t;
        base_sq += b * b;
        target_sq += t * t;
    }
    // Whichever vector is longer still contributes its tail to its own norm.
    for (std::size_t i = shared_len; i < base_len; ++i)
    {
        base_sq += base[i] * base[i];
    }
    for (std::size_t i = shared_len; i < target_len; ++i)
    {
        target_sq += target[i] * target[i];
    }

    const double denominator = std::sqrt(base_sq) * std::sqrt(target_sq);
    if (denominator == 0.0)
    {
        return 0.0;
    }
    return dot / denominator;
}
// Multi-threaded best-match search over a fixed set of registered samples.
//
// Construction loads the registered features and starts a pool of worker
// threads, each responsible for one contiguous slice of the sample list.
// do_recognize() publishes a target feature, wakes the workers, blocks until
// every worker has reported the best match inside its slice, and then merges
// those per-thread results.
//
// Locking: cv_mtx_ guards is_run_calculate_thread_, is_doing_recognize,
// task_generation_ and result_count. target_feature_ is written under
// cv_mtx_ before a task is published and is not modified again until every
// worker has reported, so workers may read it without holding the lock.
// Workers hold cv_mtx_ only while waiting for work and while reporting
// completion, never while comparing features, so comparisons run in parallel.
class Recognizer
{
public:
    // num_threads: requested number of worker threads. Values below 1 are
    // raised to 1 so that a request is always actually evaluated.
    Recognizer(int num_threads) :
        num_threads_(std::max(1, num_threads))
    {
        recognize_result.resize(static_cast<std::size_t>(num_threads_));
        this->load_feature();
        this->init_threads();
    }

    // Asks every worker to exit and joins them.
    ~Recognizer()
    {
        this->stop_threads();
    }

    // Compares `feature` against every registered sample using the pool.
    // Returns 0 once all workers have finished and their results have been
    // merged, or -1 (without doing any work) when another recognition is
    // already in progress.
    int do_recognize(const std::vector<double> &feature);

private:
    // One registered sample: identifier, display name and feature vector.
    class CigaretteItem
    {
    public:
        int cigarette_id_;
        std::string cigarette_name_;
        std::vector<double> feature_;

        // Name and feature are taken by value and moved into place, so a
        // caller that passes temporaries pays for no element-wise copy.
        CigaretteItem(int cigarette_id, std::string cigarette_name, std::vector<double> cigarette_feature) :
            cigarette_id_(cigarette_id),
            cigarette_name_(std::move(cigarette_name)),
            feature_(std::move(cigarette_feature))
        {
        }
    };

    // Best match found by one worker inside its slice. The defaults
    // (id -1, score -1.0) mean "no candidate" and can never win the merge.
    class RecognizeResult
    {
    public:
        int cigarette_id_ = -1;
        std::string cigarette_name_;
        double score_ = -1.0;
    };

private:
    int num_threads_;
    bool is_run_calculate_thread_ = true;    // false once shutdown was requested
    bool is_doing_recognize = false;         // true while a request is in flight
    unsigned long long task_generation_ = 0; // incremented once per published request
    int result_count = 0;                    // workers finished with the current request
    std::vector<CigaretteItem> ciagarette_list_;
    std::vector<double> target_feature_;
    std::mutex cv_mtx_;
    std::condition_variable cv_;             // signalled on new request or shutdown
    std::condition_variable result_cv_;      // signalled when the last worker reports
    std::vector<RecognizeResult> recognize_result; // one slot per worker
    std::vector<std::thread> threads_;

private:
    Recognizer(const Recognizer&) = delete;
    Recognizer& operator=(const Recognizer&) = delete;
    void load_feature();
    void init_threads();
    void stop_threads();
    void calculate_most_similarity(const int thread_id, const int start_index, const int end_index);
};

// Fills the sample list with 3000 synthetic samples of 4096 pseudo-random
// components each, every component in [0.001, 0.998]. Ids start at 1.
void Recognizer::load_feature()
{
    const int feature_count = 3000;
    const std::size_t feature_dim = 4096;

    ciagarette_list_.reserve(static_cast<std::size_t>(feature_count));
    for (int item = 0; item < feature_count; ++item)
    {
        std::vector<double> fea(feature_dim);
        for (double &component : fea)
        {
            component = static_cast<double>(std::rand() % 998 + 1) / 1000.00;
        }
        ciagarette_list_.emplace_back(item + 1, "cigarette", std::move(fea));
    }
}

// Starts num_threads_ workers and gives each a contiguous slice of the sample
// list. Slice boundaries are total * i / num_threads_, so slice sizes differ
// by at most one sample instead of the whole division remainder landing on
// the last thread. A slice may be empty when there are more threads than
// samples.
void Recognizer::init_threads()
{
    const long long total = static_cast<long long>(ciagarette_list_.size());

    threads_.reserve(static_cast<std::size_t>(num_threads_));
    try
    {
        for (int i = 0; i < num_threads_; i++)
        {
            const int start_index = static_cast<int>(total * i / num_threads_);
            const int end_index = static_cast<int>(total * (i + 1) / num_threads_);
            std::cout << "thread" << i << " starts at " << start_index << "; ends at " << end_index << std::endl;
            threads_.emplace_back(&Recognizer::calculate_most_similarity, this, i, start_index, end_index);
        }
    }
    catch (...)
    {
        // The destructor does not run when the constructor throws, and
        // destroying a joinable std::thread terminates the program, so the
        // workers that did start must be stopped here.
        this->stop_threads();
        throw;
    }
}

// Requests shutdown and joins every started worker.
void Recognizer::stop_threads()
{
    {
        // The flag is changed under the mutex the workers wait on, so a
        // worker either sees it before blocking or is woken by the notify.
        std::lock_guard<std::mutex> lock(cv_mtx_);
        is_run_calculate_thread_ = false;
    }
    cv_.notify_all();
    for (std::thread &th : threads_)
    {
        if (th.joinable())
        {
            th.join();
        }
    }
}

// Worker loop. For each published request, finds the sample in
// [start_index, end_index) most similar to target_feature_, stores it in
// recognize_result[thread_id] and reports completion. Returns when shutdown
// is requested.
void Recognizer::calculate_most_similarity(const int thread_id, const int start_index, const int end_index)
{
    // Generation of the last request this worker handled. Comparing it with
    // task_generation_ tells a real request apart from a spurious wakeup and
    // catches a request that was published while this worker was not waiting.
    unsigned long long seen_generation = 0;

    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(cv_mtx_);
            cv_.wait(lock, [this, &seen_generation]() {
                return !is_run_calculate_thread_ || task_generation_ != seen_generation;
            });
            if (!is_run_calculate_thread_)
            {
                return;
            }
            seen_generation = task_generation_;
        }

        // Comparison runs without the lock so that workers do not serialise.
        double max_score = -1.00;
        int max_score_index = -1;
        for (int i = start_index; i < end_index; ++i)
        {
            const double score = cosine_distance(ciagarette_list_[i].feature_, target_feature_);
            if (score > max_score)
            {
                max_score = score;
                max_score_index = i;
            }
        }

        RecognizeResult &slot = recognize_result[thread_id];
        if (max_score_index >= 0)
        {
            const CigaretteItem &best = ciagarette_list_[max_score_index];
            slot.cigarette_id_ = best.cigarette_id_;
            slot.cigarette_name_ = best.cigarette_name_;
            slot.score_ = max_score;
        }
        else
        {
            // Empty slice, or no score above -1: report "no candidate"
            // instead of indexing the list with -1.
            slot.cigarette_id_ = -1;
            slot.cigarette_name_.clear();
            slot.score_ = -1.0;
        }

        bool is_last_worker = false;
        {
            std::lock_guard<std::mutex> lock(cv_mtx_);
            result_count += 1;
            is_last_worker = (result_count == num_threads_);
        }
        // Only the final report can satisfy the caller's wait condition.
        if (is_last_worker)
        {
            result_cv_.notify_one();
        }
    }
}

int Recognizer::do_recognize(const std::vector<double> &feature)
{
    std::unique_lock<std::mutex> lock(cv_mtx_);
    if (is_doing_recognize)
    {
        return -1;
    }

    // Copy first: if the copy throws, no state has been changed yet.
    this->target_feature_ = feature;
    is_doing_recognize = true;
    result_count = 0;
    ++task_generation_;
    cv_.notify_all();

    // wait() releases the lock while blocked, which lets the workers pick up
    // the request and report back.
    result_cv_.wait(lock, [this]() { return this->num_threads_ == this->result_count; });

    int max_score_cigarette_id = -1;
    std::string max_score_cigarette_name;
    double max_score = -1.0;
    for (const RecognizeResult &partial : recognize_result)
    {
        if (partial.score_ > max_score)
        {
            max_score_cigarette_id = partial.cigarette_id_;
            max_score_cigarette_name = partial.cigarette_name_;
            max_score = partial.score_;
        }
    }
    // NOTE(review): the merged best match is only consumed by the debug print
    // below; the return value is a status code, not the matched id.
    //cout << "cigarette_id=" << max_score_cigarette_id << ", cigarette_name=" << max_score_cigarette_name << ", score=" << max_score << endl;

    is_doing_recognize = false;
    return 0;
}
// Benchmark driver: builds a Recognizer with 1024 worker threads, issues 400
// recognition requests with the same synthetic target feature and prints the
// total elapsed time in milliseconds.
int main(void)
{
    Recognizer recognizer{1024};
    // NOTE(review): presumably gives the worker threads time to start and
    // block before the first request is issued - confirm before removing.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    const int loops = 400;
    // steady_clock is monotonic; system_clock can jump when the wall clock is
    // adjusted, which would corrupt the measured interval.
    const auto start_time = std::chrono::steady_clock::now();
    for (int loop = 0; loop < loops; loop++)
    {
        // The target is rebuilt on every iteration, so the measured time
        // includes its construction.
        std::vector<double> target_feature(4096);
        for (int k = 0; k < 4096; ++k)
        {
            target_feature[k] = static_cast<double>(k % 1000 + 1) / 1000.00;
        }
        recognizer.do_recognize(target_feature);
    }
    const auto end_time = std::chrono::steady_clock::now();
    const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
    std::cout << "eplased_time:" << duration.count() << "ms" << std::endl;

    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}