文章目录
- 什么是无锁算法
- 什么是原子变量
- 什么是CAS操作
- Compare-And-Swap Weak在哪些情况下会失败
- 举例说明无锁结构
- 无锁结构的问题
什么是无锁算法
无锁算法(Lock-Free Algorithm)是一种并发编程技术,旨在实现多线程环境下的高效数据共享,而无需使用传统的锁机制(如互斥锁)。
关键特性或优点:
- 无阻塞:至少有一个线程能在有限步骤内完成操作,确保系统整体不会因锁争用而停滞。
- 无死锁:由于不使用锁,避免了死锁问题。
- 高并发性:适用于多核处理器,多个线程可同时操作共享数据,减少等待时间。
在C++ 可以使用原子变量来实现无锁算法与结构
什么是原子变量
C++的原子变量(Atomic Variable)是通过std::atomic模板类提供的,用于在多线程环境中安全地操作共享数据
模板原形
// Defined in header <atomic>
template< class T >
struct atomic; // (1) (since C++11)
template< class U >
struct atomic<U*>; //(2) (since C++11)
// Defined in header <memory>
template< class U >
struct atomic<std::shared_ptr<U>>; //(3) (since C++20)
template< class U >
struct atomic<std::weak_ptr<U>>; // (4) (since C++20)
Defined in header <stdatomic.h>
#define _Atomic(T) /* see below */ // (5) (since C++23)
具体参考cppreference atomic
下面是一个使用atomic_flag 实现自旋锁的例子
class SpinLock {std::atomic_flag flag(ATOMIC_FLAG_INIT);
public:void lock(){while(flag.test_and_set(std::memory_and_acquire)){}}void unlock(){flag.clear(std::memory_and_release);}};
其中std::memory_and_release
与 std::memory_and_acquire
的含义需自行查找内存模型与顺序
使用atomic 操作就是没有显示的使用锁操作,但atomic内部可能是使用锁机制来实现的也可能是使用cpu的CAS操作来实现的
通过atomic::is_lock_free
可以查看atomic内部是不是无锁机制实现的
#include <atomic>
#include <iostream>
#include <utility>struct A { int a[100]; };
struct B { int x, y; };
struct C { int x, y, z; };int main()
{std::cout << std::boolalpha<< "std::atomic_char is lock free? "<< std::atomic_char{}.is_lock_free() << '\n'<< "std::atomic_uintmax_t is lock free? "<< std::atomic_uintmax_t{}.is_lock_free() << '\n'<< "std::atomic<A> is lock free? "<< std::atomic<A>{}.is_lock_free() << '\n'<< "std::atomic<B> is lock free? "<< std::atomic<B>{}.is_lock_free() << '\n'<< "std::atomic<C> is lock free? "<< std::atomic<C>{}.is_lock_free() << '\n';
}
是与类型的长度有关的
什么是CAS操作
CAS(Compare-And-Swap,比较并交换)是一种原子操作,用于在多线程环境中实现无锁同步。CAS操作会检查某个内存位置的值是否与预期值匹配,如果匹配,则将其更新为新值;否则,不进行任何操作。无论是否更新,CAS操作都会返回内存位置的原始值。
CAS 操作包含三个操作数:
- 内存地址:需要修改的共享变量的地址。
- 期望值:当前线程认为该变量应该具有的值。
- 新值:如果变量的当前值等于期望值,则将其更新为新值。
CAS 的工作流程:
- 读取内存地址中的当前值。
- 比较当前值与期望值:
- 如果相等,则将新值写入内存地址。
- 如果不相等,则不做任何操作。
- 返回操作是否成功(通常返回当前值或布尔值)。
CAS 的硬件支持:
- x86:CMPXCHG 指令(单字 CAS),CMPXCHG16B 指令(双字 CAS)。
- ARM:LDREX 和 STREX 指令(加载-存储独占指令)。
- 其他架构:大多数现代 CPU 都提供了类似的原子指令。
以下是一个使用 CMPXCHG 实现原子操作的 C++ 示例:
#include <iostream>
#include <atomic>// 使用 CMPXCHG 实现原子 CAS 操作
bool atomic_compare_exchange(int* dest, int expected, int new_value) {int result;__asm__ volatile ("lock cmpxchgl %3, %1;" // lock 前缀确保原子性,cmpxchgl 是 32 位 CMPXCHG: "=a" (result), "+m" (*dest) // 输出:result = EAX,dest 是内存操作数: "a" (expected), "r" (new_value) // 输入:EAX = expected,new_value 是寄存器操作数: "memory" // 告诉编译器内存可能被修改);return result == expected; // 返回是否成功
}int main() {int shared_value = 10;int expected = 10;int new_value = 20;std::cout << "Initial value: " << shared_value << std::endl;// 尝试原子地将 shared_value 从 expected (10) 更新为 new_value (20)bool success = atomic_compare_exchange(&shared_value, expected, new_value);if (success) {std::cout << "CAS succeeded. New value: " << shared_value << std::endl;} else {std::cout << "CAS failed. Value is still: " << shared_value << std::endl;}return 0;
}
缺点:
- ABA 问题:变量的值从 A 变为 B 又变回 A,CAS 无法检测到中间变化。
- 忙等待:在高竞争场景下,CAS 可能导致大量重试,浪费 CPU 资源。
- 复杂性:实现无锁数据结构需要复杂的逻辑和严格的正确性验证。
CAS操作的变体:
- Compare-And-Swap Weak:允许在某些情况下失败(如虚假失败),但性能更高。
- Compare-And-Swap Strong:确保只有在值不匹配时才失败,性能较低但更可靠。
Compare-And-Swap Weak在哪些情况下会失败
- 当前值与预期值不匹配
- 内存问题:
- 并发竞争 , 如果多个线程同时尝试修改同一个内存地址,CAS Weak 可能会因为竞争而失败。即使当前值与预期值相等,其他线程的干扰也可能导致操作失败。
- 内存访问冲突:如果内存访问冲突频繁发生,硬件可能会放弃 CAS Weak 操作,导致失败。
- 在某些弱内存模型(如 ARM 或 PowerPC)中,指令重排序可能会导致 CAS Weak 失败。例如,内存屏障未正确设置时,CAS Weak 可能会观察到不一致的内存状态。
- 如果系统资源(如缓存、内存带宽)紧张,CAS Weak 可能会因为资源竞争而失败。
- 缓存一致性协议的影响:在某些多核处理器架构中,缓存一致性协议(如 MESI 协议)可能导致 CAS Weak 失败。例如,当多个核心同时竞争同一内存地址时,缓存行的状态可能会发生变化,从而导致 CAS Weak 失败
Compare-And-Swap Weak 的伪代码实现
template<T>
bool cas_weak(T* dest, T& expected,const T& new_value)
{if (*dest == expected) {if (try_update(dest,new_value)){return true;} else {return false;}} else {expected = *dest;return false}
}
举例说明无锁结构
无锁链表
#include <atomic>
#include <memory>
#include <iostream>template <typename T>
class LockFreeLinkedList {
private:struct Node {std::shared_ptr<T> data; // 存储数据std::atomic<Node*> next; // 指向下一个节点的原子指针Node(T const& value) : data(std::make_shared<T>(value)), next(nullptr) {}};std::atomic<Node*> head; // 链表头指针public:LockFreeLinkedList() : head(nullptr) {}~LockFreeLinkedList() {// 析构时释放所有节点Node* current = head.load();while (current) {Node* next = current->next.load();delete current;current = next;}}// 插入操作void insert(T const& value) {Node* new_node = new Node(value); // 创建新节点new_node->next = head.load(); // 新节点的 next 指向当前头节点// CAS 操作:将 head 从当前值更新为新节点while (!head.compare_exchange_weak(new_node->next, new_node)) {// 如果 CAS 失败,说明 head 已经被其他线程修改,重新尝试}}// 删除操作bool remove(T const& value) {Node* prev = head.load(); // 前驱节点Node* curr = prev; // 当前节点while (curr) {Node* next = curr->next.load(); // 下一个节点// 如果当前节点的值匹配if (curr->data && *curr->data == value) {// CAS 操作:将前驱节点的 next 从当前节点更新为下一个节点if (prev == curr) {// 如果当前节点是头节点if (head.compare_exchange_weak(curr, next)) {delete curr; // 删除节点return true;}} else {if (prev->next.compare_exchange_weak(curr, next)) {delete curr; // 删除节点return true;}}}// 移动指针prev = curr;curr = next;}return false; // 未找到匹配的节点}// 打印链表内容(用于调试)void print() const {Node* current = head.load();while (current) {if (current->data) {std::cout << *current->data << " -> ";}current = current->next.load();}std::cout << "nullptr" << std::endl;}
};
无锁结构的问题
- ABA 问题:在无锁数据结构中,ABA 问题是一个常见的挑战。可以通过使用带有版本号的指针(如 std::atomic<std::shared_ptr>)或双字 CAS(Compare-And-Swap)来解决。
- 内存管理:无锁数据结构中的内存管理需要特别小心,因为多个线程可能同时访问和释放内存。通常使用智能指针(如 std::shared_ptr)来管理内存。
- 性能测试:无锁数据结构的性能在高并发环境下可能优于基于锁的数据结构,但在低并发环境下可能表现不佳。因此,在实际应用中需要进行充分的性能测试。