Administrator
发布于 2024-11-11 / 23 阅读
0
1

cas 无锁编程

cas 无锁编程

引言

最近面试 经常问到多线程的问题,如多个线程操作一个数据,如何保证数据同步一致,如果多线程加锁后,如何避免加锁?传统的锁机制虽然能解决大部分同步问题,但也有很多问题?如死锁,性能下降;所以最后问题导向都会朝着无锁问,无锁也有很多种无锁算法,本文就主要介绍 CAS 无锁算法,CAS不仅避免了传统锁的许多问题,还提高了系统的整体性能。本文将详细介绍CAS的基本原理、C++中的实现、应用场景、注意事项以及总结。

CAS的基本原理

什么是CAS?

CAS是一种原子操作,通常由硬件直接支持。它涉及三个参数:

  • \\内存位置 (V)\\:需要进行操作的内存地址。
  • \\预期值 (A)\\:期望内存位置V当前存储的值。
  • \\新值 (B)\\:如果内存位置V的值等于预期值A,则将V的值设置为新值B。

CAS的工作方式

  1. 比较:检查内存位置V的值是否等于预期值A。
  1. 交换:如果相等,则将V的值设置为新值B。
  1. 返回结果:返回操作是否成功的标志。

以上的三个步骤都是一个整体,都是原子性的,不会被其他线程中断。

C++中的实现 && 和 mutex 差距

10个线程 对一个整数做累加

互斥锁


#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <chrono>
int counter = 0;
std::mutex mtx;
void increment() {
    for (int i = 0; i < 10000000; ++i) {
        // 锁定互斥锁
        std::lock_guard<std::mutex> lock(mtx);
        // 增加计数器
        ++counter;
    }
}

int main() {
    const int num_threads = 10;
    std::vector<std::thread> threads;
    // 记录开始时间
    auto start = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    // 记录结束时间
    auto end = std::chrono::high_resolution_clock::now();

    // 计算总时间
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
    std::cout << "Final counter value: " << counter << std::endl;
    std::cout << "Time taken: " << duration << " milliseconds" << std::endl;
    return 0;
}

//结果
//Final counter value: 100000000
//Time taken: 6941 milliseconds

CAS

在C++11及更高版本中, std::atomic 类模板提供了对CAS的支持

在这个例子中, compare_exchange_weak 函数尝试将 counterexpected 更新为 expected + 1。如果 counter 的当前值不是 expected,则 compare_exchange_weak 会失败,并将 counter 的实际值写回到 expected 中,循环继续直到成功为止。

其实这种方式 就是类似于自旋锁


#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>

std::atomic<int> counter(0);

void increment() {
    for (int i = 0; i < 10000000; ++i) {
        int expected;
        //这总累加场景 ,建议用 atomic_fetch_add 来实现会更快
        do {
            //先读取 count 值
            expected = counter.load();
            //用count 的值 和 expected,如果还是一样的,expected +1 ,不是返回false,继续重试
            // 多线程场景中,这种累加方式 重试的次数会很多, 会有大量计算资源浪费到轮询本身上;
        } while (!counter.compare_exchange_weak(expected, expected + 1));
    }
}

int main() {
    const int num_threads = 10;
    std::vector<std::thread> threads;

    // 记录开始时间
    auto start = std::chrono::high_resolution_clock::now();

    for (int i = 0; i < num_threads; ++i) {
        threads.emplace_back(increment);
    }

    for (auto& thread : threads) {
        thread.join();
    }

    // 记录结束时间
    auto end = std::chrono::high_resolution_clock::now();

    // 计算总时间
    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

    std::cout << "Final counter value: " << counter.load() << std::endl;
    std::cout << "Time taken: " << duration << " milliseconds" << std::endl;

    return 0;
}

// compare_exchange_weak cas方式
//hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
//Final counter value: 100000000
//Time taken: 16143 milliseconds
//hrp@ubuntu:~$

//atomic_fetch_add 实现
//hrp@ubuntu:~$ ./cas
//Final counter value: 100000000
//Time taken: 1491 milliseconds

耗时统计


mutex
//Final counter value: 100000000
//Time taken: 6941 milliseconds

compare_exchange_weak cas方式
//hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
//Final counter value: 100000000
//Time taken: 16143 milliseconds
//hrp@ubuntu:~$

atomic_fetch_add 实现
//hrp@ubuntu:~$ ./cas
//Final counter value: 100000000
//Time taken: 1491 milliseconds

可以看到 cas 耗时更多,为什么呢?

cas 遇到竞争的解决方式是不是等待,而是不断地重是


      do {
            //先读取 count 值
            expected = counter.load();
            //用count 的值 和 expected,如果还是一样的,expected +1 ,不是返回false,继续重试
            // 多线程场景中,这种累加方式 重试的次数会很多
        } while (!counter.compare_exchange_weak(expected, expected + 1));

// 用 每次读取期待值后,打印count ,计算count 打印次数,
    for (int i = 0; i < 10000000; ++i) {
        int expected = 0;
        do {
            expected = counter.load();
            std::cout << "count" << std::endl;
        } while (!counter.compare_exchange_weak(expected, expected + 1));
    }

//可以看到  重试了 3612149次
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas  |wc -l
103612149
hrp@ubuntu:~$

系统调用次数,cas方式,上下文切换比较少,耗时都是在用户态


hrp@ubuntu:~$ sudo perf stat -e context-switches ./cas
Final counter value: 100000000
Time taken: 16679 milliseconds

 Performance counter stats for './cas':

             3,413      context-switches

      16.681459973 seconds time elapsed

     131.787816000 seconds user
       0.027979000 seconds sys

hrp@ubuntu:~$ sudo perf stat -e context-switches ./mutex
Final counter value: 100000000
Time taken: 7010 milliseconds

 Performance counter stats for './mutex':

           739,273      context-switches

       7.012276162 seconds time elapsed

      11.464533000 seconds user
      15.450764000 seconds sys

atomic 中的两个接口

atomic 库,提供了两个接口 用来实现 CAS

  • \\ compare_exchange_weak\\
  • 可能因为硬件中断或其他原因失败,即使预期值与当前值匹配。
  • 通常更高效,因为可以在某些架构上生成更轻量级的指令。
  • 需要在循环中使用,因为可能需要多次尝试。
  • \\ compare_exchange_strong\\
  • 只有当预期值与当前值不匹配时才会失败。
  • 通常生成更复杂的指令,以确保强保证。
  • 无需在循环中使用,但可能在高竞争环境下性能较差。

两者耗时比较(差距很小)


#compare_exchange_strong
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
Final counter value: 100000000
Time taken: 17137 milliseconds

#compare_exchange_weak
hrp@ubuntu:~$ vi cas1.cpp
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
Final counter value: 100000000
Time taken: 17074 milliseconds

注意事项

ABA问题

ABA问题是CAS的一个常见问题。当一个值从A变为B再变回A时,CAS可能会错误地认为值没有改变过。解决这个问题的一种方法是使用带有版本号的CAS,或者使用 std::atomic_flag 等其他机制。


#include <atomic>

struct ABANode {
    int value;
    std::atomic<int> version;
    ABANode(int val) : value(val), version(0) {}
};

bool cas_with_version(ABANode* node, int expected_value, int new_value, int expected_version) {
    if (node->value == expected_value && node->version.load() == expected_version) {
        node->value = new_value;
        node->version.fetch_add(1);
        return true;
    }
    return false;
}


评论