reader-writer control

读者写者-处理的多种方式

第一种:最省事的互斥锁

使用C++11thread和``mutex`库的互斥量和锁和多线程来实现。

#include <iostream>
#include <thread>
#include <mutex>

int val = 0;
std::mutex mutex;

void read() {
    std::cout << "I' m reading val: " << val << std::endl;
}

void write() {
    ++val;
    std::cout << "I' m writing val:  " << val << std::endl;
}
void reader() {
    while (true) {
        std::unique_lock<std::mutex> lock(mutex);
        read();
    }
}

void writer() {
    while (true) {
        std::unique_lock<std::mutex> lock(mutex);
        write();
    }
}


int main() {
    std::thread read_thread(&reader);
    std::thread write_thread(&writer);
    read_thread.join();
    write_thread.join();
    return 0;
}

使用互斥锁有一个很明显的问题,虽然读写是互斥,但也把读读也给互斥了。因此,这点可以改进一下,于是就有了读写锁。

第二种:读写锁

使用C++17thread和``shared_mutex`库中的读写锁和多线程来实现。

#include <iostream>
#include <thread>
#include <shared_mutex>
#include <condition_variable>
int val = 0;
std::shared_mutex mutex;

void read() {
    std::cout << "I' m reading val: " << val << std::endl;
}

void write() {
    ++val;
    std::cout << "I' m writing val:  " << val << std::endl;
}
void reader() {
    while (true) {
        std::shared_lock<std::shared_mutex> reader_lock(mutex);
        read();
    }
}

void writer() {
    while (true) {
        std::unique_lock<std::shared_mutex> writer_lock(mutex);
        write();
    }
}


int main() {
    std::thread read_thread(&reader);
    std::thread write_thread1(&writer);
    std::thread write_thread2(&writer);
    read_thread.join();
    write_thread1.join();
    write_thread2.join();
    return 0;
}

写到这里,多线程读的问题总是解决了。

欸,那我要是想让读有更高的优先级呢?

PV信号量

信号量是操作系统提供的一种协调共享资源访问的方法。

通常信号量表示资源的数量,对应的变量是一个整型(sem)变量。

另外,还有两个原子操作的系统调用函数来控制信号量的,分别是:

  • P 操作:将 sem1,相减后,如果 sem < 0,则进程/线程进入阻塞等待,否则继续,表明 P 操作可能会阻塞;
  • V 操作:将 sem1,相加后,如果 sem <= 0,唤醒一个等待中的进程/线程,表明 V 操作不会阻塞;

操作系统是如何实现 PV 操作的呢?

信号量数据结构与 PV 操作的算法描述如下图:

作者采用posix标准的semaphorepthread库来实现以下锁和同步,由于是posix标准,Windows上做了相应的移植库,因此Unix(Linux)和Windows系统都可以运行。

posix标准中与PV、信号量的对应关系。

  • 信号量:sem_t, 注意使用sem_init来初始化。
  • P操作:sem_wait
  • V操作:sem_post

实现锁

根据上面的叙述,要使得P操作互斥,我们需要设置信号量的初值为1.

对于两个并发线程,互斥信号量的值仅取 1、0 和 -1 三个值,分别表示:

  • 如果互斥信号量为 1,表示没有线程进入临界区;
  • 如果互斥信号量为 0,表示有一个线程进入临界区;
  • 如果互斥信号量为 -1,表示一个线程进入临界区,另一个线程等待进入。

通过互斥信号量的方式,就能保证临界区任何时刻只有一个线程在执行,就达到了互斥的效果。

读优先

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>

sem_t data_mutex;
sem_t reader_count_mutex;
int reader_count = 0;

void* writer(void* arg) {
  while (1) {
    sem_wait(&data_mutex);
    //operate writing
    printf("I' m writer\n");
    sem_post(&data_mutex);
  }
}

// reader is prior.
void* reader(void* arg) {
  while (1) {
    sem_wait(&reader_count_mutex);
    if (reader_count == 0) {
      sem_wait(&data_mutex);
    }
    ++reader_count;
    sem_post(&reader_count_mutex); //unlock to let later reader in.
    
    
    //operate reading     
    printf("I'm reader\n");    
  
    sem_wait(&reader_count_mutex); //lock to decrement reader_clount.
    --reader_count;
    if (reader_count == 0) {
      sem_post(&data_mutex);
    }
    sem_post(&reader_count_mutex);
  }
}


int main() {
  sem_init(&data_mutex, 0, 1);
  sem_init(&reader_count_mutex, 0, 1);

  pthread_t reader_t1, reader_t2, writer_t;
  pthread_create(&reader_t1, NULL, reader, NULL);
  pthread_create(&reader_t2, NULL, reader, NULL);
  pthread_create(&writer_t, NULL, writer, NULL);
  pthread_join(reader_t1, NULL);
  pthread_join(reader_t2, NULL);
  pthread_join(writer_t, NULL);
  return 0;
}

只要有读者正在读的状态,后来的读者都可以直接进入,如果读者持续不断进入,则写者会处于饥饿状态。

那我要是想要写优先呢?

写优先

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>

int reader_count = 0;
sem_t reader_count_mutex;//make counter exclusive.

int writer_count = 0;
sem_t writer_count_mutex;//make counter exclusive.

sem_t read_mutex;  //make read shared.
sem_t write_mutex; //make write exclusive.

//writer prior
void* writer(void* arg) {
    while (1) {
        sem_wait(&writer_count_mutex);
        if (writer_count == 0) {
            sem_wait(&read_mutex);
        }
        ++writer_count;
        sem_post(&writer_count_mutex);

        sem_wait(&write_mutex);
        //operating write;  
        printf("I' m writing\n") ;
        sem_post(&write_mutex);

        sem_wait(&writer_count_mutex);
        --writer_count;
        if (writer_count == 0) {
            sem_post(&read_mutex);
        }
        sem_post(&writer_count_mutex);
    }
}


void* reader(void* arg) {
    while (1) {
        sem_wait(&read_mutex);
        sem_wait(&reader_count_mutex);
        if (reader_count == 0) {
            sem_wait(&write_mutex);
        }
        ++reader_count;
        sem_post(&reader_count_mutex);
        sem_post(&read_mutex);

        //opeating read
        printf("I'm reading\n");

        sem_wait(&reader_count_mutex);
        --reader_count;
        if (reader_count == 0) {
            sem_post(&write_mutex);
        }
        sem_post(&reader_count_mutex);
    }
}

int main() {

    sem_init(&writer_count_mutex, 0, 1);
    sem_init(&reader_count_mutex, 0, 1);

    sem_init(&write_mutex, 0, 1);
    sem_init(&read_mutex, 0, 1);

    pthread_t writer1_t, writer2_t, reader_t;
    pthread_create(&writer1_t, NULL, writer, NULL);
    pthread_create(&writer2_t, NULL, writer, NULL);
    pthread_create(&reader_t, NULL,reader, NULL);

    pthread_join(writer1_t, NULL);
    pthread_join(writer2_t, NULL);
    pthread_join(reader_t, NULL);
    return 0;
}

注意,这里 read_mutex 的作用,开始有多个读者读数据,它们全部进入读者队列,此时来了一个写者,执行了 P(rMutex) 之后,后续的读者由于阻塞在 rMutex 上,都不能再进入读者队列,而写者到来,则可以全部进入写者队列,因此保证了写者优先。

同时,第一个写者执行了 P(read_mutex) 之后,也不能马上开始写,必须等到所有进入读者队列的读者都执行完读操作,通过 V(write_mutex) 唤醒写者的写操作。

那想实现读写按先后顺序呢?

时间顺序

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>

sem_t data_mutex;

int reader_count = 0;
sem_t reader_count_mutex;

sem_t higher_mutex;

void* writer(void* arg) {
    while (1) {
        sem_wait(&higher_mutex);
        sem_wait(&data_mutex);
        //operate writing
        printf("I' m writing\n");
        sem_post(&data_mutex);
        sem_post(&higher_mutex);
    }
}

// reader is prior.
void* reader(void* arg) {
    while (1) {
        sem_wait(&higher_mutex);
        sem_wait(&reader_count_mutex);
        if (reader_count == 0) {
            sem_wait(&data_mutex);
        }
        ++reader_count;
        sem_post(&reader_count_mutex); //unlock to let later reader in.
        sem_post(&higher_mutex);

        //operate reading
        printf("I'm reading\n");

        sem_wait(&reader_count_mutex); //lock to decrement reader_clount.
        --reader_count;
        if (reader_count == 0) {
            sem_post(&data_mutex);
        }
        sem_post(&reader_count_mutex);
    }
}


int main() {
    sem_init(&data_mutex, 0, 1);
    sem_init(&reader_count_mutex, 0, 1);
    sem_init(&higher_mutex, 0, 1);

    pthread_t reader_t1, reader_t2, writer_t;
    pthread_create(&reader_t1, NULL, reader, NULL);
    pthread_create(&reader_t2, NULL, reader, NULL);
    pthread_create(&writer_t, NULL, writer, NULL);
    pthread_join(reader_t1, NULL);
    pthread_join(reader_t2, NULL);
    pthread_join(writer_t, NULL);
    return 0;
}

看完代码不知你是否有这样的疑问,为什么加了一个信号量 higher_muex,就实现了公平竞争?

对比方案一的读者优先策略,可以发现,读者优先中只要后续有读者到达,读者就可以进入读者队列, 而写者必须等待,直到没有读者到达。

没有读者到达会导致读者队列为空,即 read_count==0,此时写者才可以进入临界区执行写操作。

而这里 higher_mutex 的作用就是阻止读者的这种特殊权限(特殊权限是只要读者到达,就可以进入读者队列)。

比如:开始来了一些读者读数据,它们全部进入读者队列,此时来了一个写者,执行 P(higher_mutex) 操作,使得后续到来的读者都阻塞在 higher_mutex 上,不能进入读者队列,这会使得读者队列逐渐为空,即 read_count 减为 0。

这个写者也不能立马开始写(因为此时读者队列不为空),会阻塞在信号量 data_mutex 上,读者队列中的读者全部读取结束后,最后一个读者进程执行 V(data_mutex),唤醒刚才的写者,写者则继续开始进行写操作。

总的来说,在读优先的情况下,写者没有一个有效的手段(锁)来限制读者的无限制增加,而此处的higher_mutex起到的就是这样的作用。

实现同步

信号量不仅可以实现临界区的互斥访问控制,还可以线程间的事件同步。

根据同步的定义,我们不难知道,一个线程等待一个条件成立,需要另一个来通知,而sem<=0唤醒会线程(即同步),所以,是通过V操作来唤醒线程。而P操作会使得等待的线程睡眠,因此,要实现同步的步骤就很明了了。

等待线程:waiting_thread

  • P(sem)
  • do some work
  • V(sem)

通知线程:notify_thread

  • V(sem)
  • P(sem)

具体实现代码如下:

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>

sem_t sem;

void* waiting_thread(void* arg) {
    while (1) {
        sem_wait(&sem);
        //do something
        printf("I' got it.\n");
        sem_post(&sem);
    }
}

// notify_thread is prior.
void* notify_thread(void* arg) {
    while (1) {
        sem_post(&sem); //notify waiting thread.
        printf("notified");
        sem_wait(&sem); //hang-up waiting thread.
    }
}


int main() {
    sem_init(&sem, 0, 0);

    pthread_t notify_t, waiting_t;
    pthread_create(&notify_t, NULL, notify_thread, NULL);
    pthread_create(&waiting_t, NULL, waiting_thread, NULL);
    pthread_join(notify_t, NULL);
    pthread_join(waiting_t, NULL);
    return 0;
}

总结

从上述过程来说,PV信号量不仅可以实现锁还可以实现同步.

posix下的锁和pthread_cond_t, C++下的锁和condition_variable的都可以用PV信号量来实现。


reader-writer control
https://messenger1th.github.io/2024/07/24/Articles/reader-writer control/
作者
Epoch
发布于
2024年7月24日
许可协议