无论什么锁(互斥锁、自旋锁等等),对于用户的接口都是差不多的,都是为了保护临界区:

lock()
//...
unlock()

futex()

是很底层的功能,可以构建更高等级更抽象的锁,比如互斥量,条件变量,读写锁,barriers 和信号量。因此,绝大多数程序都不会直接用到 futexes,而是使用一些系统类库提供的方法。 直接使用 futex 是很难用的,很多时候需要使用汇编。可以使用系统提供的 API 调用。

一个 futex 可以认定为一块内存,在进程或线程之间是共享的。在不同的进程之间,futex 不需要必须地址完全一样。本质上来说,futex 就像信号量一样,它是一个计数器,可以增加,也可以减少,进程需要一直等待,直到计数器的值变成了正数。

Futex 在整个用户层的操作都是非竞争性的。所有竞争性的判断都有内核完成。

从本质上来说,futex 就相当于一个始终保持一致性的整数,它只能通过原子性的汇编操作。进程可以使用 mmap,通过共享内存片段或是共享内存空间,在多线程之间共享它。

Spinlock

How does userspace use spinlock?

Spinlock 一般是实现在内核态还是用户态?

自旋锁(Spinlock)一般是实现在内核态,而不是用户态。

和原子变量的区别,请看原子变量^。

Linux 中主要有两种同步锁,一种是 spinlock,一种是 mutex。spinlock 和 mutex 的区别:spinlock 不会导致睡眠和调度,属于 busy wait 形式的锁,而后者可能导致睡眠和调度,属于 sleep wait 形式的锁。

spinlock 是最基础的一种锁,rwlock(读写锁),seqlock 等都是基于 spinlock 衍生出来的。就算是 mutex,它的实现与 spinlock 也是密不可分。

Linux kernel 的 spinlock 底层都是基于 queue spin lock 实现的。

先看看怎么用(使用 API):

static DEFINE_SPINLOCK(xxx_lock);
unsigned long flags;

// 可替换成 spin_lock_irq(), spin_lock()
spin_lock_irqsave(&xxx_lock, flags);
... critical section here ..
// 可替换成 spin_unlock_irq(), spin_unlock()
spin_unlock_irqrestore(&xxx_lock, flags);

spin_lock_irq(), spin_lock_irqsave(), spin_lock() 三者区别是什么?

spin_lock()spin_lock_irq() 的区别仅仅在于,是否调用 local_irq_disable() 函数。即是否关中断。

在任何情况下使用 spin_lock_irq() 都是安全的。因为它既关中断,又关内核抢占

spin_lock()spin_lock_irq() 速度快。

举个例子: 进程 A 中调用了 spin_lock(&lock) 然后进入临界区,此时来了一个中断,该中断也运行在和进程 A 相同的 CPU 上,并且在该中断处理程序中恰巧也会 spin_lock(&lock) 试图获取同一个锁。由于是在同一个 CPU 上被中断,进程 A 会被设置为 TASK_INTERRUPT 状态,中断处理程序无法获得锁,会不停的忙等,由于进程 A 被设置为中断状态,schedule() 进程调度就无法再调度进程 A 运行,这样就导致了死锁!但是如果该中断处理程序运行在不同的 CPU 上就不会触发死锁。 因为在不同的 CPU 上出现中断不会导致进程 A 的状态被设为 TASK_INTERRUPT,只是换出。当中断处理程序忙等被换出后,进程 A 还是有机会获得 CPU,执行并退出临界区。所以在使用 spin_lock() 时要明确知道该锁不会在中断处理程序中使用

spin_lock_irq()spin_lock_irqsave() 区别。

  • spin_lock_irqsave() 锁返回时,中断状态不会被改变,调用 spin_lock_irqsave() 前是开中断返回就开中断。
  • spin_lock_irq() 锁返回时,永远都是开中断,即使 spin_lock_irq() 前是关中断。

看起来 spin_lcok_irqsave() 更智能一些,毕竟没有人希望自己 spin unlock 过后中断被打开了。那为什么还有人用 spin_lock_irq() 呢?

因为不需要保存中断寄存器,所以性能更好,如果能确定之前就不是关中断的状态,那么使用 spin_lock_irq() 无疑具有更高的性能。

定义一个 spinlock 的实现

下面代码都是进行了合理简化之后的,可见还是基于 atomic_t 原子变量来实现的。

typedef struct qspinlock {
	union {
		atomic_t val;

		/*
		 * By using the whole 2nd least significant byte for the
		 * pending bit, we can allow better optimization of the lock
		 * acquisition for the pending bit holder.
		 */
		struct {
			u8	locked;
			u8	pending;
		};
		struct {
			u16	locked_pending;
			u16	tail;
		};
	};
} arch_spinlock_t;

typedef struct raw_spinlock {
	arch_spinlock_t raw_lock;
} raw_spinlock_t;

typedef struct spinlock {
    struct raw_spinlock rlock;
} spinlock_t;

#define DEFINE_SPINLOCK(x)	spinlock_t x = __SPIN_LOCK_UNLOCKED(x)

Spinlock 拿锁的实现 spin_lock_irqsave(), spin_lock_irq(), spin_lock()

spin_lock_irqsave
    raw_spin_lock_irqsave
        _raw_spin_lock_irqsave
            __raw_spin_lock_irqsave
static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
	unsigned long flags;

    // 关掉中断,关掉抢占
	local_irq_save(flags);
	preempt_disable();
	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	/*
	 * On lockdep we dont want the hand-coded irq-enable of
	 * do_raw_spin_lock_flags() code, because lockdep assumes
	 * that interrupts are not re-enabled during lock-acquire:
	 */
#ifdef CONFIG_LOCKDEP
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
#else
	do_raw_spin_lock_flags(lock, &flags);
#endif
	return flags;
}


spin_lock_irq
    raw_spin_lock_irq
        _raw_spin_lock_irq
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
	local_irq_disable();
	preempt_disable();
	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}


spin_lock
    raw_spin_lock
        _raw_spin_lock
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
	preempt_disable();
	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

可以看到核心代码都是这两行:

spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    lock_acquire_exclusive(&lock->dep_map, 0, 0, NULL, _RET_IP_)
        lock_acquire(&lock->dep_map, 0, 0, 0, 1, NULL, _RET_IP_)
void lock_acquire(struct lockdep_map *lock, unsigned int subclass, int trylock, int read, int check,
			  struct lockdep_map *nest_lock, unsigned long ip)
{
	unsigned long flags;

    //...
	if (!debug_locks)
		return;

	if (unlikely(!lockdep_enabled())) {
		/* XXX allow trylock from NMI ?!? */
		if (lockdep_nmi() && !trylock) {
			struct held_lock hlock;

			hlock.acquire_ip = ip;
			hlock.instance = lock;
			hlock.nest_lock = nest_lock;
			hlock.irq_context = 2; // XXX
			hlock.trylock = trylock;
			hlock.read = read;
			hlock.check = check;
			hlock.hardirqs_off = true;
			hlock.references = 0;

			verify_lock_unused(lock, &hlock, subclass);
		}
		return;
	}

	raw_local_irq_save(flags);
	check_flags(flags);

	lockdep_recursion_inc();
	__lock_acquire(lock, subclass, trylock, read, check,
		       irqs_disabled_flags(flags), nest_lock, ip, 0, 0);
	lockdep_recursion_finish();
	raw_local_irq_restore(flags);
}
// 传进来一个 lock,一个
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
#define LOCK_CONTENDED(_lock, try, lock)			\
do {								\
	if (!try(_lock)) {					\
		lock_contended(&(_lock)->dep_map, _RET_IP_);	\
		lock(_lock);					\
	}							\
	lock_acquired(&(_lock)->dep_map, _RET_IP_);			\
} while (0)

queued_spin_lock_slowpath() Kernel

/**
 * queued_spin_lock_slowpath - acquire the queued spinlock
 * @lock: Pointer to queued spinlock structure
 * @val: Current value of the queued spinlock 32-bit word
 *
 * (queue tail, pending bit, lock value)
 *
 *              fast     :    slow                                  :    unlock
 *                       :                                          :
 * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
 *                       :       | ^--------.------.             /  :
 *                       :       v           \      \            |  :
 * pending               :    (0,1,1) +--> (0,1,0)   \           |  :
 *                       :       | ^--'              |           |  :
 *                       :       v                   |           |  :
 * uncontended           :    (n,x,y) +--> (n,0,0) --'           |  :
 *   queue               :       | ^--'                          |  :
 *                       :       v                               |  :
 * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
 *   queue               :         ^--'                             :
 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	struct mcs_spinlock *prev, *next, *node;
	u32 old, tail;
	int idx;

    //...
    // 如果是 pv spinlock,表示现在我们是在 guest kernel 中运行,
	// 那么就走到 pv_queue 中去走 hypercall 路径。
	if (pv_enabled())
		goto pv_queue;

	if (virt_spin_lock(lock))
		return;

	/*
	 * Wait for in-progress pending->locked hand-overs with a bounded
	 * number of spins so that we guarantee forward progress.
	 *
	 * 0,1,0 -> 0,0,1
	 */
	if (val == _Q_PENDING_VAL) {
		int cnt = _Q_PENDING_LOOPS;
		val = atomic_cond_read_relaxed(&lock->val,
					       (VAL != _Q_PENDING_VAL) || !cnt--);
	}

	/*
	 * If we observe any contention; queue.
	 */
	if (val & ~_Q_LOCKED_MASK)
		goto queue;

	/*
	 * trylock || pending
	 *
	 * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
	 */
	val = queued_fetch_set_pending_acquire(lock);

	/*
	 * If we observe contention, there is a concurrent locker.
	 *
	 * Undo and queue; our setting of PENDING might have made the
	 * n,0,0 -> 0,0,0 transition fail and it will now be waiting
	 * on @next to become !NULL.
	 */
	if (unlikely(val & ~_Q_LOCKED_MASK)) {

		/* Undo PENDING if we set it. */
		if (!(val & _Q_PENDING_MASK))
			clear_pending(lock);

		goto queue;
	}

	/*
	 * We're pending, wait for the owner to go away.
	 *
	 * 0,1,1 -> 0,1,0
	 *
	 * this wait loop must be a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because not all
	 * clear_pending_set_locked() implementations imply full
	 * barriers.
	 */
	if (val & _Q_LOCKED_MASK)
		atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

	/*
	 * take ownership and clear the pending bit.
	 *
	 * 0,1,0 -> 0,0,1
	 */
	clear_pending_set_locked(lock);
	lockevent_inc(lock_pending);
	return;

	/*
	 * End of pending bit optimistic spinning and beginning of MCS
	 * queuing.
	 */
queue:
	lockevent_inc(lock_slowpath);
pv_queue:
	node = this_cpu_ptr(&qnodes[0].mcs);
	idx = node->count++;
	tail = encode_tail(smp_processor_id(), idx);

	/*
	 * 4 nodes are allocated based on the assumption that there will
	 * not be nested NMIs taking spinlocks. That may not be true in
	 * some architectures even though the chance of needing more than
	 * 4 nodes will still be extremely unlikely. When that happens,
	 * we fall back to spinning on the lock directly without using
	 * any MCS node. This is not the most elegant solution, but is
	 * simple enough.
	 */
	if (unlikely(idx >= MAX_NODES)) {
		lockevent_inc(lock_no_node);
		while (!queued_spin_trylock(lock))
			cpu_relax();
		goto release;
	}

	node = grab_mcs_node(node, idx);

	/*
	 * Keep counts of non-zero index values:
	 */
	lockevent_cond_inc(lock_use_node2 + idx - 1, idx);

	/*
	 * Ensure that we increment the head node->count before initialising
	 * the actual node. If the compiler is kind enough to reorder these
	 * stores, then an IRQ could overwrite our assignments.
	 */
	barrier();

	node->locked = 0;
	node->next = NULL;
	pv_init_node(node);

	/*
	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
	 * attempt the trylock once more in the hope someone let go while we
	 * weren't watching.
	 */
	if (queued_spin_trylock(lock))
		goto release;

	/*
	 * Ensure that the initialisation of @node is complete before we
	 * publish the updated tail via xchg_tail() and potentially link
	 * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
	 */
	smp_wmb();

	/*
	 * Publish the updated tail.
	 * We have already touched the queueing cacheline; don't bother with
	 * pending stuff.
	 *
	 * p,*,* -> n,*,*
	 */
	old = xchg_tail(lock, tail);
	next = NULL;

	/*
	 * if there was a previous node; link it and wait until reaching the
	 * head of the waitqueue.
	 */
	if (old & _Q_TAIL_MASK) {
		prev = decode_tail(old);

		/* Link @node into the waitqueue. */
		WRITE_ONCE(prev->next, node);

		pv_wait_node(node, prev);
		arch_mcs_spin_wait(&node->locked);

		/*
		 * While waiting for the MCS lock, the next pointer may have
		 * been set by another lock waiter. We optimistically load
		 * the next pointer & prefetch the cacheline for writing
		 * to reduce latency in the upcoming MCS unlock operation.
		 */
		next = READ_ONCE(node->next);
		if (next)
			prefetchw(next);
	}

	/*
	 * we're at the head of the waitqueue, wait for the owner & pending to
	 * go away.
	 *
	 * *,x,y -> *,0,0
	 *
	 * this wait loop must use a load-acquire such that we match the
	 * store-release that clears the locked bit and create lock
	 * sequentiality; this is because the set_locked() function below
	 * does not imply a full barrier.
	 *
	 * The PV pv_wait_head_or_lock function, if active, will acquire
	 * the lock and return a non-zero value. So we have to skip the
	 * atomic_cond_read_acquire() call. As the next PV queue head hasn't
	 * been designated yet, there is no way for the locked value to become
	 * _Q_SLOW_VAL. So both the set_locked() and the
	 * atomic_cmpxchg_relaxed() calls will be safe.
	 *
	 * If PV isn't active, 0 will be returned instead.
	 *
	 */
	if ((val = pv_wait_head_or_lock(lock, node)))
		goto locked;

	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));

locked:
	/*
	 * claim the lock:
	 *
	 * n,0,0 -> 0,0,1 : lock, uncontended
	 * *,*,0 -> *,*,1 : lock, contended
	 *
	 * If the queue head is the only one in the queue (lock value == tail)
	 * and nobody is pending, clear the tail code and grab the lock.
	 * Otherwise, we only need to grab the lock.
	 */

	/*
	 * In the PV case we might already have _Q_LOCKED_VAL set, because
	 * of lock stealing; therefore we must also allow:
	 *
	 * n,0,1 -> 0,0,1
	 *
	 * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
	 *       above wait condition, therefore any concurrent setting of
	 *       PENDING will make the uncontended transition fail.
	 */
	if ((val & _Q_TAIL_MASK) == tail) {
		if (try_clear_tail(lock, val, node))
			goto release; /* No contention */
	}

	/*
	 * Either somebody is queued behind us or _Q_PENDING_VAL got set
	 * which will then detect the remaining tail and queue behind us
	 * ensuring we'll see a @next.
	 */
	set_locked(lock);

	/*
	 * contended path; wait for next if not observed yet, release.
	 */
	if (!next)
		next = smp_cond_load_relaxed(&node->next, (VAL));

	mcs_lock_handoff(node, next);
	pv_kick_node(lock, next);

release:
	/*
	 * release the node
	 */
	__this_cpu_dec(qnodes[0].mcs.count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);

Locking lessons — The Linux Kernel documentation

应用场景:

  • 可以在中断上下文执行。由于不睡眠,因此 spinlock 可以在中断上下文中适用。
  • 执行时间短。由于 spin lock 死等这种特性,因此它使用在那些代码不是非常复杂的临界区(当然也不能太简单,否则使用原子操作或者其他适用简单场景的同步机制就 OK 了),如果临界区执行时间太长,那么不断在临界区门口“死等”的那些 thread 是多么的浪费 CPU 啊(当然,现代 CPU 的设计都会考虑同步原语的实现,例如 ARM 提供了 WFE 和 SEV 这样的类似指令,避免 CPU 进入 busy loop 的悲惨境地)

原子变量

锁保持的是指令间的值不变,而原子变量保持的是指令内的值不变。

The purpose of atomic_t is to implement a type and a set of functions that can operate on it correctly from different threads without requiring synchronization like mutexes.

不要把 atomic_t 的作用和锁弄混,他们不是一个生态位的。atomic_t 并不是一个锁可以 acquire/release,它代表了我们要保护的数据本身,对应到锁里,就是锁要保护的数据。

因为原子变量是借助硬件的,所以其能保存的数据只能是一个 word。因此原子变量能够让我们要保护的数据很小的情况下,不需要用锁,而是借助硬件的能力。

可以看看这个:what is the purpose of atomic_t? - C / C++

atomic_t / atomic64_t

typedef struct {
	int counter;
} atomic_t;

typedef struct {
	s64 counter;
} atomic64_t;

atomic_t 使用场景

原子变量并不是根据。

void kvm_inject_apic_timer_irqs(struct kvm_vcpu *vcpu)
{
	struct kvm_lapic *apic = vcpu->arch.apic;

    // read 和 set 之间肯定有可能被其他进程改吧。
	if (atomic_read(&apic->lapic_timer.pending) > 0) {
		kvm_apic_inject_pending_timer_irqs(apic);
		atomic_set(&apic->lapic_timer.pending, 0);
	}
}

atomic_t API

接口函数 描述
void atomic_add(int i, atomic_t *v) 给一个原子变量 v 增加 i
int atomic_add_return(int i, atomic_t *v) 同上,只不过将变量 v 的最新值返回
void atomic_sub(int i, atomic_t *v) 给一个原子变量 v 减去 i
int atomic_sub_return(int i, atomic_t *v) 同上,只不过将变量 v 的最新值返回
int atomic_cmpxchg(atomic_t *ptr, int old, int new) 比较 old 和原子变量 ptr 中的值,如果相等,那么就把 new 值赋给原子变量。返回旧的原子变量 ptr 中的值
atomic_read 获取原子变量的值
atomic_set 设定原子变量的值
atomic_inc(v) 原子变量的值加一
atomic_inc_return(v) 同上,只不过将变量 v 的最新值返回
atomic_dec(v) 原子变量的值减去一
atomic_dec_return(v) 同上,只不过将变量 v 的最新值返回
atomic_sub_and_test(i, v) 给一个原子变量 v 减去 i,并判断变量 v 的最新值是否等于 0
atomic_add_negative(i,v) 给一个原子变量 v 增加 i,并判断变量 v 的最新值是否是负数
int atomic_add_unless(atomic_t *v, int a, int u) 只要原子变量 v 不等于 u,那么就执行原子变量 v 加 a 的操作。如果 v 不等于 u,返回非 0 值,否则返回 0 值