cas 无锁编程
引言
最近面试 经常问到多线程的问题,如多个线程操作一个数据,如何保证数据同步一致,如果多线程加锁后,如何避免加锁?传统的锁机制虽然能解决大部分同步问题,但也有很多问题?如死锁,性能下降;所以最后问题导向都会朝着无锁问,无锁也有很多种无锁算法,本文就主要介绍 CAS 无锁算法,CAS不仅避免了传统锁的许多问题,还提高了系统的整体性能。本文将详细介绍CAS的基本原理、C++中的实现、应用场景、注意事项以及总结。
CAS的基本原理
什么是CAS?
CAS是一种原子操作,通常由硬件直接支持。它涉及三个参数:
- \\内存位置 (V)\\:需要进行操作的内存地址。
- \\预期值 (A)\\:期望内存位置V当前存储的值。
- \\新值 (B)\\:如果内存位置V的值等于预期值A,则将V的值设置为新值B。
CAS的工作方式
- 比较:检查内存位置V的值是否等于预期值A。
- 交换:如果相等,则将V的值设置为新值B。
- 返回结果:返回操作是否成功的标志。
以上的三个步骤都是一个整体,都是原子性的,不会被其他线程中断。
C++中的实现 && 和 mutex 差距
10个线程 对一个整数做累加
互斥锁
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <chrono>
int counter = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 10000000; ++i) {
// 锁定互斥锁
std::lock_guard<std::mutex> lock(mtx);
// 增加计数器
++counter;
}
}
int main() {
const int num_threads = 10;
std::vector<std::thread> threads;
// 记录开始时间
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment);
}
for (auto& thread : threads) {
thread.join();
}
// 记录结束时间
auto end = std::chrono::high_resolution_clock::now();
// 计算总时间
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Final counter value: " << counter << std::endl;
std::cout << "Time taken: " << duration << " milliseconds" << std::endl;
return 0;
}
//结果
//Final counter value: 100000000
//Time taken: 6941 milliseconds
CAS
在C++11及更高版本中, std::atomic 类模板提供了对CAS的支持
在这个例子中, compare_exchange_weak 函数尝试将 counter 从 expected 更新为 expected + 1。如果 counter 的当前值不是 expected,则 compare_exchange_weak 会失败,并将 counter 的实际值写回到 expected 中,循环继续直到成功为止。
其实这种方式 就是类似于自旋锁
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>
std::atomic<int> counter(0);
void increment() {
for (int i = 0; i < 10000000; ++i) {
int expected;
//这总累加场景 ,建议用 atomic_fetch_add 来实现会更快
do {
//先读取 count 值
expected = counter.load();
//用count 的值 和 expected,如果还是一样的,expected +1 ,不是返回false,继续重试
// 多线程场景中,这种累加方式 重试的次数会很多, 会有大量计算资源浪费到轮询本身上;
} while (!counter.compare_exchange_weak(expected, expected + 1));
}
}
int main() {
const int num_threads = 10;
std::vector<std::thread> threads;
// 记录开始时间
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment);
}
for (auto& thread : threads) {
thread.join();
}
// 记录结束时间
auto end = std::chrono::high_resolution_clock::now();
// 计算总时间
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Final counter value: " << counter.load() << std::endl;
std::cout << "Time taken: " << duration << " milliseconds" << std::endl;
return 0;
}
// compare_exchange_weak cas方式
//hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
//Final counter value: 100000000
//Time taken: 16143 milliseconds
//hrp@ubuntu:~$
//atomic_fetch_add 实现
//hrp@ubuntu:~$ ./cas
//Final counter value: 100000000
//Time taken: 1491 milliseconds
耗时统计
mutex
//Final counter value: 100000000
//Time taken: 6941 milliseconds
compare_exchange_weak cas方式
//hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
//Final counter value: 100000000
//Time taken: 16143 milliseconds
//hrp@ubuntu:~$
atomic_fetch_add 实现
//hrp@ubuntu:~$ ./cas
//Final counter value: 100000000
//Time taken: 1491 milliseconds
可以看到 cas 耗时更多,为什么呢?
cas 遇到竞争的解决方式是不是等待,而是不断地重是
do {
//先读取 count 值
expected = counter.load();
//用count 的值 和 expected,如果还是一样的,expected +1 ,不是返回false,继续重试
// 多线程场景中,这种累加方式 重试的次数会很多
} while (!counter.compare_exchange_weak(expected, expected + 1));
// 用 每次读取期待值后,打印count ,计算count 打印次数,
for (int i = 0; i < 10000000; ++i) {
int expected = 0;
do {
expected = counter.load();
std::cout << "count" << std::endl;
} while (!counter.compare_exchange_weak(expected, expected + 1));
}
//可以看到 重试了 3612149次
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas |wc -l
103612149
hrp@ubuntu:~$
系统调用次数,cas方式,上下文切换比较少,耗时都是在用户态
hrp@ubuntu:~$ sudo perf stat -e context-switches ./cas
Final counter value: 100000000
Time taken: 16679 milliseconds
Performance counter stats for './cas':
3,413 context-switches
16.681459973 seconds time elapsed
131.787816000 seconds user
0.027979000 seconds sys
hrp@ubuntu:~$ sudo perf stat -e context-switches ./mutex
Final counter value: 100000000
Time taken: 7010 milliseconds
Performance counter stats for './mutex':
739,273 context-switches
7.012276162 seconds time elapsed
11.464533000 seconds user
15.450764000 seconds sys
atomic 中的两个接口
atomic 库,提供了两个接口 用来实现 CAS
- \\
compare_exchange_weak\\: - 可能因为硬件中断或其他原因失败,即使预期值与当前值匹配。
- 通常更高效,因为可以在某些架构上生成更轻量级的指令。
- 需要在循环中使用,因为可能需要多次尝试。
- \\
compare_exchange_strong\\: - 只有当预期值与当前值不匹配时才会失败。
- 通常生成更复杂的指令,以确保强保证。
- 无需在循环中使用,但可能在高竞争环境下性能较差。
两者耗时比较(差距很小)
#compare_exchange_strong
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
Final counter value: 100000000
Time taken: 17137 milliseconds
#compare_exchange_weak
hrp@ubuntu:~$ vi cas1.cpp
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
Final counter value: 100000000
Time taken: 17074 milliseconds
注意事项
ABA问题
ABA问题是CAS的一个常见问题。当一个值从A变为B再变回A时,CAS可能会错误地认为值没有改变过。解决这个问题的一种方法是使用带有版本号的CAS,或者使用 std::atomic_flag 等其他机制。
#include <atomic>
struct ABANode {
int value;
std::atomic<int> version;
ABANode(int val) : value(val), version(0) {}
};
bool cas_with_version(ABANode* node, int expected_value, int new_value, int expected_version) {
if (node->value == expected_value && node->version.load() == expected_version) {
node->value = new_value;
node->version.fetch_add(1);
return true;
}
return false;
}