reader-writer control
读者写者-处理的多种方式
第一种:最省事的互斥锁
使用C++11的thread和``mutex`库的互斥量和锁和多线程来实现。
#include <iostream>
#include <thread>
#include <mutex>
int val = 0;
std::mutex mutex;
void read() {
std::cout << "I' m reading val: " << val << std::endl;
}
void write() {
++val;
std::cout << "I' m writing val: " << val << std::endl;
}
void reader() {
while (true) {
std::unique_lock<std::mutex> lock(mutex);
read();
}
}
void writer() {
while (true) {
std::unique_lock<std::mutex> lock(mutex);
write();
}
}
int main() {
std::thread read_thread(&reader);
std::thread write_thread(&writer);
read_thread.join();
write_thread.join();
return 0;
}
使用互斥锁有一个很明显的问题,虽然读写是互斥,但也把读读也给互斥了。因此,这点可以改进一下,于是就有了读写锁。
第二种:读写锁
使用C++17的thread和``shared_mutex`库中的读写锁和多线程来实现。
#include <iostream>
#include <thread>
#include <shared_mutex>
#include <condition_variable>
int val = 0;
std::shared_mutex mutex;
void read() {
std::cout << "I' m reading val: " << val << std::endl;
}
void write() {
++val;
std::cout << "I' m writing val: " << val << std::endl;
}
void reader() {
while (true) {
std::shared_lock<std::shared_mutex> reader_lock(mutex);
read();
}
}
void writer() {
while (true) {
std::unique_lock<std::shared_mutex> writer_lock(mutex);
write();
}
}
int main() {
std::thread read_thread(&reader);
std::thread write_thread1(&writer);
std::thread write_thread2(&writer);
read_thread.join();
write_thread1.join();
write_thread2.join();
return 0;
}写到这里,多线程读的问题总是解决了。
欸,那我要是想让读有更高的优先级呢?
PV信号量
信号量是操作系统提供的一种协调共享资源访问的方法。
通常信号量表示资源的数量,对应的变量是一个整型(sem)变量。
另外,还有两个原子操作的系统调用函数来控制信号量的,分别是:
- P 操作:将
sem减1,相减后,如果sem < 0,则进程/线程进入阻塞等待,否则继续,表明 P 操作可能会阻塞; - V 操作:将
sem加1,相加后,如果sem <= 0,唤醒一个等待中的进程/线程,表明 V 操作不会阻塞;
操作系统是如何实现 PV 操作的呢?
信号量数据结构与 PV 操作的算法描述如下图:
作者采用posix标准的semaphore和pthread库来实现以下锁和同步,由于是posix标准,Windows上做了相应的移植库,因此Unix(Linux)和Windows系统都可以运行。
而posix标准中与PV、信号量的对应关系。
- 信号量:
sem_t, 注意使用sem_init来初始化。 - P操作:
sem_wait - V操作:
sem_post
实现锁
根据上面的叙述,要使得P操作互斥,我们需要设置信号量的初值为1.
对于两个并发线程,互斥信号量的值仅取 1、0 和 -1 三个值,分别表示:
- 如果互斥信号量为 1,表示没有线程进入临界区;
- 如果互斥信号量为 0,表示有一个线程进入临界区;
- 如果互斥信号量为 -1,表示一个线程进入临界区,另一个线程等待进入。
通过互斥信号量的方式,就能保证临界区任何时刻只有一个线程在执行,就达到了互斥的效果。
读优先
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
sem_t data_mutex;
sem_t reader_count_mutex;
int reader_count = 0;
void* writer(void* arg) {
while (1) {
sem_wait(&data_mutex);
//operate writing
printf("I' m writer\n");
sem_post(&data_mutex);
}
}
// reader is prior.
void* reader(void* arg) {
while (1) {
sem_wait(&reader_count_mutex);
if (reader_count == 0) {
sem_wait(&data_mutex);
}
++reader_count;
sem_post(&reader_count_mutex); //unlock to let later reader in.
//operate reading
printf("I'm reader\n");
sem_wait(&reader_count_mutex); //lock to decrement reader_clount.
--reader_count;
if (reader_count == 0) {
sem_post(&data_mutex);
}
sem_post(&reader_count_mutex);
}
}
int main() {
sem_init(&data_mutex, 0, 1);
sem_init(&reader_count_mutex, 0, 1);
pthread_t reader_t1, reader_t2, writer_t;
pthread_create(&reader_t1, NULL, reader, NULL);
pthread_create(&reader_t2, NULL, reader, NULL);
pthread_create(&writer_t, NULL, writer, NULL);
pthread_join(reader_t1, NULL);
pthread_join(reader_t2, NULL);
pthread_join(writer_t, NULL);
return 0;
}只要有读者正在读的状态,后来的读者都可以直接进入,如果读者持续不断进入,则写者会处于饥饿状态。
那我要是想要写优先呢?
写优先
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
int reader_count = 0;
sem_t reader_count_mutex;//make counter exclusive.
int writer_count = 0;
sem_t writer_count_mutex;//make counter exclusive.
sem_t read_mutex; //make read shared.
sem_t write_mutex; //make write exclusive.
//writer prior
void* writer(void* arg) {
while (1) {
sem_wait(&writer_count_mutex);
if (writer_count == 0) {
sem_wait(&read_mutex);
}
++writer_count;
sem_post(&writer_count_mutex);
sem_wait(&write_mutex);
//operating write;
printf("I' m writing\n") ;
sem_post(&write_mutex);
sem_wait(&writer_count_mutex);
--writer_count;
if (writer_count == 0) {
sem_post(&read_mutex);
}
sem_post(&writer_count_mutex);
}
}
void* reader(void* arg) {
while (1) {
sem_wait(&read_mutex);
sem_wait(&reader_count_mutex);
if (reader_count == 0) {
sem_wait(&write_mutex);
}
++reader_count;
sem_post(&reader_count_mutex);
sem_post(&read_mutex);
//opeating read
printf("I'm reading\n");
sem_wait(&reader_count_mutex);
--reader_count;
if (reader_count == 0) {
sem_post(&write_mutex);
}
sem_post(&reader_count_mutex);
}
}
int main() {
sem_init(&writer_count_mutex, 0, 1);
sem_init(&reader_count_mutex, 0, 1);
sem_init(&write_mutex, 0, 1);
sem_init(&read_mutex, 0, 1);
pthread_t writer1_t, writer2_t, reader_t;
pthread_create(&writer1_t, NULL, writer, NULL);
pthread_create(&writer2_t, NULL, writer, NULL);
pthread_create(&reader_t, NULL,reader, NULL);
pthread_join(writer1_t, NULL);
pthread_join(writer2_t, NULL);
pthread_join(reader_t, NULL);
return 0;
}注意,这里 read_mutex 的作用,开始有多个读者读数据,它们全部进入读者队列,此时来了一个写者,执行了 P(rMutex) 之后,后续的读者由于阻塞在 rMutex 上,都不能再进入读者队列,而写者到来,则可以全部进入写者队列,因此保证了写者优先。
同时,第一个写者执行了 P(read_mutex) 之后,也不能马上开始写,必须等到所有进入读者队列的读者都执行完读操作,通过 V(write_mutex) 唤醒写者的写操作。
那想实现读写按先后顺序呢?
时间顺序
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
sem_t data_mutex;
int reader_count = 0;
sem_t reader_count_mutex;
sem_t higher_mutex;
void* writer(void* arg) {
while (1) {
sem_wait(&higher_mutex);
sem_wait(&data_mutex);
//operate writing
printf("I' m writing\n");
sem_post(&data_mutex);
sem_post(&higher_mutex);
}
}
// reader is prior.
void* reader(void* arg) {
while (1) {
sem_wait(&higher_mutex);
sem_wait(&reader_count_mutex);
if (reader_count == 0) {
sem_wait(&data_mutex);
}
++reader_count;
sem_post(&reader_count_mutex); //unlock to let later reader in.
sem_post(&higher_mutex);
//operate reading
printf("I'm reading\n");
sem_wait(&reader_count_mutex); //lock to decrement reader_clount.
--reader_count;
if (reader_count == 0) {
sem_post(&data_mutex);
}
sem_post(&reader_count_mutex);
}
}
int main() {
sem_init(&data_mutex, 0, 1);
sem_init(&reader_count_mutex, 0, 1);
sem_init(&higher_mutex, 0, 1);
pthread_t reader_t1, reader_t2, writer_t;
pthread_create(&reader_t1, NULL, reader, NULL);
pthread_create(&reader_t2, NULL, reader, NULL);
pthread_create(&writer_t, NULL, writer, NULL);
pthread_join(reader_t1, NULL);
pthread_join(reader_t2, NULL);
pthread_join(writer_t, NULL);
return 0;
}看完代码不知你是否有这样的疑问,为什么加了一个信号量 higher_muex,就实现了公平竞争?
对比方案一的读者优先策略,可以发现,读者优先中只要后续有读者到达,读者就可以进入读者队列, 而写者必须等待,直到没有读者到达。
没有读者到达会导致读者队列为空,即 read_count==0,此时写者才可以进入临界区执行写操作。
而这里 higher_mutex 的作用就是阻止读者的这种特殊权限(特殊权限是只要读者到达,就可以进入读者队列)。
比如:开始来了一些读者读数据,它们全部进入读者队列,此时来了一个写者,执行 P(higher_mutex) 操作,使得后续到来的读者都阻塞在 higher_mutex 上,不能进入读者队列,这会使得读者队列逐渐为空,即 read_count 减为 0。
这个写者也不能立马开始写(因为此时读者队列不为空),会阻塞在信号量 data_mutex 上,读者队列中的读者全部读取结束后,最后一个读者进程执行 V(data_mutex),唤醒刚才的写者,写者则继续开始进行写操作。
总的来说,在读优先的情况下,写者没有一个有效的手段(锁)来限制读者的无限制增加,而此处的higher_mutex起到的就是这样的作用。
实现同步
信号量不仅可以实现临界区的互斥访问控制,还可以线程间的事件同步。
根据同步的定义,我们不难知道,一个线程等待一个条件成立,需要另一个来通知,而sem<=0唤醒会线程(即同步),所以,是通过V操作来唤醒线程。而P操作会使得等待的线程睡眠,因此,要实现同步的步骤就很明了了。
等待线程:waiting_thread
P(sem)- do some work
V(sem)
通知线程:notify_thread
V(sem)P(sem)
具体实现代码如下:
#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>
sem_t sem;
void* waiting_thread(void* arg) {
while (1) {
sem_wait(&sem);
//do something
printf("I' got it.\n");
sem_post(&sem);
}
}
// notify_thread is prior.
void* notify_thread(void* arg) {
while (1) {
sem_post(&sem); //notify waiting thread.
printf("notified");
sem_wait(&sem); //hang-up waiting thread.
}
}
int main() {
sem_init(&sem, 0, 0);
pthread_t notify_t, waiting_t;
pthread_create(¬ify_t, NULL, notify_thread, NULL);
pthread_create(&waiting_t, NULL, waiting_thread, NULL);
pthread_join(notify_t, NULL);
pthread_join(waiting_t, NULL);
return 0;
}总结
从上述过程来说,PV信号量不仅可以实现锁还可以实现同步.
posix下的锁和pthread_cond_t, C++下的锁和condition_variable的都可以用PV信号量来实现。