
Understanding qspinlock in One Article

Preface

  Last year I analyzed qspinlock based on kernel 4.4.188; this article revisits qspinlock based on a newer version of the code.

Main Text

Lots of images ahead; data-usage warning!

Data Structures

  qspinlock uses two different data structures: a global lock structure and a per-CPU structure used for local spinning. The global lock is represented by struct qspinlock, and local spinning uses struct mcs_spinlock.

struct mcs_spinlock

  A CPU can be waiting on at most four spinlocks at the same time, one each in process context, hard-IRQ context, soft-IRQ context, and NMI context. Each CPU therefore has a per-CPU array qnodes[MAX_NODES]; ignoring virtualization, MAX_NODES is 4.

static DEFINE_PER_CPU_ALIGNED(struct qnode, qnodes[MAX_NODES]);
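  In this newer code each per-CPU slot is actually a struct qnode that wraps the mcs_spinlock; roughly as below (a paraphrased sketch from kernel/locking/qspinlock.c, where the PV build reserves extra space for paravirt state):

struct qnode {
	struct mcs_spinlock mcs;
#ifdef CONFIG_PARAVIRT_SPINLOCKS
	long reserved[2];	/* PV state uses the second cacheline */
#endif
};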

  next points to the next node in the wait queue; locked = 1 means this node has become the head of the wait queue; count is the nesting count, kept in the first node of the per-CPU array, and is used as the index of the next node to hand out.

mcs_spinlock                kernel/locking/mcs_spinlock.h
struct mcs_spinlock {
	struct mcs_spinlock *next;
	int locked; /* 1 if lock acquired */
	int count;  /* nesting count, see qspinlock.c */
};

struct qspinlock

  struct qspinlock is laid out as shown in the figure above; its fields have the following meaning:

  • locked: indicates whether the lock is held; 0 means unlocked, 1 means locked;
  • pending: a further optimization over plain MCS; the first CPU waiting for the lock sets the pending bit and then spins directly on the locked field of the global lock;
  • tail index: the index, within the per-CPU node array, of the tail node of the wait queue;
  • tail cpu (+1): the CPU number of the tail node, plus 1.
struct qspinlock                include/asm-generic/qspinlock_types.h
typedef struct qspinlock {
	union {
		atomic_t val;

		/*
		 * By using the whole 2nd least significant byte for the
		 * pending bit, we can allow better optimization of the lock
		 * acquisition for the pending bit holder.
		 */
#ifdef __LITTLE_ENDIAN
		struct {
			u8	locked;
			u8	pending;
		};
		struct {
			u16	locked_pending;
			u16	tail;
		};
#else
		struct {
			u16	tail;
			u16	locked_pending;
		};
		struct {
			u8	reserved[2];
			u8	pending;
			u8	locked;
		};
#endif
	};
} arch_spinlock_t;
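  To make the field boundaries concrete, here is an annotated sketch of the bit layout of lock->val on a little-endian machine; the widths assume NR_CPUS < 16K (otherwise pending shrinks to a single bit and the tail grows). The authoritative definitions are the _Q_*_OFFSET / _Q_*_BITS macros in include/asm-generic/qspinlock_types.h; treat the numbers below as an illustration.

/*
 * lock->val bit layout (little-endian, NR_CPUS < 16K):
 *
 *   bits  0- 7: locked byte   (_Q_LOCKED_VAL  == 1U << 0)
 *   bits  8-15: pending byte  (_Q_PENDING_VAL == 1U << 8)
 *   bits 16-17: tail index    (which of the 4 per-CPU nodes is the tail)
 *   bits 18-31: tail CPU + 1  (0 means "no queued waiters", hence the +1)
 */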

  Compared with a traditional MCS lock, qspinlock compresses the lock structure down to 32 bits by storing the CPU number of the tail mcs node instead of the node's address (see the encode/decode sketch after the list below). The lock acquisition process is roughly as shown in the figure below:
 

  • If the whole val of the qspinlock is 0, the lock is free; the CPU sets the locked field of the qspinlock to 1 and takes the lock directly.
  • The first CPU waiting for the lock sets the pending bit and spins on the locked field of the qspinlock.
  • The second waiting CPU points the tail index and tail cpu fields of the qspinlock at its local mcs_spinlock, then spins on the locked_pending field of the qspinlock.
  • The n-th waiting CPU points the tail index and tail cpu fields of the qspinlock at its local mcs_spinlock, makes the previous tail node's next point to itself, and then spins on the locked field of its local mcs_spinlock.
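  For reference, the packing and unpacking of the tail is done by encode_tail()/decode_tail(); the sketch below is paraphrased from kernel/locking/qspinlock.c and uses the bit offsets from the layout sketch above:

/* Pack (cpu, idx) into the tail; the cpu is stored as cpu + 1 so that
 * tail == 0 can mean "queue empty". */
static inline __pure u32 encode_tail(int cpu, int idx)
{
	u32 tail;

	tail  = (cpu + 1) << _Q_TAIL_CPU_OFFSET;
	tail |= idx << _Q_TAIL_IDX_OFFSET;	/* assume idx < 4 */

	return tail;
}

/* Turn a tail value back into the per-CPU mcs node it names. */
static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
{
	int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
	int idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;

	return per_cpu_ptr(&qnodes[idx].mcs, cpu);
}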

Code Analysis

queued_spin_lock

  Let's start with the locking path, queued_spin_lock. The possible values of the lock in memory are shown in the figure below:

  queued_spin_lock first calls atomic_try_cmpxchg_acquire, which atomically compares lock->val with 0: if they are equal, lock->val is set to _Q_LOCKED_VAL and the call returns true, meaning the lock was free and has been taken on the fast path, so the function returns immediately. Otherwise the call returns false and writes the value actually observed in memory back into val, and we enter the slow path queued_spin_lock_slowpath with that value.

queued_spin_lock                include/asm-generic/qspinlock.h
/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
	u32 val = 0;

	/* no CPU currently holds the lock: take it on the fast path */
	if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
		return;

	/* otherwise fall into the slow path and wait for the lock */
	queued_spin_lock_slowpath(lock, val);
}
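  If you are more used to the plain cmpxchg interface, atomic_try_cmpxchg_acquire can be modelled roughly as the wrapper below. This is only a conceptual sketch to show the calling convention, not the kernel's actual implementation, and the helper name is made up for illustration:

/* Conceptual model of atomic_try_cmpxchg_acquire(): on failure it also
 * updates *old with the value actually observed, so the caller can pass
 * it straight on to the slow path. */
static inline bool try_cmpxchg_sketch(atomic_t *v, u32 *old, u32 new)
{
	u32 seen = atomic_cmpxchg_acquire(v, *old, new);	/* returns the previous value */

	if (seen == *old)
		return true;	/* swap succeeded */

	*old = seen;		/* report what we saw instead */
	return false;
}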

queued_spin_lock_slowpath

  The slowpath code falls into two parts: the "pending bit optimistic spinning" part before the queue label, and the "MCS queuing" part after it.

pending bit optimistic spinning

  On entry to the slow path, val cannot be [0,0,0]. If val is [0,1,0], the previous owner has just released the lock and the pending CPU is about to take it, i.e. the state is migrating from [0,1,0] to [0,0,1]; in that case we spin a bounded number of times waiting for the transition to finish.

This part of the code was reworked in commit 59fb586b4a07b4e1a0ee577140ab4842ba451acd; see the commit log if you are interested.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	struct mcs_spinlock *prev, *next, *node;
	u32 old, tail;
	int idx;

	BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

	if (pv_enabled())
		goto pv_queue;

	if (virt_spin_lock(lock))
		return;

	/*
	 * Wait for in-progress pending->locked hand-overs with a bounded
	 * number of spins so that we guarantee forward progress.
	 *
	 * 0,1,0 -> 0,0,1
	 */
	if (val == _Q_PENDING_VAL) {
		int cnt = _Q_PENDING_LOOPS;
		val = atomic_cond_read_relaxed(&lock->val,
					       (VAL != _Q_PENDING_VAL) || !cnt--);
	}


  If either pending or tail is non-zero, we are not the first CPU waiting for the lock, so jump to the queue label to queue up.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * If we observe any contention; queue.
	 */
	if (val & ~_Q_LOCKED_MASK)
		goto queue;


  If we get here and no other CPU is contending, lock->val in memory should be [0, 0, *]; whether or not the locked field is 0, we first set the pending bit unconditionally. queued_fetch_set_pending_acquire sets the pending bit and returns the old value of the lock in memory. If there is contention, lock->val may hold some other value.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * trylock || pending
	 *
	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
	 */
	val = queued_fetch_set_pending_acquire(lock);
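  With the 8-bit pending layout (NR_CPUS < 16K) the generic helper is essentially a single atomic fetch-or; a paraphrased sketch (x86 provides its own optimized variant):

static __always_inline u32 queued_fetch_set_pending_acquire(struct qspinlock *lock)
{
	/* atomically set the pending bit and return the previous lock value */
	return atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
}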

  If the old value returned by queued_fetch_set_pending_acquire has a non-zero tail or pending field, several CPUs are contending. In that case, if pending was 0 in the old value (i.e. we are the one who set it), we must undo the pending bit we just set so it does not disturb later lock state transitions, and then jump to the queue label.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * If we observe contention, there is a concurrent locker.
	 *
	 * Undo and queue; our setting of PENDING might have made the
	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
	 * on @next to become !NULL.
	 */
	if (unlikely(val & ~_Q_LOCKED_MASK)) {

		/* Undo PENDING if we set it. */
		if (!(val & _Q_PENDING_MASK))
			clear_pending(lock);

		goto queue;
	}


  If there is no contention, we simply spin on the locked field of the global lock until the owner releases it, then clear pending and set the locked bit to grab the lock.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * We're pending, wait for the owner to go away.
	 *
	 * 0,1,1 -> 0,1,0
	 *
	 * this wait loop must be a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because not all
	 * clear_pending_set_locked() implementations imply full
	 * barriers.
	 */
	if (val & _Q_LOCKED_MASK)
		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

	/*
	 * take ownership and clear the pending bit.
	 *
	 * 0,1,0 -> 0,0,1
	 */
	clear_pending_set_locked(lock);
	qstat_inc(qstat_lock_pending, true);
	return;
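  Because locked and pending share the 16-bit locked_pending field in this layout, clearing pending and setting locked can be a single plain store; a paraphrased sketch of the generic helper:

static __always_inline void clear_pending_set_locked(struct qspinlock *lock)
{
	/* one 16-bit store clears pending and sets locked at the same time */
	WRITE_ONCE(lock->locked_pending, _Q_LOCKED_VAL);
}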


  The figure below summarizes the pending bit optimistic spinning that happens before the queue label.

MCS queuing
The queue label

  Now for the code after the queue label. It first grabs a node from this CPU's qnodes array, initializes it, and builds the tail value.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
queue:
	qstat_inc(qstat_lock_slowpath, true);
pv_queue:
	node = this_cpu_ptr(&qnodes[0].mcs);
	idx = node->count++;
	tail = encode_tail(smp_processor_id(), idx);

	node = grab_mcs_node(node, idx);

	/*
	 * Keep counts of non-zero index values:
	 */
	qstat_inc(qstat_lock_idx1 + idx - 1, idx);

	/*
	 * Ensure that we increment the head node->count before initialising
	 * the actual node. If the compiler is kind enough to reorder these
	 * stores, then an IRQ could overwrite our assignments.
	 */
	barrier();

	node->locked = 0;
	node->next = NULL;
	pv_init_node(node);
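  grab_mcs_node just steps from the base per-CPU qnode to the idx-th one; a paraphrased sketch:

/* Return the mcs_spinlock embedded in the idx-th qnode of this CPU,
 * given a pointer to the mcs of the first (base) qnode. */
static inline __pure struct mcs_spinlock *grab_mcs_node(struct mcs_spinlock *base, int idx)
{
	return &((struct qnode *)base + idx)->mcs;
}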

  If, while the code above was running, the lock has become free, this CPU can grab it right away; but because the node we just set up still has to be released, we then jump to the release label.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
	 * attempt the trylock once more in the hope someone let go while we
	 * weren't watching.
	 */
	if (queued_spin_trylock(lock))
		goto release;


  Otherwise, xchg_tail is called to set the tail field of the global lock to describe this CPU's node, returning the previous tail information.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * Ensure that the initialisation of @node is complete before we
	 * publish the updated tail via xchg_tail() and potentially link
	 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
	 */
	smp_wmb();

	/*
	 * Publish the updated tail.
	 * We have already touched the queueing cacheline; don't bother with
	 * pending stuff.
	 *
	 * p,*,* -> n,*,*
	 */
	old = xchg_tail(lock, tail);
	next = NULL;
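  With the 8-bit pending layout the tail occupies its own 16-bit half of the word, so xchg_tail can be a plain 16-bit exchange; a paraphrased sketch of the _Q_PENDING_BITS == 8 variant:

/*
 * Atomically swap in the new tail and return the old one, both expressed
 * as full 32-bit lock values (hence the shifts by _Q_TAIL_OFFSET).
 */
static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail)
{
	return (u32)xchg_relaxed(&lock->tail,
				 tail >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
}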


  If old & _Q_TAIL_MASK is non-zero, the tail previously recorded in the lock was not empty, i.e. this CPU's node is not the head of the queue. We decode the old tail to get the predecessor node prev, set prev->next to point at this CPU's node, and then call arch_mcs_spin_lock_contended to spin on our own node->locked field until it becomes 1 (node->locked == 1 means the node has become the queue head).

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	/*
	 * if there was a previous node; link it and wait until reaching the
	 * head of the waitqueue.
	 */
	if (old & _Q_TAIL_MASK) {		/* not the head of the queue */
		prev = decode_tail(old);	/* fetch the predecessor node */

		/* Link @node into the waitqueue. */
		WRITE_ONCE(prev->next, node);	/* point the predecessor's next at this node */

		pv_wait_node(node, prev);
		/* spin on the local node->locked until it becomes 1 */
		arch_mcs_spin_lock_contended(&node->locked);

		/*
		 * While waiting for the MCS lock, the next pointer may have
		 * been set by another lock waiter. We optimistically load
		 * the next pointer & prefetch the cacheline for writing
		 * to reduce latency in the upcoming MCS unlock operation.
		 */
		next = READ_ONCE(node->next);
		if (next)
			prefetchw(next);
	}
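  The generic arch_mcs_spin_lock_contended is simply an acquire-ordered spin on the given location; a paraphrased sketch from kernel/locking/mcs_spinlock.h:

/* Spin (with acquire ordering) until the queue predecessor hands us the
 * MCS lock by storing a non-zero value into *l. */
#define arch_mcs_spin_lock_contended(l)					\
do {									\
	smp_cond_load_acquire(l, VAL);					\
} while (0)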

  After becoming the queue head, we wait for the current owner to release the lock; if there is also a pending CPU ahead of us, we must wait for it to take and release the lock as well. The queue head therefore spins on locked_pending, waiting for both the pending and locked fields of the global lock to be cleared.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
	if ((val = pv_wait_head_or_lock(lock, node)))
		goto locked;

	/* spin on locked_pending until both fields are cleared */
	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));


The locked label

  Once both the pending and locked fields of the global lock have been cleared, the queue head can take the lock. First we check whether this CPU's node is the tail of the wait queue; if so, we take the lock directly and jump to release to free the mcs node. Otherwise we set the locked field of the lock, wait, if needed, for the next waiter to update our node's next pointer, and then call arch_mcs_spin_unlock_contended to set the next node's locked field to 1, making it the new head of the wait queue.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
locked:
	/*
	 * claim the lock:
	 *
	 * n,0,0 -> 0,0,1 : lock, uncontended
	 * *,*,0 -> *,*,1 : lock, contended
	 *
	 * If the queue head is the only one in the queue (lock value == tail)
	 * and nobody is pending, clear the tail code and grab the lock.
	 * Otherwise, we only need to grab the lock.
	 */

	/*
	 * In the PV case we might already have _Q_LOCKED_VAL set, because
	 * of lock stealing; therefore we must also allow:
	 *
	 * n,0,1 -> 0,0,1
	 *
	 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
	 * above wait condition, therefore any concurrent setting of
	 * PENDING will make the uncontended transition fail.
	 */
	/* if this node is the tail of the queue, take the lock directly and
	 * jump to release to free the mcs node */
	if ((val & _Q_TAIL_MASK) == tail) {
		if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
			goto release; /* No contention */
	}

	/*
	 * Either somebody is queued behind us or _Q_PENDING_VAL got set
	 * which will then detect the remaining tail and queue behind us
	 * ensuring we'll see a @next.
	 */
	/* otherwise just set the locked field to take the lock */
	set_locked(lock);

	/*
	 * contended path; wait for next if not observed yet, release.
	 */
	/* wait until the next waiter has linked itself behind this node */
	if (!next)
		next = smp_cond_load_relaxed(&node->next, (VAL));

	arch_mcs_spin_unlock_contended(&next->locked);
	pv_kick_node(lock, next);
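  For completeness, the two helpers used here boil down to simple stores; paraphrased sketches (see kernel/locking/qspinlock.c and kernel/locking/mcs_spinlock.h for the authoritative versions):

/* Take the lock as the queue head: a plain byte store suffices because
 * nobody else may set the locked byte at this point. */
static __always_inline void set_locked(struct qspinlock *lock)
{
	WRITE_ONCE(lock->locked, _Q_LOCKED_VAL);
}

/* Hand the MCS lock to the next queued waiter with release ordering, which
 * pairs with the acquire spin in arch_mcs_spin_lock_contended(). */
#define arch_mcs_spin_unlock_contended(l)				\
	smp_store_release((l), 1)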

The release label

  Finally, the mcs node is released.

queued_spin_lock_slowpath                kernel/locking/qspinlock.c
release:
	/*
	 * release the node
	 */
	__this_cpu_dec(qnodes[0].mcs.count);

Summary

  The video below walks through the flow of queued_spin_lock.

Contact Me

  You can leave a comment below together with your email address, or contact me by e-mail.

  • Author: Lauren
  • Link: http://lihanlu.cn/new-qspinlock/
  • Copyright: Unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when reposting!