CSAPP-并发编程

编程/技术 更新时间: 2019-06-01 @ 08:53:11 创建时间: 2019-06-01 @ 08:51:06 浏览数: 202 净访问: 177 By: skyrover

本博客采用创作共用版权协议, 要求署名、非商业用途和保持一致. 转载本博客文章必须也遵循署名-非商业用途-保持一致的创作共用协议


主要有三种方法,另外讲了同步(使用信号量),都是比较基础的内容,卒获有所闻。

现代操作系统提供了三种基本的构造并发程序的方法:

  • 进程,进程有独立的虚拟地址空间,想要和其他进程通信必须使用进程间通信机制(IPC)
  • I/O多路复用,在同一个进程中显式调用逻辑流,逻辑流被模型化为状态机,数据到达文件描述符后,主程序显式的从一个状态转换到另一个状态,所有流共享同一个地址空间。
  • 线程,运行在单一进程上下文中的逻辑流,内核进行调度

12.1 基于进程的并发编程

在父进程中接受客户端的连接请求,然后创建一个新的子进程来为每个客户端提供服务。主要就在于监听描述符和已连接描述符的区别上,主进程接受连接请求后,会返回一个已连接描述符,这时候服务器fork一个子进程,子进程有主进程描述符表的完整拷贝,之后主进程关闭已连接描述符,子进程关闭监听描述符,因为所有进程公用一张文件表。然后fork出的子进程就会服务对应的客户端。同理如果再有客户端请求服务,就有新的子进程fork出来。

#include "csapp.h"
void echo(int connfd);

void sigchld_handler(int sig){
    while (waitpid(-1, 0, WNOHANG) > 0){
        ;
    }
    return;
}

int main(int argc, char **argv){
    int listenfd, connfd, port;
    socklen_t clientlen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientaddr;

    if (argc != 2){
        fprintf(stderr, "usage");
        exit(0);
    }

    port = atoi(argv[1]);
    signal(SIGCHLD, sigchld_handler);
    listenfd = open_listenfd(port);
    while (1){
        connfd = accept(listenfd, (SA *) &clientaddr, &clientlen);
        if (fork() == 0){
            close(listenfd);
            echo(connfd);
            close(connfd);
            exit(0);
        }
        close(connfd);
    }
}

12.1.2 关于进程的优劣

  • 优点:进程有独立的地址空间
  • 比较慢,进程控制和共享信息比较麻烦

Unix IPC:信号,管道,先进先出,系统V共享存储器,系统V信号量。以及套接字接口

12.2 基于I/O多路复用的并发编程

I/O多路复用技术(I/O multiplexing)基本思路就是使用select函数,要求内核挂起进程,只有在一个或者多个IO事件发生后,才将控制返回给应用程序。下面是select函数定义和常用的宏

#include <unistd.h>
#include <sys/types.h>

int select(int n, fd_set *fdset, NULL, NULL, NULL);

FD_ZERO(fd_set *fdset);  // 清除fdset中所有位
FD_CLR(int fd, fd_set *fdset);  // 清除fdset中fd的位
FD_SET(int fd, fd_set *fdset);  // 打开fdset中fd的位
FD_ISSET(int fd, fd_set *fdset);  // 在fdset中fd是否打开?

select 函数会一直阻塞,直到集合中至少有一个描述符准备好了。并且select 会修改传入的参数fdset,将其作为准备好的集合。

#include "csapp.h"
void echo(int connfd);
void command(void);

int main(int argc, char **argv){
    int listenfd, connfd, port;
    socklen_t clientlen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientaddr;
    fd_set read_set, ready_set;

    if (argc != 2){
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    port = atoi(argv[1]);
    listenfd = open_listenfd(port);

    FD_ZERO(&read_set);
    FD_SET(STDIN_FILENO, &read_set);
    FD_SET(listenfd, &read_set);

    while(1){
        ready_set = read_set;
        select(listenfd+1, &ready_set, NULL, NULL, NULL);
        if (FD_ISSET(STDIN_FILENO, &ready_set)){
            command();
        }
        if (FD_ISSET(listenfd, &ready_set)){
            connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
            echo(connfd);
            close(connfd);
        }
    }
}

12.2.1 基于IO多路复用的并发事件驱动服务器

IO多路复用可以用作并发事件驱动程序的基础,事件驱动程序中,流是因为某种事件而前进的。服务器使用IO多路复用,借助select函数检测输入事件的发生,当每个已连接描述符准备好可读时候,服务器就为相应的状态机执行转移。

对于每个新的客户端,都会创建一个新的状态机,然后和对应的描述符关联起来。

这块有两种事件:

  1. 来自一个新客户端的连接请求到达,这个是对监听描述符进行操作的
  2. 一个已存在的客户端的已连接描述符准备好可以读了。

对应的操作就是:

  1. 将客户端加到池里
  2. 将每个准备好的已连接描述符的一个文本行回送回去
#include "csapp.h"

typedef struct{
    int maxfd;
    fd_set read_set;
    fd_set ready_set;
    int nready;
    int maxi;
    int clientfd[FD_SETSIZE];
    rio_t clientrio[FD_SETSIZE];
} pool;

int byte_cnt = 0;

int main(int argc, char **argv){
    int listenfd, connfd, port;
    socklen_t clientlen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientaddr;
    static pool pool;
    if (argc != 2){
        exit(0);
    } 
    port = atoi(argv[1]);

    listenfd = open_listenfd(port);
    init_pool(listenfd, &pool);

    while(1){
        pool.ready_set = pool.read_set;
        pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
        if (FD_ISSET(listenfd, &pool.ready_set)){
            connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
            add_client(connfd, &pool);
        }

        check_clients(&pool);
    }
}

void init_pool(int listenfd, pool *p){
    int i;
    p->maxi = -1;
    for(i = 0; i < FD_SETSIZE; i++){
        p->clientfd[i] = -1;
    }

    p->maxfd = listenfd;
    FD_ZERO(&p->read_set);
    FD_SET(listenfd, &p->read_set);
}

void add_client(int connfd, pool *p){
    int i;
    p->nready--;
    for(i=0; i<FD_SETSIZE; i++){
        if (p->clientfd[i] < 0){
            p->clientfd[i] = connfd;
            rio_readinitb(&p->clientrio[i], connfd);
            FD_SET(connfd, &p->read_set);
            if (connfd > p->maxfd){
                p->maxfd = connfd;
            }
            if (i>p->maxi){
                p->maxi = i;
            }
            break;
        }
    }
    if (i==FD_SETSIZE){
        app_error("");
    }
}

void check_clients(pool *p){
    int i, connfd, n;
    char buf[MAXLINE];
    rio_t rio;

    for(i = 0; (i <= p->maxi) && (p->nready > 0); i++){
        connfd = p->clientfd[i];
        rio = p->clientrio[i];

        if ((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))){
            p->nready--;
            if ((n = rio_readlineb(&rio, buf, MAXLINE)) != 0){
                byte_cnt += n;
                rio_writen(connfd, buf, n);
            }else{
                close(connfd);
                FD_CLR(connfd, &p->read_set);
                p->clientfd[i] = -1;
            }
        }
    }
}

select函数检测到输入事件,add_client函数创建一个新的逻辑流(状态机),check_clients函数通过回送输入行来执行状态转移,当客户端完成发送时候,删除这个状态机。

12.2.2 IO多路复用技术的优劣

  • 在单一进程上下文中运行
  • 编码复杂
  • 不能充分利用多核处理器

12.3 基于线程的并发编程

线程就是上面两种方法的混合,不需要自己控制调度,内核会自动进行,而且共享整个进程的地址空间。每个线程都有自己的线程上下文,线程ID,栈,栈指针,程序计数器,通用目的寄存器和条件码。

12.3.1 线程执行模型

每个进程开始生命周期时候是单一线程,为主线程。然后会创建对等线程。线程的上下文切换比进程小的多。和一个进程相关的线程会组成一个对等线程池,一个线程可以杀死它的任何对等线程,或者等待它的任意对等线程终止。每个对等线程都可以读写相同的数据。

另外主线程也可以被子线程杀死,比如下面:

#include <pthread.h>
#include <unistd.h>

#include <stdio.h>

void* func(void* arg)
{
    pthread_t main_tid = *static_cast<pthread_t*>(arg);
    pthread_cancel(main_tid);
    while (true)
    {
        //printf("child loops\n");
    }
    return NULL;
}

int main(int argc, char* argv[])
{
    pthread_t main_tid = pthread_self();
    pthread_t tid = 0;
    pthread_create(&tid, NULL, func, &main_tid);
    while (true)
    {
        printf("main loops\n");
    }
    sleep(1);
    printf("main exit\n");
    return 0;
}

在这里可以回答一个问题,就是主线程退出,子线程会被强制退出吗?答案是不一定,要看主线程怎么退出,如果主线程是上面代码那种形式退出,子线程就会继续运行,如果main调用了exit或者return退出,那么子线程就会被终止回收,因为这时候是进程退出导致主线程退出。

12.3.2 Posix线程

#include "csapp.h"
void *thread(void *vargp);

int main(){
    pthread_t tid;
    pthread_create(&tid, NULL, thread, NULL);
    pthread_join(tid, NULL);
    exit(0);
}

void *thread(void *vargp){
    printf("Hello, world!\n");
    return NULL;
}

12.3.3 创建线程

#include <pthread.h>
typedef void *(func)(void *);

int pthread_create(pthread_t *tid, pthread_attr_t *attr, func *f, void *arg);

// 返回自己的线程id
pthread_t pthread_self(void);

12.3.4 终止线程

某个对等线程调用Unix的exit函数,该函数终止进程以及所有与进程相关的线程

// 线程终止,如果是主线程调用,则会等待所有其他对等线程终止
// 然后终止主线程和整个进程
void pthread_exit(void *thread_return);

// 终止tid线程
int pthread_cancel(pthread_t tid);

12.3.5 回收已终止线程的资源

线程通过调用phread_join函数等待其他线程终止,并且只能等待一个指定的线程终止。pthread_join函数会阻塞,直到线程tid终止,将线程返回的(void *)指针赋值为thread_return指向的位置,然后回收已终止线程占用的所有存储器资源。

#include <pthread.h>
int pthread_join(pthread_t tid, void **thread_return);

12.3.6 分离线程

可结合线程能够被其他线程收回其资源和杀死。分离的线程是不能被其他线程回收或者杀死,它的存储器资源在它终止时候被系统自动释放。

int pthread_detach(pthread_t tid);

一般会在Web服务器上使用分离线程,因为每个连接都是一个单独的线程独立处理,没有必要等待每个对等线程终止。

12.3.7 初始化线程

pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));

用作动态初始化多个线程共享的全局变量。。

12.3.8 一个基于线程的并发服务器

#include "csapp.h"

void echo(int connfd);
void *thread(void *vargp);

int main(int argc, char **argv){
    int listenfd, *connfdp, port;
    socklen_t clientlen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientaddr;
    pthread_t tid;

    if (argc != 2){
        exit(0);
    }

    port = atoi(argv[1]);
    listenfd = open_listenfd(port);
    while(1){
        // 这里将每个accept返回的已连接描述符分配到自己的动态分配的存储器块
        // 如果直接向线程传入对应的已连接描述符就会导致竞争
        // 因为可能在线程里的赋值语句会在下次accept之后完成
        connfdp = malloc(sizeof(int));
        *connfdp = accept(listenfd, (SA *) &clientaddr, &clientlen);
        pthread_create(&tid, NULL, thread, connfdp);
    }
}

void *thread(void *vargp){
    int connfd = *((int *) vargp);
    // 分离线程
    pthread_detach(pthread_self());
    // 释放之前申请的存储器块
    free(vargp);
    echo(connfd);
    close(connfd);
    return NULL;
}

12.4 多线程程序中的共享变量

  • 线程的基础存储器模型
  • 变量实例如何映射到存储器
  • 有多少线程引用这些实例

一个变量是共享的,仅当多个线程引用这个变量的某个实例。

#include "csapp.h"
#define N 2
void *thread(void *vargp);

char **ptr;

int main(){
    int i;
    pthread_t tid;
    char *msgs[N] = {
        "hello from foo",
        "hello from bar"
    };
    ptr = msgs;
    for(i = 0; i < N; i++){
        pthread_create(&tid, NULL, thread, (void *)i);
    }
    pthread_exit(NULL);
}

void *thread(void *vargp){
    int myid = (int)vargp;
    static int cnt = 0;
    printf("[%d]: %s (cnt = %d)\n", myid, ptr[myid], ++cnt);
    return NULL;
}

12.4.1 线程存储器模型

线程之间寄存器是从不共享的,虚拟存储器总是共享的。线程的栈通常是被相应线程独立访问,如果一个线程可以得到一个指向其他线程栈的指针,那么就可以读写这个栈的任何部分。

12.4.2 将变量映射到存储器

  • 全局变量:虚拟存储器的读写区域只包含每个全局变量的一个实例,任何线程都可以引用
  • 本地自动变量:每个线程的栈都包含它自己的所有本地自动变量的实例
  • 本地静态变量:虚拟存储器读写区域只包含在程序中声明的每个静态变量的一个实例。每个对等线程都读和写这个实例。

12.4.3 共享变量

根据共享变量的定义,cnt就是一个,两个线程引用。myid不是,因为两个实例中每一个只有一个线程引用。msgs是可以被共享的。

12.5 用信号量同步线程

共享变量可能会导致同步错误,这是因为生成的机器指令会以一种顺序一个接一个完成,但是并发的线程会导致指令的交叉,有些会产生正确的结果,有些不会。比如:在线程1更新某一个数据之前,线程1加载之后,线程2加载了这一数据。

有两个线程会同时执行下面这一代码:

// 使用volatile关键字声明的变量或对象通常具有与优化、多线程相关的特殊属性。
// 通常,volatile关键字用来阻止(伪)编译器认为的无法“被代码本身”改变的代码(变量/对象)进行优化。
// 如在C语言中,volatile关键字可以用来提醒编译器它后面所定义的变量随时有可能改变,因此编译后的程序每次需要存储或读取这个变量的时候,都会直接从变量地址中读取数据。
// 如果没有volatile关键字,则编译器可能优化读取和存储,可能暂时使用寄存器中的值,如果这个变量由别的程序更新了的话,将出现不一致的现象。
volatile int cnt = 0;

void *thread(void *vargp){
    int i, niters = *((int *)vargp);
    for (i = 0; i < niters; i++){
        cnt ++;
    }
    return NULL:
}

这一段代码的汇编代码如下:

movl (%rdi), %ecx
mov1 $0, %edx
cmp %ecx, %edx
jge .L13
.L11:
    movl cnt(%rip), %eax
    incl %eax
    movl %eax, cnt(%rip)
    incl %edx
    cmpl %ecx, %edx
    j1 .L11
.L13:

所以可以看出来导致不一致的原因就是读取cnt的值,然后增加,再存储回去这一过程没有原子性,所以对于不同的线程就可能以各种顺序执行这几条指令。

12.5.1 进度图

了解了程序执行的关键状态,就能知道哪些是关键状态,由此可以得出程序的临界区(同一个关键状态的组合对应于多个线程),这个临界区不能与其他进程的临界区交替执行,也就是每个线程在执行它的临界区指令时候,要对共享变量的互斥访问。这时候我们就需要同步线程,使它总有一条安全轨迹线,信号量就可以做到这一点。

12.5.2 信号量semaphore

其实就是加一个在中途计算不能被中断的数,因为不能中断其操作的指令,所以它具有原子性,也就能保证对临界变量的控制,保证并发程序的轨迹线。

s其实表示了当前信号量的可使用性,非0表示有资源可以使用,0表示无资源,需要等待。

  • P操作,如果s非0,则s减1,返回。否则,挂起这个线程,等待V操作将s加1,并且重启等待的线程。
  • V操作,将s加1,也就相当于释放资源,释放临界区。

而且信号量不可能有负值,称为信号量不变性。

#include <semaphore.h>

int sem_init(sem_t *sem, 0, unsigned int value);
int sem_wait(sem_t *s);  // P操作
int sem_post(sem_t *s);  // V操作

12.5.3 使用信号量来实现互斥

P和V操作建立的禁止区使得在任何时间点上,在被包围的临界区中,不可能有多个线程在执行指令。信号量操作确保了对临界区的互斥访问。

volatile int cnt = 0;
sem_t mutex;

sem_init(&mutex, 0, 1);

for(i = 0; i < niters; i++){
    p(&mutex);
    cnt++;
    v(&mutex);
}

12.5.4 利用信号量来调度共享资源

  • 生产者-消费者问题

生产者消费者共享一个有限缓冲区,因为插入和取出项目都涉及更新共享变量,所以必须保证对缓冲区访问是互斥的。除了访问互斥,还需要调度对缓冲区的访问,什么时候有数据可用,什么时候不可用需要生产数据。

下面是一个例子:

void sbuf_insert(sbuf_t *sp, int item){
    p(&sp->slots);  // 减少了可用槽位,用来生产数据
    p(&sp->mutex);
    sp->buf[(++sp->rear)%(sp->n)] = item;
    v(&sp->mutex);
    v(&sp->items);  // 标记一个可用数据
}

int sbuf_remove(sbuf_t *sp){
    int item;
    p(&sp->items);
    p(&sp->mutex);
    item = sp->buf[(++sp->front)%(sp->n)];
    v(&sp->mutex);
    v(&sp->slots);
    return item;
}
  • 读者-写者问题

写者需要拥有对对象的独占访问,读者可以和无限多个其他读者共享对象。根据读者和写者优先级不同有以下问题:

  • 读者优先
  • 写者优先
int readcnt;
sem_t mutex, w;

void reader(void){
    while(1){
        p(&mutex);
        readcnt ++;
        if (readcnt == 1){
            p(&w);
        }
        v(&mutex);

        p(&mutex);
        readcnt --;
        if (readcnt == 0){
            v(&w);
        }
        v(&mutex);
    }
}

void writer(void){
    while(1){
        p(&w);
        v(&w);
    }
}

可能会导致饥饿,如果有读者不断到达,写者就可能无限期等待。

12.5.5 综合:基于预线程化的并发服务器

采用生产者消费者模型的服务器,服务器是由一个主线程和一组工作者线程构成,主线程不断接受来自客户端的连接请求,并将得到的连接描述符放在一个有限缓冲区中,每一个工作者线程反复的从共享缓冲区中取出描述符,为客户端服务,然后等待下一个描述符。

#include "csapp.h"
#include "sbuf.h"
#define NTHREADS 4
#define SBUFSIZE 16

void echo_cnt(int connfd);
void *thread(void *vargp);

sbuf_t sbuf;

int main(int argc, char **argv){
    int i, listenfd, connfd, port;
    socklen_t clientlen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientaddr;
    pthread_t tid;

    if (argc != 2){
        fprintf(stderr, "usage");
        exit(0);
    }

    port = atoi(argv[1]);
    sbuf_init(&sbuf, SBUFSIZE);
    listenfd = open_listenfd(port);
    for (i = 0; i < NTHREADS; i++){
        pthread_create(&tid, NULL, thread, NULL);
    }
    while(1){
        connfd = accept(listenfd, (SA *)&clientaddr, &clientlen);
        sbuf_insert(&sbuf, connfd);
    }
}

void *thread(void *vargp){
    pthread_detach(pthread_self());
    while(1){
        int connfd = sbuf_remove(&sbuf);
        echo_cnt(connfd);
        close(connfd);
    }
}

static int byte_cnt;
static sem_t mutex;

static void init_echo_cnt(void){
    sem_init(&mutex, 0, 1);
    byte_cnt = 0;
}

void echo_cnt(int connfd){
    int n;
    char buf[MAXLINE];
    rio_t rio;
    static pthread_once_t once = PTHREAD_ONCE_INIT;

    pthread_once(&once, init_echo_cnt); //初始化与线程例程相关的状态
    rio_readinitb(&rio, connfd);
    while((n = rio_readlineb(&rio, buf, MAXLINE)) != 0){
        p(&mutex);
        byte_cnt += n;
        v(&mutex);
        rio_writen(connfd, buf, n);
    }
}

编写事件驱动程序有两种方式:

  • 基于IO多路复用(select函数)
  • 线程(生产者消费者)

12.6 使用线程提高并行性

一个利用线程并行来计算的问题示例:

#include "csapp.h"
# define MAXTHREADS 32

void *sum(void *vargp);

long psum[MAXTHREADS];
long nelems_per_thread;

int main(int argc, char **argv){
    long i, nelems, log_nelems, nthreads, result = 0;
    pthread_t tid[MAXTHREADS];
    int myid[MAXTHREADS];

    nthreads = atoi(argv[1]);
    log_nelems = atoi(argv[2]);
    nelems = (1L << log_nelems);
    nelems_per_thread = nelems / nthreads;

    for(i = 0; i < nthreads; i++){
        myid[i] = i;
        pthread_create(&tid[i], NULL, sum, &myid[i]);
    }
    for(i = 0; i < nthreads; i++){
        pthread_join(tid[i], NULL);
    }
    for (i = 0; i < nthreads; i++){
        result += psum[i];
    }

    if (result != (nelems * (nelems - 1)) / 2){
        printf("Error");
    }
    exit(0);
}

void *sum(void *vargp){
    int myid = *((int *)vargp);
    long start = myid * nelems_per_thread;
    long end = start + nelems_per_thread;
    long i, sum = 0;
    for(i = start; i < end; i++){
        sum += i;
    }
    psum[myid] = sum;
    return NULL;
}

12.7 其他并发问题

12.7.1 线程安全

一个函数被称为线程安全的:当且仅当被多个并发线程反复调用时候,它会一直产生正确的结果。

线程不安全的几种情况:

  1. 不保护共享变量的函数
  2. 保持跨越多个调用的状态的函数,后面函数调用结果依赖于上次调用的中间结果
  3. 返回指向静态变量的指针的函数
  4. 调用线程不安全函数的函数

加锁-拷贝

12.7.2 可重入性

可重入函数是一种线程安全函数,特点是在被多个线程调用的时候,不会引用任何共享数据。有两种:

  • 显式可重入:都是传值,而且没有引用静态或者全局变量
  • 隐式可重入:传的是指针,但是没有指向共享数据

12.7.3 在线程化的程序中使用已存在的库函数

加锁拷贝,但是比较慢。可以使用带_r后缀的可重入版本函数

12.7.4 竞争

当程序正确性依赖于多个线程的执行顺序的话,就会导致竞争,所以并发编程中一定要考虑全面:

#include "csapp.h"
#define N 4

void *thread(void *vargp)

int main(){
    pthread_t tid[N];
    int i, *ptr;
    for(i = 0; i < N; i++){
        // 使用独立的块地址避免竞争
        ptr = malloc(sizeof(int));
        *ptr = i;
        pthread_create(&tid[i], NULL, thread, ptr);
    }
    for(i = 0; i < N; i++){
        pthread_join(tid[i], NULL);
    }
    exit(0);
}

void *thread(void *vargp){
    int myid = *((int *)vargp);
    free(vargp);
    return NULL;
}

12.7.5 死锁

一组线程被阻塞了,等待一个永远也不会为真的条件。使用进度图。

  • 使用p,v操作顺序不当
  • 死锁区域

互斥锁加锁顺序:对于每对互斥锁(s,t),每个同时占用s和t的线程都按照同样顺序加锁,那么就是无死锁的


点赞走一波😏


评论

提交评论