自旋锁是为了实现保护共享资源的互斥使用而提出的一种锁机制。与互斥锁不同,自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,“自旋"一词就是因此而得名。

使用场景

下面几点是自旋锁的几个特点:

  • 自旋锁(spin lock)是一种忙等的锁机制。在操作系统中,为了防止资源被两个线程同时访问,导致数据不一致,通常需要一种锁机制。通常有有两种实现方式:一个是忙等,另外一个是挂起当前进程,调度其他进程执行。
    • 自旋锁则是一种忙等的机制,当前的执行 thread 会不断的重新尝试直到获取锁进入临界区。即等待时内核无事可做(除了浪费时间),进程在 CPU 上保持运行,所以它保护的临界区必须小,且操作过程必须短。
    • 信号量和读写信号量适合于保持时间较长的情况,它们会导致调用者睡眠,因此只能在进程上下文使用(_trylock 的变种能够在中断上下文使用),而自旋锁适合于保持时间非常短的情况,它可以在任何上下文使用。
    • 如果被保护的共享资源只在进程上下文访问,使用信号量保护该共享资源非常合适,如果对共巷资源的访问时间非常短,自旋锁也可以。但是如果被保护的共享资源需要在中断上下文访问(包括底半部即中断处理句柄和顶半部即软中断),就必须使用自旋锁。
  • 只允许一个 thread 进入。信号量(semaphore)可以允许多个 thread 同时进入,而自旋锁一次只能有一个 thread 获取锁并进入临界区,其他的 thread 都是在门口不断的尝试。
  • 执行时间短。由于自旋锁死等这种特性,因此它使用在那些代码不是非常复杂的临界区(也就是切换线程成本相对于临界区的访问成本很高的场景)。如果临界区执行时间太长,那么不断在临界区门口“死等”的那些 thread 将会耗费大量的计算资源,显然是不合适的。
  • 被自旋锁保护的临界区代码执行时,它不能因为任何原因放弃处理器。
    • 可以在中断上下文执行。由于不睡眠,被自旋锁保护的临界区代码执行时是不能被被其他中断,因此自旋锁可以在中断上下文中适用
    • 被自旋锁保护的临界区代码执行时,内核不能被抢占
    • 被自旋锁保护的临界区代码执行时不能进入休眠

自旋锁最初是为了在多处理器系统(SMP)使用而设计的,但是只要考虑到并发问题,单处理器在运行可抢占内核时其行为就类似于 SMP。因此,自旋锁对于 SMP 和单处理器可抢占内核都适用。可以想象,当一个处理器处于自旋状态时,它做不了任何有用的工作,因此自旋锁对于单处理器不可抢占内核没有意义,实际上,非抢占式的单处理器系统上自旋锁被实现为空操作,不做任何事情。

自旋锁有几个重要的特性:

  • 被自旋锁保护的临界区代码执行时不能进入休眠。
  • 被自旋锁保护的临界区代码执行时是不能被被其他中断中断。
  • 被自旋锁保护的临界区代码执行时,内核不能被抢占。从这几个特性可以归纳出一个共性:被自旋锁保护的临界区代码执行时,它不能因为任何原因放弃处理器。

考虑上面第一种情况,想象你的内核代码请求到一个自旋锁并且在它的临界区里做它的事情,在中间某处,你的代码失去了处理器。或许它已调用了一个函数(copy_from_user,假设)使进程进入睡眠。也或许,内核抢占发威,一个更高优先级的进程将你的代码推到了一边。此时,正好某个别的线程想获取同一个锁,如果这个线程运行在和你的内核代码不同的处理器上(幸运的情况),那么它可能要自旋等待一段时间(可能很长),当你的代码从休眠中唤醒或者重新得到处理器并释放锁,它就能得到锁。而最坏的情况是,那个想获取锁得线程刚好和你的代码运行在同一个处理器上,这时它将一直持有 CPU 进行自旋操作,而你的代码是永远不可能有任何机会来获得 CPU 释放这个锁了,这就是悲催的死锁。

考虑上面第二种情况,和上面第一种情况类似。假设我们的驱动程序正在运行,并且已经获取了一个自旋锁,这个锁控制着对设备的访问。在拥有这个锁得时候,设备产生了一个中断,它导致中断处理例程被调用,而中断处理例程在访问设备之前,也要获得这个锁。当中断处理例程和我们的驱动程序代码在同一个处理器上运行时,由于中断处理例程持有 CPU 不断自旋,我们的代码将得不到机会释放锁,这也将导致死锁。

因此,如果我们有一个自旋锁,它可以被运行在(硬件或软件)中断上下文中的代码获得,则必须使用某个禁用中断的 spin_lock 形式的锁来禁用本地中断(注意,只是禁用本地 CPU 的中断,不能禁用别的处理器的中断),使用其他的锁定函数迟早会导致系统死锁(导致死锁的时间可能不定,但是发生上述死锁情况的概率肯定是有的,看处理器怎么调度了)。如果我们不会在硬中断处理例程中访问自旋锁,但可能在软中断(例如,以 tasklet 的形式运行的代码)中访问,则应该使用 spin_lock_bh,以便在安全避免死锁的同时还能服务硬件中断。

因为自旋锁不会引起调用者睡眠,所以自旋锁的效率远高于互斥锁。虽然它的效率比互斥锁高,但是它也有些不足之处:

  • 自旋锁一直占用 CPU,他在未获得锁的情况下,一直运行--自旋,所以占用着 CPU,如果不能在很短的时 间内获得锁,这无疑会使 CPU 效率降低。
  • 在用自旋锁时有可能造成死锁,当递归调用时有可能造成死锁,调用有些其他函数也可能造成死锁,如 copy_to_user()、copy_from_user()、kmalloc()等。

因此我们要慎重使用自旋锁,自旋锁只有在内核可抢占式或 SMP 的情况下才真正需要,在单 CPU 且不可抢占式的内核下,自旋锁的操作为空操作。自旋锁适用于锁使用者保持锁时间比较短的情况下。

函数接口

基本接口

对于基本的使用,自旋锁的使用很简单,主要涉及 3 部分内容: 1. 定义一个自旋锁 2. 对临界区加锁 3. 使用完后解锁

定义一个自旋锁 定义自旋锁就好像定义一个变量。下面函数用于定义一个自旋锁。

1
2
spinlock_t lock; //定义一个自旋锁变量
spin_lock_init(&lock); //进行自旋锁变量的初始化

自旋锁加锁 自旋锁加锁就是组织其它线程对相同临界区的访问,使用方法也非常简单。函数原型如下:

1
void spin_lock(spinlock_t *lock)

自旋锁解锁 不多废话了,下面是函数原型:

1
void spin_unlock(spinlock_t *lock)

试探加锁 由于自旋锁在临界区已经加锁的情况下会导致其它想进入临界区的线程处于忙等状态,这样会消耗 CPU 资源。有的时候我们不想这样,内核又提供了另外一个接口,该接口会首先判断是否可以进入,如果可以进入则获取锁资源,否则返回失败。

1
int spin_trylock(spinlock_t *lock)

应用示例

我们这里只是一个非常简单的示例,在该示例中有 2 个线程,分别进行对同一个变量的运算。如果没有保护机制,数据将被计算混乱。通过自旋锁,使得计算可以依次有序的进行,从而保证数据的正确性。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/mm.h>

#include <linux/in.h>
#include <linux/inet.h>
#include <linux/socket.h>
#include <net/sock.h>
#include <linux/kthread.h>
#include <linux/sched.h>
#include <linux/spinlock.h>

#define BUF_SIZE 1024

struct task_data {
    int progress;
};

struct task_struct *main_task;
struct task_struct *client_task;

/* 在这里定义自旋锁及要保护的数据,这里只是为了说明用法
 * 实际生产中不会这么用,因为对于一个简单数据,可以通过
 * 原子变量实现。 */
spinlock_t lock;
struct task_data thread_data;

static inline void sleep(unsigned sec)
{
    __set_current_state(TASK_INTERRUPTIBLE);
    schedule_timeout(sec * HZ);
}

/* 这个是2个线程中的一个,只是为了说明自旋锁的用法。  */
static int multhread_server(void *data)
{
    struct task_data *cur = (struct task_data *)data;
    while (!kthread_should_stop()) {
        printk(KERN_NOTICE "server thread run begin %d\n", cur->progress);
        sleep(1);

        /* 自旋锁加锁 */
        spin_lock(&lock);
        /* 临界区,也就是被保护的数据的操作 */
        cur->progress ++;
        /* 自旋锁解锁 */
        spin_unlock(&lock);
        printk(KERN_NOTICE "server thread run after %d\n", cur->progress);

    }

    return 0;
}

static int multhread_client(void *data)
{
    struct task_data *cur = (struct task_data *)data;
    while(!kthread_should_stop()) {
        printk(KERN_NOTICE "client thread run begin %d\n", cur->progress);
        sleep(1);
        spin_lock(&lock);
        cur->progress += 2;
        spin_unlock(&lock);
        printk(KERN_NOTICE "client thread run after %d\n", cur->progress);

    }

    return 0;
}

static int multhread_init(void)
{
    ssize_t ret = 0;

    thread_data.progress = 0;
    /* 进行自旋锁的初始化  */
    spin_lock_init(&lock);

    printk("Hello, socket \n");
    main_task = kthread_run(multhread_server,
                  &thread_data,
                  "multhread_server");
    if (IS_ERR(main_task)) {
        ret = PTR_ERR(main_task);
        goto failed;
    }

    client_task = kthread_run(multhread_client,
                  &thread_data,
                  "multhread_client");
    if (IS_ERR(client_task)) {
        ret = PTR_ERR(client_task);
        goto client_failed;
    }

    return ret;
client_failed:
    kthread_stop(main_task);
failed:
    return ret;
}

static void multhread_exit(void)
{
    printk("Bye!\n");
    kthread_stop(main_task);
    kthread_stop(client_task);

}

module_init(multhread_init);
module_exit(multhread_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("SunnyZhang<shuningzhang@126.com>");

增强接口

除了上面介绍的基本的加锁和解锁的接口外,Linux 内核还实现增强的功能。比如可以在中断环境中使用的自旋锁和下半部使用的自旋锁等等。下面是自旋锁所涉及的接口列表。

宏定义 功能描述
spin_lock_init(spinlock_t *lock) 初始化自旋锁,将自旋锁设置为 1,表示有一个资源可用
spin_is_locked(spinlock_t *lock) 如果自旋锁被置为 1(未锁),返回 0,否则返回 1
spin_unlock_wait(spinlock_t *lock) 等待直到自旋锁解锁(为 1),返回 0,否则返回 1
spin_trylock(spinlock_t *lock) 尝试锁上自旋锁(置 0),如果原来锁的值为 1,返回 1,否则返回 0
spin_lock(spinlock_t *lock) 循环等待直到自旋锁解锁(置为 1),然后将自旋锁锁上(置为 0)
spin_unlock(spinlock_t *lock) 将自旋锁解锁(置为 1)
spin_lock_irqsave(spinlock_t *lock, unsigned long flags) 循环等待直到自旋锁解锁(置为 1),然后将自旋锁锁上(置为 0)。在获得自旋锁之前禁用硬中断(只在本地处理器上),而先前的中断状态保存在 flags 中
spin_unlock_irqstore(spinlock_t *lock, unsigned long flags) 将自旋锁解锁(置为 1),开中断,将状态寄存器值从 flags 存入状态寄存器
spin_lock_irq(spinlock_t *lock) 循环等待直到自旋锁解锁(置为 1),然后将自旋锁锁上(置为 0)。关中断。
spin_unlock_irq(spinlock_t *lock) 将自旋锁解锁(置为 1),开中断
spin_lock_bh(spinlock_t *lock) 在获得锁前禁用软中断,保持硬中断打开状态
spin_unlock_bh(spinlock_t *lock) 循环等待直到自旋锁解锁(置为 1),然后将自旋锁锁上(置为 0),阻止软中断的底半部的执行

我们以支持中断的自旋锁为例进行说明。其实支持中断的自旋锁内部仅仅增加了禁止本地中断的函数调用。具体为什么加这个,大家可以自行思考一下,原因是比较清楚的,本文不再赘述。

1
2
3
4
5
6
7
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
    local_irq_disable(); //仅仅多了这个函数调用,禁止本地中断
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

实现操作

从保证临界区访问原子性的目的来考虑,自旋锁应该阻止在代码运行过程中出现的任何并发干扰。这些“干扰”包括:

1、中断,包括硬件中断和软件中断(仅在中断代码可能访问临界区时需要)

这种干扰存在于任何系统中,一个中断的到来导致了中断例程的执行,如果在中断例程中访问了临界区,原子性就被打破了。所以如果在某种中断例程中存在访问某个临界区的代码,那么就必须用spinlock保护。对于不同的中断类型(硬件中断和软件中断)对应于不同版本的自旋锁实现,其中包含了中断禁用和开启的代码。但是如果你保证没有中断代码会访问临界区,那么使用不带中断禁用的自旋锁API即可。

2、内核抢占(仅存在于可抢占内核中)

在2.6以后的内核中,支持内核抢占,并且是可配置的。这使UP系统和SMP类似,会出现内核态下的并发。这种情况下进入临界区就需要避免因抢占造成的并发,所以解决的方法就是在加锁时禁用抢占(preempt_disable(); ),在开锁时开启抢占(preempt_enable();注意此时会执行一次抢占调度)。

3、其他处理器对同一临界区的访问(仅 SMP 系统)

在 SMP 系统中,多个物理处理器同时工作,导致可能有多个进程物理上的并发。这样就需要在内存加一个标志,每个需要进入临界区的代码都必须检查这个标志,看是否有进程已经在这个临界区中。这种情况下检查标志的代码也必须保证原子和快速,这就要求必须精细地实现,正常情况下每个构架都有自己的汇编实现方案,保证检查的原子性。

有些人会以为自旋锁的自旋检测可以用 for 实现,这种想法“Too young, *too simple*, sometimes naive”!你可以在理论上用 C 去解释,但是如果用 for,起码会有如下两个问题:

  • 你如何保证在 SMP 下其他处理器不会同时访问同一个的标志呢?(也就是标志的独占访问)
  • 必须保证每个处理器都不会去读取高速缓存而是真正的内存中的标志(可以实现,编程上可以用*volitale*)

要根本解决这个问题,需要在芯片底层实现物理上的内存地址独占访问,并且在实现上使用特殊的汇编指令访问。请看参考资料中对于自旋锁的实现分析。以 arm 为例,从存在 SMP 的 ARM 构架指令集开始(V6、V7),采用 LDREX 和 STREX 指令实现真正的自旋等待。

根据上的介绍,我们很容易知道自旋锁的组成:

  • 中断控制(仅在中断代码可能访问临界区时需要)
  • 抢占控制(仅存在于可抢占内核中需要)
  • 自旋锁标志控制 (仅 SMP 系统需要)

中断控制是按代码访问临界区的不同而在编程时选用不同的变体,有些 API 中有,有些没有。

而抢占控制和自旋锁标志控制依据内核配置(是否支持内核抢占)和硬件平台(是否为 SMP)的不同而在编译时确定。如果不需要,相应的控制代码就编译为空函数。 对于非抢占式内核,由自旋锁所保护的每个临界区都有禁止内核抢占的 API,但是为空操作。由于 UP 系统不存在物理上的并行,所以可以阉割掉自旋的部分,剩下抢占和中断操作部分即可。

  1. 到这里其实就可以解释为什么我开始的实验现象和预想的完全不同了:
  2. 由于 UP 系统(在不配置 CONFIG_DEBUG_SPINLOCK 的情况下),根本就没有自旋锁控制的部分,多次获得自旋锁是可能的(这种编程本来就是错误的,只是我想看错误的现象而已)。

对于其中的一点疑惑:

  • 在有禁用中断的版本中,既然已经禁用了中断,在本处理器上就不会被打断,禁用抢占是否多余?
  • 禁用了中断可以避免因为中断引起的抢占调度,但是如果在自旋锁保护的临界区中存在 preempt_disable();和 preempt_enable();对。这样在 preempt_enable();就会引发抢占调度。
  • 避免 SMP 系统中别的处理器执行调度程序使得本处理器的进程会被调度出去。?????

几种实现

传统的 spinlock

原理: 当某个处理器上的内核执行线程申请自旋锁时,如果锁可用,则获得锁,然后执行临界区操作,最后释放锁;如果锁已被占用,线程并不会转入睡眠状态,而是忙等待该锁,一旦锁被释放,则第一个感知此信息的线程将获得锁。

实现: raw_spinlock_t 数据结构

1
2
3
typedef struct {
    unsigned int slock;
} raw_spinlock_t;

传统的自旋锁本质上用一个整数来表示,值为 1 代表锁未被占用, 为 0 或者为负数表示被占用。

在单处理机环境中可以使用特定的原子级汇编指令 swap 和 test_and_set 实现进程互斥,(Swap 指令:交换两个内存单元的内容;test_and_set 指令取出内存某一单元(位)的值,然后再给该单元(位)赋一个新值) 这些指令涉及对同一存储单元的两次或两次以上操作,这些操作将在几个指令周期内完成,但由于中断只能发生在两条机器指令之间,而同一指令内的多个指令周期不可中断,从而保证 swap 指令或 test_and_set 指令的执行不会交叉进行. 在多处理机环境中情况有所不同,例如 test_and_set 指令包括“取”、“送”两个指令周期,两个 CPU 执行 test_and_set(lock)可能发生指令周期上的交叉,假如 lock 初始为 0, CPU1 和 CPU2 可能分别执行完前一个指令周期并通过检测(均为 0),然后分别执行后一个指令周期将 lock 设置为 1,结果都取回 0 作为判断临界区空闲的依据,从而不能实现互斥. 为在多 CPU 环境中利用 test_and_set 指令实现进程互斥,硬件需要提供进一步的支持,以保证 test_and_set 指令执行的原子性. 这种支持目前多以“锁总线”(bus locking)的形式提供的,由于 test_and_set 指令对内存的两次操作都需要经过总线,在执行 test_and_set 指令之前锁住总线,在执行 test_and_set 指令后开放总线,即可保证 test_and_set 指令执行的原子性。 用法如下: 多处理机互斥算法(自旋锁算法)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
do{
	b=1;
	while(b) {
		lock(bus);
		b = test_and_set(&lock);
		unlock(bus);
	}
	临界区
	lock = 0;
	其余部分
}while(1)

linux 内核中锁的实现: 加锁:

1
2
3
4
5
6
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
	__asm__ __volatile__(
		__raw_spin_lock_string
		:"=m" (lock->slock) : : "memory");
}

解锁:

1
2
3
4
5
6
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
	__asm__ __volatile__(
		__raw_spin_unlock_string
	);
}

不足: 由于传统自旋锁无序竞争的本质特点,内核执行线程无法保证何时可以取到锁,某些执行线程可能需要等待很长时间,导致“不公平”问题的产生。 这有两方面的原因:

  1. 随着处理器个数的不断增加,自旋锁的竞争也在加剧,自然导致更长的等待时间。 释放自旋锁时的重置操作将无效化所有其它正在忙等待的处理器的缓存,那么在处理器拓扑结构中临近自旋锁拥有者的处理器可能会更快地刷新缓存,因而增大获得自旋锁的机率。
  2. 由于每个申请自旋锁的处理器均在全局变量 slock 上忙等待,系统总线将因为处理器间的缓存同步而导致繁重的流量,从而降低了系统整体的性能。

Ticket Spinlock

原理: Linux 内核 2.6.25 版本中引入了排队自旋锁:通过保存执行线程申请锁的顺序信息来解决“不公平”问题。

排队自旋锁仍然使用原有的 raw_spinlock_t 数据结构,但是赋予 slock 域新的含义。为了保存顺序信息,slock 域被分成两部分,分别保存锁持有者和未来锁申请者的票据序号(Ticket Number),如下图所示:

Ticket Number
Ticket Number

只有 Next 域与 Owner 域相等时,才表明锁处于未使用状态(此时也无人申请该锁)。排队自旋锁初始化时 slock 被置为 0,即 Owner 和 Next 置为 0。内核执行线程申请自旋锁时,原子地将 Next 域加 1,并将原值返回作为自己的票据序号。如果返回的票据序号等于申请时的 Owner 值,说明自旋锁处于未使用状态,则直接获得锁;否则,该线程忙等待检查 Owner 域是否等于自己持有的票据序号,一旦相等,则表明锁轮到自己获取。线程释放锁时,原子地将 Owner 域加 1 即可,下一个线程将会发现这一变化,从忙等待状态中退出。线程将严格地按照申请顺序依次获取排队自旋锁,从而完全解决了“不公平”问题。

实现: 排队自旋锁没有改变原有自旋锁的调用接口。 ticket spinlock 数据结构

1
2
3
4
5
6
7
8
typedef struct arch_spinlock {
	union {
		__ticketpair_t head_tail;
		struct __raw_tickets {
			__ticket_t head, tail;
		} tickets;
	};
} arch_spinlock_t;

申请自旋锁时,原子地将 tail 加 1,释放时,head 加 1。只有 head 域和 tail 域的值相等时,才表明锁处于未使用的状态。

加锁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    asm volatile("\n1:\t"
             LOCK_PREFIX " ; decb %0\n\t"
             "jns 3f\n"
             "2:\t"
             "rep;nop\n\t"
             "cmpb $0,%0\n\t"
             "jle 2b\n\t"
             "jmp 1b\n"
             "3:\n\t"
             : "+m" (lock->slock) : : "memory");
}

解锁:

1
2
3
4
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
    asm volatile("movb $1,%0" : "+m" (lock->slock) :: "memory");
}

不足: 在大规模多处理器系统和 NUMA 系统中,排队自旋锁(包括传统自旋锁)存在一个比较严重的性能问题:由于执行线程均在同一个共享变量 slock 上自旋,申请和释放锁的时候必须对 slock 进行修改,这将导致所有参与排队自旋锁操作的处理器的缓存变得无效。如果排队自旋锁竞争比较激烈的话,频繁的缓存同步操作会导致繁重的系统总线和内存的流量,从而大大降低了系统整体的性能。

MCS Spinlock

原理: 核心思想是:每个锁的申请者(处理器)只在一个本地变量上自旋。MCS Spinlock 是其中一种基于链表结构的自旋锁。

MCS Spinlock 的设计目标如下: 保证自旋锁申请者以先进先出的顺序获取锁(FIFO Ordering)。 只在本地可访问的标志变量上自旋。 在处理器个数较少的系统中或锁竞争并不激烈的情况下,保持较高性能。 自旋锁的空间复杂度(即锁数据结构和锁操作所需的空间开销)为常数。 在没有处理器缓存一致性协议保证的系统中也能很好地工作。

MCS Spinlock 采用链表结构将全体锁申请者的信息串成一个单向链表,如图 1 所示。每个锁申请者必须提前分配一个本地结构 mcs_lock_node,其中至少包括 2 个域:本地自旋变量 waiting 和指向下一个申请者 mcs_lock_node 结构的指针变量 next。waiting 初始值为 1,申请者自旋等待其直接前驱释放锁;为 0 时结束自旋。而自旋锁数据结构 mcs_lock 是一个永远指向最后一个申请者 mcs_lock_node 结构的指针,当且仅当锁处于未使用(无任何申请者)状态时为 NULL 值。MCS Spinlock 依赖原子的“交换”(swap)和“比较-交换”(compare_and_swap)操作,缺乏后者的话,MCS Spinlock 就不能保证以先进先出的顺序获取锁,从而可能造成“饥饿”(Starvation)。

图 1. MCS Spinlock 示意图

在这里插入图片描述
在这里插入图片描述
实现: MCS spinlock 接口有 2 个版本, 版本 1:每个锁有 NR_CPUS 大的 node 数组, mcs_lock_node 结构可以在处理器所处节点的内存中分配,从而加快访问速度.

typedef struct _mcs_lock_node {
    volatile int waiting;
    struct _mcs_lock_node *volatile next;
} ____cacheline_aligned_in_smp mcs_lock_node;

typedef mcs_lock_node *volatile mcs_lock;

typedef struct {
    mcs_lock slock;
    mcs_lock_node nodes[NR_CPUS];
} raw_spinlock_t;
1234567891011

spin_lock(&lock) spin_unlock(&lock) 版本 2: spin_lock(&lock, &node); spin_unlock(&lock, &node);

加锁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static __always_inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    int cpu;
    mcs_lock_node *me;
    mcs_lock_node *tmp;
    mcs_lock_node *pre;

    cpu = raw_smp_processor_id();                                 (a)
    me = &(lock->nodes[cpu]);
    tmp = me;
    me->next = NULL;

    pre = xchg(&lock->slock, tmp);                              (b)
    if (pre == NULL) {
        /* mcs_lock is free */
        return;                                                 (c)
    }

    me->waiting = 1;                                               (d)
    smp_wmb();                                                      (e)
    pre->next = me;                                                (f)

    while (me->waiting) {                                         (g)
        asm volatile (pause);
    }
}

try_lock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
static __always_inline int __raw_spin_trylock(raw_spinlock_t *lock)
{
    int cpu;
    mcs_lock_node *me;

    cpu = raw_smp_processor_id();
    me = &(lock->nodes[cpu]);
    me->next = NULL;

    if (cmpxchg(&lock->slock, NULL, me) == NULL)             (a)
        return 1;
    else
        return 0;
}

解锁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
static __always_inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
    int cpu;
    mcs_lock_node *me;
    mcs_lock_node *tmp;

    cpu = raw_smp_processor_id();
    me = &(lock->nodes[cpu]);
    tmp = me;

    if (me->next == NULL) {                                      (a)
        if (cmpxchg(&lock->slock, tmp, NULL) == me) {   (b)
            /* mcs_lock I am the last. */
            return;
        }
        while (me->next == NULL)                            (c)
            continue;
    }

    /* mcs_lock pass to next. */
    me->next->waiting = 0;                                       (d)
}

不足: 版本 1 的 mcs spinlock 锁占用空间大 版本二的 mcs spinlock 使用时需要传入 mode, 和之前的 spinlock api 不兼容,无法替换 ticket spinlock.

Qspinlock

原理: qspinlock 是内核 4.2 引入的,主要基于 mcs spinlock 的设计思想,解决了 mcs spinlock 接口不一致或空间太大的问题。 它的数据结构体比 mcs lock 大大减小, 同 ticket spinlock 一样大小。qspinlock 的等待变量是全局变量。

实现: 数据结构:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
qspinlock的数据结构定义在kernel/qspinlock.c
struct __qspinlock {
	union {
		atomic_t val;
#ifdef __LITTLE_ENDIAN
		struct {
			u8	locked;
			u8	pending;
		};
		struct {
			u16	locked_pending;
			u16	tail;
		};
#else
		struct {
			u16	tail;
			u16	locked_pending;
		};
		struct {
			u8	reserved[2];
			u8	pending;
			u8	locked;
		};
#endif

可以看到 qspinlock 就是一个原子变量,但是在实际使用中却将这个原子变量分成很多位域 具体位域如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
/*
 * Bitfields in the atomic value:
 *
 * When NR_CPUS < 16K
 *  0- 7: locked byte
 *     8: pending
 *  9-15: not used
 * 16-17: tail index
 * 18-31: tail cpu (+1)
 *
 * When NR_CPUS >= 16K
 *  0- 7: locked byte
 *     8: pending
 *  9-10: tail index
 * 11-31: tail cpu (+1)
 */

queued_spin_lock:

1
2
3
4
5
6
7
8
9
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
        u32 val;

        val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
        if (likely(val == 0))
                return;
        queued_spin_lock_slowpath(lock, val);
}

qspinlock 采用 mcs lock 的机制, 每一个 cpu 都定义有一个 strcut mcs spinlock 的数据结构 在大规模多处理器系统和 NUMA 系统中, 使用 qspinlock 可以较好的提高锁的性能。

性能比较

写一个 spinlock 的性能测试驱动,在等待相同时间后比较 spinlock 临界区域的值, 从而比较各个锁的性能差异。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <linux/init.h>
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/sched.h>
#include <linux/kernel/h>
#include <linux/spinlock.h>
#include <linux/random.h>
#include <linux/slab.h>
#incude <linux/timer.h>
#include <linux/jiffies.h>
#include <linux/atomic.h>

int spinlock_num;

struct worker {
	int burns;
	struct task_struct *task;
}
static struct worker *workers;
static int threads = 2;
module_param(threads, int, 0);

static spinlock_t lock;
static int runtime = 10;
module_param(runtime, int, 0);

static int bench_running;
static task_struct *monitor_task;

static int rerun, done;
module_param(rerun, int, S_IRUGO|S_ISUSR);
module_param(done, int, S_IRUGO|S_ISUSR);

static int work(void *data)
{
	struct worker *wk = (struct worker*)arg;
	while(!kthread_should_stop()) {
		cond_resched();

	if (!ACCESS_ONCE(bench_running))
		continue;
	spin_lock(&lock)
	spinlock_num++;
	spin_unlock(&lock);
	}
	return 0;
}

static int monitor(void *unused)
{
	int i, c;
	int total, min, max, avg;

repeat:
	total = 0, min = INT_MAX, max = 0, avg = 0;

	spinlock_num = 0;

	workers = (struct worker *)kzalloc(sizeof(struct worker) * threads, GFP_KERNEL);
	for (i = 0; i < threads; i++) {
		c = i %num_online_cpus();
		workers[i].task = kthread_create(work, &workers, "locktest/%d:%d", c, i);
		kthread_bind(workers[i].task, c);
		wake_up_process(workers[i].task);
	}
	bench_running = 0;
	for (i = 0; i < threads; i++) {
		if (workers[i].task)
			kthread_stop(workers[i].task);
	}
	kfree(workers);
	printk("lockresult:%6d %8d %12d\n", num_online_cpus(), threads, spinlock_num);
	done = 1;
	while(!kthread_should_stop()) {
		schedule_timeout(1);
		if (cmpxchg(&rerun, done, 0)) {
			done = 0;
			goto repeat;
		}
	}
	return 0;
}

static int locktest_init(void)
{
	monitor_task = kthread_run(monitor, NULL, "monitor");
	return 0;
}

static void locktest_exit(void)
{
	kthread_stop(monitor_task);
}

module_init(locktest_init);
module_exit(locktest_exit);
MODULE_LICENSE("GPL");

从测试结果来看, 在 cpu 较少的情况下, qspinlock 的性能和 ticket spinlock 的性能差不多, 在 CPU 较多的情况下,qspinlock 的性能远好于 ticket spinlock.

参考资料