假设我们忽略构造、赋值、互换、拷贝,那么只剩下3组操作。
1:empty、size,用于查询队列的整体状态
2:front、back,用于查询队列中的元素
3:push、pop,用于修改队列
一些细节:
- 由于接口存在固有的条件竞争,所以需要把front和pop合并成一个函数,这个和栈容器的top、pop合并非常类似。
- 当队列用于线程间传递数据的时候,负责接收的线程通常需要等待线程压入。所以需要提供两个pop变体:
- try_pop:试图弹出队首元素,若失败直接返回
- wait_and_pop:试图弹出队首元素,若失败,等待到有数据可以读
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>#include <memory>
template<typename T>class threadsafe_queue {
private:mutable std::mutex mtx; // 互斥必须用mutable修饰(针对const对象,准许其数据成员发生变动)std::queue<T> data_queue;std::condition_variable data_cond;
public:threadsafe_queue() {}threadsafe_queue(threadsafe_queue const& other) {std::lock_guard<std::mutex> lk(other.mtx);data_queue = other.data_queue;}threadsafe_queue(threadsafe_queue&& other) {std::lock_guard<std::mutex> lk(other.mtx);data_queue = std::move(other.data_queue);}threadsafe_queue& operator=(threadsafe_queue const& other) {std::lock_guard<std::mutex> lk(mtx);data_queue = other.data_queue;return *this;}threadsafe_queue& operator=(threadsafe_queue&& other) {std::lock_guard<std::mutex> lk(mtx);data_queue = std::move(other.data_queue);return *this;}void push(T new_value) {std::lock_guard<std::mutex> lk(mtx);data_queue.push(new_value);data_cond.notify_one();}void wait_and_pop(T& value) {std::unique_lock<std::mutex> lk(mtx);data_cond.wait(lk,[this]{return !data_queue.empty();});value = data_queue.front();data_queue.pop();}std::shared_ptr<T> wait_and_pop() {std::unique_lock<std::mutex> lk(mtx);data_cond.wait(lk, [this] {return !data_queue.empty();});std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool try_pop(T& value) {std::lock_guard<std::mutex> lk(mtx);if (data_queue.empty()) {return false;}value = data_queue.front();data_queue.pop();return true;}std::shared_ptr<T> try_pop() {std::lock_guard<std::mutex> lk(mtx);if (data_queue.empty()) {return std::shared_ptr<T>();}std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));data_queue.pop();return res;}bool empty() const {std::lock_guard<std::mutex> lk(mtx);return data_queue.empty();}size_t size() const {std::lock_guard<std::mutex> lk(mtx);return data_queue.size();}
};void producer(threadsafe_queue<int>& q) {for (int i = 0; i < 10; ++i) {q.push(i);std::this_thread::sleep_for(std::chrono::milliseconds(100));}
}void consumer(threadsafe_queue<int>& q) {int value;for (int i = 0; i < 10; ++i) {q.wait_and_pop(value);std::cout << "Consumer got value: " << value << std::endl;}
}int main() {threadsafe_queue<int> q;std::thread t1(producer, std::ref(q));std::thread t2(consumer, std::ref(q));t1.join();t2.join();return 0;
}
在这个示例代码中,我们定义了一个生产者函数 producer
和一个消费者函数 consumer
,它们分别向 threadsafe_queue
中推入数据和从中取出数据。我们创建了一个 threadsafe_queue
对象 q
,并在两个线程中分别调用 producer
和 consumer
函数。在 producer
函数中,我们向 q
中推入了 10 个整数,每个整数之间间隔 100 毫秒。在 consumer
函数中,我们从 q
中取出了 10 个整数,并输出了每个整数的值。
需要注意的是,threadsafe_queue
是一个线程安全的队列,它使用了互斥量和条件变量来保证线程安全。在使用 threadsafe_queue
时,需要注意避免死锁和竞争条件的问题。同时,为了避免拷贝和移动对象时出现问题,我们增加了移动构造、析构函数、拷贝赋值运算符、移动赋值运算符。