C++11中引入了mutex和方便优雅的lock_guard,但是有时候我们想要一个性能更高的实现方式,本文主要讲解如何使用C++11中的原子操作类atomic来巧妙地实现无锁同步。
一、概述
要开始之前,我们先来谈一谈常见的锁分类:
1.悲观锁
悲观锁总是假设最坏的情况,线程a每次去获取或更新数据的时候,都会觉得别的线程也正在修改这个数据,为了避免自己的更新操作丢失,线程a会尝试获取此数据的锁,线程a获取到之后,才能对此数据进行一些更新操作。在此期间,别的线程无法更新,只能等到线程a释放锁之后,才能进行更新。
之所以叫做悲观锁,是因为这是一种对数据的修改抱有悲观态度的并发控制方式。我们一般认为数据被并发修改的概率比较大,所以需要在修改之前先加锁。
悲观并发控制,实际上是一种“先取锁再访问”的保守策略。lock_guard其实就是悲观锁的一种实现。
2.乐观锁
乐观锁假设数据一般不会造成冲突,所以在拿数据的时候不会去加锁,但是会在更新的时候判断此期间内有没有别的线程修改过数据。
CAS机制就是对乐观锁的一种实现。CAS(Compare and Swap),即比较并交换,是一种乐观的、非阻塞的算法。线程在更新失败时,不需要挂起,因此省去了大量线程上下文切换的开销,适合线程冲突不那么严重的场景。
Atomic
std::atomic是C++11新增的原子操作类,它提供了实现CAS机制的方法,提供的方法能保证具有原子性。这些方法是不可再分的,获取这些变量的值时,永远获得修改前的值或修改后的值,不会获得修改过程中的中间数值。这些类都禁用了拷贝构造函数,原因是原子读和原子写是2个独立原子操作,无法保证2个独立的操作加在一起仍然保证原子性。
atomic提供的常见方法:
- store:原子写操作。
- load:原子读操作。
- compare_exchange_strong:传入[期望原值]和[设定值],当前值与期望值相等时,修改当前值为设定值,返回true;当前值与期望值不等时,将期望值修改为当前值,返回false。
- compare_exchange_weak:同上,但允许偶然出乎意料的返回(比如在字段值和期待值一样的时候却返回了false),比strong性能更好些,但需要循环来保证逻辑的正确性。
- exchange:交换2个数值,并保证整个过程是原子的。
atomic的CAS方法参数:
template< class T>
struct atomic<T*>
{
public:
bool compare_exchange_weak( T& expected, T desired,
std::memory_order success,
std::memory_order failure );
bool compare_exchange_weak( T& expected, T desired,
std::memory_order success,
std::memory_order failure ) volatile;
bool compare_exchange_weak( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst );
bool compare_exchange_weak( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst ) volatile;
bool compare_exchange_strong( T& expected, T desired,
std::memory_order success,
std::memory_order failure );
bool compare_exchange_strong( T& expected, T desired,
std::memory_order success,
std::memory_order failure ) volatile;
bool compare_exchange_strong( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst );
bool compare_exchange_strong( T& expected, T desired,
std::memory_order order =
std::memory_order_seq_cst ) volatile;
...
};
二、实践
说了那么多,可能有些同学还不明白这到底有什么用?下边用一个实例来说明一下。
1.场景描述
在非并发条件下,实现一个栈的Push操作,通常有如下操作:
- 步骤1:新建一个节点 node* const new_node = new node(data);
- 步骤2:将该节点的next指针指向现有栈顶 new_node->next = head;
- 步骤3:更新栈顶 head = new_node
但是在并发条件下,上述无保护操作明显可能出问题,例如:
- 原栈顶为A。(此时栈状态: A->P->…,从左到右第一个值为栈顶,A->P代表a.next = P)
- 线程1准备将B压栈,线程1执行完步骤2后被强占挂起。(新建节点B,并使B.next = A,即B->A)
- 线程2得到cpu时间片并完成将C压栈的操作,即完成步骤1、2、3。(此时栈状态: C->A->…)
- 这时线程1重新获得cpu时间片,执行步骤3。导致栈状态变为(此时栈状态: B->A->…),此时线程2的操作丢失,这显然不是我们想要的结果。
2.悲观锁解决方法
我们只需要在指针赋值和更新栈顶前持有互斥锁,并在head共享变量赋值后结束锁作用域即可
node* head;
std::mutex mtx; // 全局互斥锁
...
void push(T const& data)
std::lock_guard<std::mutex> lock(mtx); //持有互斥锁
// push入栈
node* const new_node = new node(data);
new_node->next = head;
head = new_node;
}
...
实现还是很简单的,但是在极其追求性能的短执行周期场景,也有性能更好的无锁实现方法,接着往下看。
3.乐观锁解决方法
在没有先取锁的情况下,我们只要保证步骤3更新head栈顶时候,栈顶是我们在步骤2中获得顶栈顶即可。因为如果有其它线程进行操作,栈顶必然会被改变。
我们可以利用实现CAS来轻松解决这个问题:如果栈顶是我们步骤2中获取顶栈顶,则执行步骤3;否则,重新执行步骤2/3。
template<typename T>
class lock_free_stack
{
private:
struct node
{
T data;
node* next;
node(T const& data_):
data(data_)
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node=new node(data);
new_node->next=head.load();
//如果head!=new_node->next,会执行new_node->next = head.load()操作并返回false;while循环后再次判断,如果此时head=new_node->next,则执行head.store(new_node)并返回true结束循环
while(!head.compare_exchange_strong(new_node->next,new_node));
}
};
三、性能对比
我们在锁之后随机模拟0-3ms的业务延迟,来对比下两者的性能区别:
/*
* 有锁与无锁性能对比测试
* g++ cas.cc -std=c++11 -pthread -o build/cas
*/
#include <atomic>
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <gtest/gtest.h>
using namespace std;
// 无锁多线程入栈
template <typename T>
class LockFreeStack
{
private:
struct node
{
T data;
node *next;
node(T const &data_) : data(data_){}
};
std::atomic<node *> head;
public:
void push(T const &data)
{
node *const new_node = new node(data);
new_node->next = head.load();
// 随机延迟0-10微秒,模拟实际业务中的线程竞争中head发生变化的情况
int num = rand() % 10;
std::this_thread::sleep_for(std::chrono::microseconds(num));
// push
//如果head!=new_node->next,会执行new_node->next = head.load()操作并返回false;while循环后再次判断,如果此时head=new_node->next,则执行head.store(new_node)并返回true结束循环
while (!head.compare_exchange_strong(new_node->next, new_node));
//std::cout << "push [" << data << "] done! " << clock()*1000.0/CLOCKS_PER_SEC << std::endl;
}
};
// 有锁多线程入栈
template <typename T>
class LockStack
{
private:
struct node
{
T data;
node *next;
node(T const &data_) : data(data_){}
};
node *head;
mutex mtx; // 全局互斥锁
public:
void push(T const &data)
{
lock_guard<mutex> lock(mtx); //持有互斥锁
// 随机延迟0-10微秒,模拟实际业务中的线程竞争导致的取锁延迟
int num = rand() % 10;
std::this_thread::sleep_for(std::chrono::microseconds(num));
// push
node *const new_node = new node(data);
new_node->next = head;
head = new_node;
//std::cout << "push [" << data << "] done# " << clock()*1000.0/CLOCKS_PER_SEC << std::endl;
}
};
LockFreeStack<string> lf;
void push_lock_free_stack(const int i){
lf.push(to_string(i));
}
LockStack<string> ls;
void push_lock_stack(const int i){
ls.push(to_string(i));
}
int main(int argc, char* argv[])
{
//LockFreeStack<string> lf;
//lf.push("a");
//LockStack<string> ls;
//ls.push("b");
// 无锁多线程入栈
thread thm[1000];
for(int i=0;i<1000;i++){
thm[i] = thread(push_lock_free_stack, i+1);
}
clock_t start_time = clock();
for(int i=0;i<1000;i++){
thm[i].join();
}
clock_t cost_time = (clock() - start_time)*1000.0/CLOCKS_PER_SEC;
cout << "push_lock_free_stack cost: " << cost_time << "ms" << endl;
// 有锁多线程入栈
thread thl[1000];
for(int i=0;i<1000;i++){
thl[i] = thread(push_lock_stack, i+1); //
}
start_time = clock();
for(int i=0;i<1000;i++){
thl[i].join();
}
cost_time = (clock() - start_time)*1000.0/CLOCKS_PER_SEC;
cout << "push_lock_stack cost: " << cost_time << "ms" << endl;
return 0;
}
运行后:
$ ./build/cas
push_lock_stack cost: 10ms
push_lock_free_stack cost: 2ms
$ ./build/cas
push_lock_stack cost: 11ms
push_lock_free_stack cost: 2ms
$ ./build/cas
push_lock_stack cost: 12ms
push_lock_free_stack cost: 2ms
$ ./build/cas
push_lock_stack cost: 10ms
push_lock_free_stack cost: 2ms
$ ./build/cas
push_lock_stack cost: 11ms
push_lock_free_stack cost: 2ms
$ ./build/cas
push_lock_stack cost: 10ms
push_lock_free_stack cost: 2ms
$ ./build/cas
push_lock_stack cost: 12ms
push_lock_free_stack cost: 2ms
可以看到在模拟实际业务中的线程竞争导致的取锁延迟或共享变量被修改的场景时,atomic自旋锁的性能要明显优于mutex,如果业务处理耗时,性能差异会更大。其关键点在于atomic在自旋前的代码都是可以并行的,自旋过程中开销也较小;而lock_guard持有锁时,其他所有线程锁作用域代码都必须等待,这会导致总体延迟的增加。
四、总结
atomic 自旋是在 cpu 指令级别上实现的乐观锁,它不需要预先取锁,除了必不可少的内存栅栏带来流水线效率损失外,几乎可以认为没什么额外的开销,性能较好;当它遇到线程竞争进行自旋时,会占用cpu资源。而 mutex 的话,光是在内核里睡眠/唤醒一下,就至少是微秒级的时间开销,更别说各种额外的调用、封装、判断了,性能相对会略慢些;当取锁失败时,线程会挂起让出cpu资源,等待其他线程释放锁时再被唤醒。
当然我们也可以看到它的不足,就是当需要同时修改多个共享变量,并且需要保证批操作整体的原子性时,使用atomic就会比较复杂,因为简单的用法里很有可能在第一个共享变量A修改成功但第二个变量B还在自旋中时,此时其他线程读取到修改后的变量A和未修改的变量B时可能会出现预期之外的情况,当然我们也可以基于它进行封装,以实现一个自己的读写锁机制。
总体来说,对于出现竞争的场景,需要根据业务逻辑的实际情况来判断,如果能控制锁只控制一个共享数据,并且对性能有较高要求,那么推荐使用atomic乐观自旋锁的方案。
yan 22.7.26
参考:
https://www.codenong.com/cs107035480/