C++11 并发教程第二部分:保护共享数据

红薯 发布于 2012/03/27 11:44
阅读 3K+
收藏 18

在上一篇文章“C++11 并发 —— 第一部分:启动线程”中我们介绍了如何在C++11中编写多线程程序,这些在线程中执行的代码都是独立的,但在实际应用中,我们经常会需要线程去访问一些共享的数据,因此也就面临了共享数据的同步问题。

我们先通过一个简单的代码来了解该问题。

同步问题

我们使用一个简单的结构体 Counter,该结构体包含一个值以及一个方法用来改变这个值:

struct Counter {
    int value;
 
    void increment(){
        ++value;
    }
};

然后启动多个线程来修改结构体的值:

int main(){
    Counter counter;
 
    std::vector<std::thread> threads;
    for(int i = 0; i < 5; ++i){
        threads.push_back(std::thread([&counter](){
            for(int i = 0; i < 100; ++i){
                counter.increment();
            }
        }));
    }
 
    for(auto& thread : threads){
        thread.join();
    }
 
    std::cout << counter.value << std::endl;
 
    return 0;
}

我们启动了5个线程来增加计数器的值,每个线程增加了100次,然后在线程结束时打印计数器的值。

但我们运行这个程序的时候,我们是希望它会答应500,但事实不是如此,没人能确切知道程序将打印什么结果,下面是在我机器上运行后打印的数据,而且每次都不同:

442
500
477
400
422
487

问题的原因在于改变计数器值并不是一个原子操作,需要经过下面三个操作才能完成一次计数器的增加:

  • 首先读取 value 的值
  • 然后将 value 值加1
  • 将新的值赋值给 value

但你使用单线程来运行这个程序的时候当然没有任何问题,因此程序是顺序执行的,但在多线程环境中就有麻烦了,想象下下面这个执行顺序:

  1. Thread 1 : 读取 value, 得到 0, 加 1, 因此 value = 1
  2. Thread 2 : 读取 value, 得到 0, 加 1, 因此 value = 1
  3. Thread 1 : 将 1 赋值给 value,然后返回 1
  4. Thread 2 : 将 1 赋值给 value,然后返回 1

这种情况我们称之为多线程的交错执行,也就是说多线程可能在同一个时间点执行相同的语句,尽管只有两个线程,交错的现象也很明显。如果你有更多的线程、更多的操作需要执行,那么这个交错是必然发生的。

有很多方法来解决线程交错的问题:

  • 信号量 Semaphores
  • 原子引用 Atomic references
  • Monitors
  • Condition codes
  • Compare and swap

在这篇文章中我们将学习如何使用信号量来解决这个问题。信号量也有很多人称之为互斥量(Mutex),同一个时间只允许一个线程获取一个互斥对象的锁,通过 Mutex 的简单属性就可以用来解决交错的问题。

使用 Mutex 让计数器程序是线程安全的

在 C++11 线程库中,互斥量包含在 mutex 头文件中,对应的类是 std::mutex,有两个重要的方法 mutex:lock() 和 unlock() ,从名字上可得知是用来锁对象以及释放锁对象。一旦某个互斥量被锁,那么再次调用 lock() 返回堵塞值得该对象被释放。

为了让我们刚才的计数器结构体是线程安全的,我们添加一个 set:mutext 成员,并在每个方法中通过 lock()/unlock() 方法来进行保护:

struct Counter {
    std::mutex mutex;
    int value;
 
    Counter() : value(0) {}
 
    void increment(){
        mutex.lock();
        ++value;
        mutex.unlock();
    }
};

然后我们再次测试这个程序,打印的结果就是 500 了,而且每次都一样。

异常和锁

现在让我们来看另外一种情况,想象我们的的计数器有一个减操作,并在值为0的时候抛出异常:

struct Counter {
    int value;
 
    Counter() : value(0) {}
 
    void increment(){
        ++value;
    }
 
    void decrement(){
        if(value == 0){
            throw "Value cannot be less than 0";
        }
 
        --value;
    }
};

然后我们不需要修改类来访问这个结构体,我们创建一个封装器:

struct ConcurrentCounter {
    std::mutex mutex;
    Counter counter;
 
    void increment(){
        mutex.lock();
        counter.increment();
        mutex.unlock();
    }
 
    void decrement(){
        mutex.lock();
        counter.decrement();        
        mutex.unlock();
    }
};

大部分时候该封装器运行挺好,但是使用 decrement 方法的时候就会有异常发生。这是一个大问题,一旦异常发生后,unlock 方法就没被调用,导致互斥量一直被占用,然后整个程序就一直处于堵塞状态(死锁),为了解决这个问题我们需要用 try/catch 结构来处理异常情况:

void decrement(){
    mutex.lock();
    try {
        counter.decrement();
    } catch (std::string e){
        mutex.unlock();
        throw e;
    } 
    mutex.unlock();
}

这个代码并不难,但看起来很丑,如果你一个函数有 10 个退出点,你就必须为每个退出点调用一次 unlock 方法,或许你可能在某个地方忘掉了 unlock ,那么各种悲剧即将发生,悲剧发生将直接导致程序死锁。

接下来我们看如何解决这个问题。

自动锁管理

当你需要包含整段的代码(在我们这里是一个方法,也可能是一个循环体或者其他的控制结构),有这么一种好的解决方法可以避免忘记释放锁,那就是 std::lock_guard.

这个类是一个简单的智能锁管理器,但创建 std::lock_guard 时,会自动调用互斥量对象的 lock() 方法,当 lock_guard 析构时会自动释放锁,请看下面代码:

struct ConcurrentSafeCounter {
    std::mutex mutex;
    Counter counter;
 
    void increment(){
        std::lock_guard<std::mutex> guard(mutex);
        counter.increment();
    }
 
    void decrement(){
        std::lock_guard<std::mutex> guar(mutex);
        mutex.unlock();
    }
};

是不是看起来爽多了?

使用 lock_guard ,你不再需要考虑什么时候要释放锁,这个工作已经由 std::lock_guard 实例帮你完成。

结论

在这篇文章中我们学习了如何通过信号量/互斥量来保护共享数据。需要记住的是,使用锁会降低程序性能。在一些高并发的应用环境中有其他更好的解决办法,不过这不在本文的讨论范畴之内。

你可以在 Github 上获取本文的源码.

英文原文,OSCHINA原创翻译

加载中
0
mahone
mahone
不错。多谢分享。。。
0
R
Ralph
一个问题:C++标准会保证局部变量的析构函数在函数退出前调用吗?
0
心有思
心有思

函数退出会弹出该段程序占用的栈空间 (SP),如果你的局部变量是栈上的(不是new/malloc等申请的),c++会保证它在函数退出的时候回收。

0
diguo2046
diguo2046
受教了, 正在学习C11
0
cuialei
cuialei
不错,支持持续更新!
0
cuialei
cuialei
01 struct ConcurrentSafeCounter {
02     std::mutex mutex;
03     Counter counter;
04   
05     void increment(){
06         std::lock_guard<std::mutex> guard(mutex);
07         counter.increment();
08     }
09   
10     void decrement(){
11         std::lock_guard<std::mutex> guar(mutex);
12         counter.decrement();//最后一段代码是不是应该这样呢?可能是楼主笔误了
13     }
14 };
返回顶部
顶部