普通自旋锁有一些缺点:
- 对所有的竞争者不做区分。
- 很多情况有些竞争者并不会修改共享资源
- 普通自旋锁总是会限制只有一个内核路径持有锁
读写锁的改进:
- 允许多个读者同时持有读锁
- 只允许一个写者同时持有写锁
- 不允许读者和写者同时持有锁
- 读写自旋锁更适合读者多。写着少的场景
所以现在追一下linux内核读写自旋锁的代码。
rwlock_init(lock)//初始化读写自旋锁
read_lock(lock)//读者加锁
write_lock(lock)//写者加锁
read_unlock(lock) //读者解锁
write_unlock(lock) //写者解锁
二、读写自旋锁
1.锁的结构体
typedef struct {arch_rwlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCKunsigned int magic, owner_cpu;void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOCstruct lockdep_map dep_map;
#endif
} rwlock_t;
如果不开启debug功能,rwlock_t只有一个arch_rwlock_t raw_lock成员:
typedef struct qrwlock {union {atomic_t cnts;struct {
#ifdef __LITTLE_ENDIANu8 wlocked; /* Locked for write? */u8 __lstate[3];
#elseu8 __lstate[3];u8 wlocked; /* Locked for write? */
#endif};};arch_spinlock_t wait_lock;
} arch_rwlock_t;
struct qrwlock 跟自旋锁一样,也是一个联合体,所以其的本质仍然是一个整数。但是,读写自旋锁除了一个整数的联合体外,还有一个成员arch_spinlock_t wait_lock;这个成员是不是很熟悉,没有错,他就是我们上次讲解普通自旋锁时候,包含在最里面的整数。也就是说读写自旋锁的本质是两个整数。
2.锁的初始化
我们是使用rwlock_init函数来初始化一个读写自旋锁的:
# define rwlock_init(lock) \do { *(lock) = __RW_LOCK_UNLOCKED(lock); } while (0)#define __RW_LOCK_UNLOCKED(lockname) \(rwlock_t) { .raw_lock = __ARCH_RW_LOCK_UNLOCKED, \RW_DEP_MAP_INIT(lockname) }
rwlock_init,和spin_lock_init一样最主要做的是判断是否申请了锁的需要的内存,后面的都是debug才需要的初始化操作,没有开启自旋锁的debug则都是空操作。
3.读者加锁操作
我们是使用read_lock函数来进行读者加锁操作的:
#define read_lock(lock) _raw_read_lock(lock)
#define _raw_read_lock(lock) __raw_read_lock(lock)static inline void __raw_read_lock(rwlock_t *lock)
{preempt_disable();//关闭抢占rwlock_acquire_read(&lock->dep_map, 0, 0, _RET_IP_);//空操作LOCK_CONTENDED(lock, do_raw_read_trylock, do_raw_read_lock);
}
rwlock_acquire_read其实是避免锁递归的一个操作,但是由于内核的原因,这是一个空操作。LOCK_CONTENDED这个宏定义我们之前在自旋锁那里已经看到过了,但是我们读写自旋锁的trylock和lock的传入参数和普通自旋锁的不一样,我们这里需要讲讲:
# define do_raw_read_lock(rwlock) do {__acquire(lock); arch_read_lock(&(rwlock)->raw_lock); } while (0)
# define __acquire(x) (void)0
#define arch_read_lock(l) queued_read_lock(l)static inline void queued_read_lock(struct qrwlock *lock)
{u32 cnts;cnts = atomic_add_return_acquire(_QR_BIAS, &lock->cnts);//如果锁没有被占用,则直接加锁if (likely(!(cnts & _QW_WMASK)))return;/* The slowpath will decrement the reader count, if necessary. */queued_read_lock_slowpath(lock);//如果锁被占用了,慢速排队加锁
}
do_raw_read_lock实际就是调用queued_read_lock进行加锁,首先使用atomic_add_return_acquire,atomic_add_return_acquire后面太乱了,很难追,不知道走哪条路,但是大概意思是使用原子操作修改lock->cnts的值,如果锁没有被占用,则可以使用原子操作加一,否则就是该锁已经被占用了。atomic_add_return_acquire应该是这样子走的:
#define atomic_add_return_acquire atomic_add_return_acquire#define atomic_add_return_acquire(...) \__atomic_op_acquire(atomic_add_return, __VA_ARGS__)#define __atomic_op_acquire(op, args...) \
({ \typeof(op##_relaxed(args)) __ret = op##_relaxed(args); \__atomic_acquire_fence(); \__ret; \
})#define __atomic_acquire_fence smp_mb__after_atomic
#define smp_mb__after_atomic() __smp_mb__after_atomic()
#define __smp_mb__after_atomic() __smp_mb()static __always_inline int atomic_add_return(int i, atomic_t *v)
{kasan_check_write(v, sizeof(*v));return arch_atomic_add_return(i, v);
}tatic __always_inline int arch_atomic_add_return(int i, atomic_t *v)
{return i + xadd(&v->counter, i);
}
如果如果锁被占用了,慢速排队加锁,则会走queued_read_lock_slowpath函数:
void queued_read_lock_slowpath(struct qrwlock *lock)
{/** Readers come here when they cannot get the lock without waiting*/if (unlikely(in_interrupt())) {/** Readers in interrupt context will get the lock immediately* if the writer is just waiting (not holding the lock yet),* so spin with ACQUIRE semantics until the lock is available* without waiting in the queue.*/atomic_cond_read_acquire(&lock->cnts, !(VAL & _QW_LOCKED));return;}atomic_sub(_QR_BIAS, &lock->cnts);/** Put the reader into the wait queue*/arch_spin_lock(&lock->wait_lock);atomic_add(_QR_BIAS, &lock->cnts);/** The ACQUIRE semantics of the following spinning code ensure* that accesses can't leak upwards out of our subsequent critical* section in the case that the lock is currently held for write.*/atomic_cond_read_acquire(&lock->cnts, !(VAL & _QW_LOCKED));/** Signal the next one in queue to become queue head*/arch_spin_unlock(&lock->wait_lock);
}
EXPORT_SYMBOL(queued_read_lock_slowpath);
4.写者加锁操作
我们是使用write_lock函数来进行写者加锁操作的:
#define write_lock(lock) _raw_write_lock(lock)
#define _raw_write_lock(lock) __raw_write_lock(lock)static inline void __raw_write_lock(rwlock_t *lock)
{preempt_disable();rwlock_acquire(&lock->dep_map, 0, 0, _RET_IP_);LOCK_CONTENDED(lock, do_raw_write_trylock, do_raw_write_lock);
}
看到这里,我们看看do_raw_write_lock是怎么加锁的:
# define do_raw_write_lock(rwlock) do {__acquire(lock); arch_write_lock(&(rwlock)->raw_lock); } while (0)
#define arch_write_lock(l) queued_write_lock(l)static inline void queued_write_lock(struct qrwlock *lock)
{/* Optimize for the unfair lock case where the fair flag is 0. */if (atomic_cmpxchg_acquire(&lock->cnts, 0, _QW_LOCKED) == 0)return;queued_write_lock_slowpath(lock);
}
看到这里,我们是不是很熟悉,有是熟悉的配方,一个快速路径和一个慢速路径。快速路劲就不多说了,他就直接加锁,加锁成功就返回,加锁失败就走慢速路劲,queued_write_lock_slowpath:
void queued_write_lock_slowpath(struct qrwlock *lock)
{/* Put the writer into the wait queue *///使用自旋锁的方式给wait_lock加锁arch_spin_lock(&lock->wait_lock);/* Try to acquire the lock directly if no reader is present *///读取锁的值,判断是否存在读者,如果没有读者,我们就尝试给写者上锁if (!atomic_read(&lock->cnts) &&(atomic_cmpxchg_acquire(&lock->cnts, 0, _QW_LOCKED) == 0))goto unlock;/* Set the waiting flag to notify readers that a writer is pending *///设置等待标志以通知阅读器有写入器正在等待,后面有读者获取锁会失败atomic_add(_QW_WAITING, &lock->cnts);/* When no more readers or writers, set the locked flag *///等到读者或者写者释放锁,然后加锁do {atomic_cond_read_acquire(&lock->cnts, VAL == _QW_WAITING);} while (atomic_cmpxchg_relaxed(&lock->cnts, _QW_WAITING,_QW_LOCKED) != _QW_WAITING);
unlock:arch_spin_unlock(&lock->wait_lock);//使用自旋锁的方式给wait_lock解锁
}
我们很清晰的知道,写者加锁比读者简单很多,因为写者只有读者和写者都不持有锁的情况下才能加锁成功,所以只需等到cnts为0即可。而读者加锁需要判断当前持有锁的是读者还是写者,如果是写者这等待,如果是读者则加锁成功。
5.读者解锁操作
我们是使用read_unlock函数来进行读者解锁操作的:
#define read_unlock(lock) _raw_read_unlock(lock)
#define _raw_read_unlock(lock) __raw_read_unlock(lock)static inline void __raw_read_unlock(rwlock_t *lock)
{rwlock_release(&lock->dep_map, 1, _RET_IP_);//空操作do_raw_read_unlock(lock);//解锁操作preempt_enable();//开启抢占
}# define do_raw_read_unlock(rwlock) do {arch_read_unlock(&(rwlock)->raw_lock); __release(lock); } while (0)
#define arch_read_unlock(l) queued_read_unlock(l)static inline void queued_read_unlock(struct qrwlock *lock)
{/** Atomically decrement the reader count*///原子操作,cnts减1(void)atomic_sub_return_release(_QR_BIAS, &lock->cnts);}
因为解锁没有竞争,直接使用原子操作设置cnts的值即可。
6.写者解锁操作
我们是使用write_unlock函数来进行写者解锁操作的,write_unlock和read_unlock是差不多的,这里就不多写了。