IPC(Inter Process Communication)

同样的问题和解决办法也适用于线程。

竞争条件

也就是在协作进程中共享了一些公用存储区,但是这些存储区最后的结果取决于进程运行的精确时序,比如A先输出了,但是B也输出了,最后的正确结果是B输出的,所以这种称为竞争条件。也就是说这种需要依赖进程运行顺序的,是竞争的条件。

临界区

为了防止这种情况,必须有某种途径来阻止多个进程同时读写共享的数据,需要进行互斥,即以某种手段确保当一个进程在使用一个共享变量或者文件的时候,其他进程不能做同样的操作。为实现互斥而选择适当的原语是任何操作系统的主要设计内容之一。

对共享内存进行访问的程序片段称为临界区域或者临界区,经过调度,使得两个进程不能同时处于临界区中,就能避免竞争条件,这样就不会依赖于进程运行的顺序。

忙等待的互斥

屏蔽中断

单处理器系统中,使每个进程在刚刚进入临界区后立即屏蔽所有中断,并在就要离开之前再打开中断,屏蔽中断后,时钟中断也被屏蔽,这样CPU就不会被切换到其他进程。这样一个进程屏蔽中断后,就可以检查和修改共享内存,而不必担心其他进程介入。

但是在多处理器中就不好弄了,因为只能对执行disable指令的那个CPU有效,而且如果屏蔽之后不再打开中断,那么系统就有可能终止。

锁变量

设置一个共享锁变量,但是这种还是有可能在设置锁的时候,另一个进程被调度执行。然后同时就有可能有两个进程进入临界区中。

严格轮换法

类似于下面代码描述的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 进程0 等待turn==0才开始运行
while (TRUE){
while (turn != 0){
critical_region();
turn = 1;
noncritical_region();
}
}
# 进程1 等待turn==1才开始运行
while (TRUE){
while (turn != 1){
critical_region();
turn = 0;
noncritical_region();
}
}

开始运行时候,turn为0,进程0开始运行,运行完临界区后,turn为1,进程1开始运行临界区。如果进程0运行完后,turn=1,但是进程1还没有完成上一轮非临界区的执行,这时候turn就为0,进程0就只能阻塞等待了。所以进程0被一个临界区之外的进程阻塞。

这个等待被称为忙等待,用于忙等待的锁,称为自旋锁。

Peterson解法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#define FALSE 0
#define TRUE 1
#define N 2
int turn; /* turn作为全局变量指示现在应该谁进入临界区 */
int interested[N];
void enter_region(int process){
int other;
other = 1 - process;
interested[process] = TRUE;
turn = process;
while (turn == process && interested[other] == TRUE);
}
void leave_region(int process){
interested[process] = False;
}

从代码可以看出来假设只有一个进程进入,那么情况很简单,立马返回就开始执行了。如果进程0进入了,但是没有离开,那么进程1要进入,必须等待interested[other]为False才可以进入。如果几乎是两个一起进入的,那么后面进入的将会将turn值重写,这样前面的就直接进入了临界区(因为turn!=process),后面的进程要等待到interested[other]为False,也就是进程0从临界区出来。

关键在于这个条件判断,interested[other]主要用来在一个已经进入了临界区的进程,要进行等待。turn==process的话主要是用在两个几乎是同时进入,用来第一个进程打破循环先进入临界区的。

TSL指令

TSL RX, LOCK测试并且加锁(Test and Set Lock),将一个内存字lock读到寄存器RX中,然后在该内存地址上存一个非0值。读字和写字的操作是不可分割的,在该指令结束之前其他处理器均不允许访问该内存字。执行TSL指令的CPU将锁住内存总线,禁止其他CPU在本指令结束之前访问内存。

用一个共享变量lock来协调对共享内存的访问,lock为0的时候,任何进程都可以使用TSL指令将其设置为1,并读写共享内存,操作结束之后,进程用一个普通的move指令将lock值重新设置为0.

睡眠与唤醒

之前的这些解决办法都是忙等待,本质上都是:当一个进程想进入临界区时候,先检查是否允许进入,若不允许,则该进程将原地等待,直到允许为止。

sleep和wakeup会在无法进入临界区的时候阻塞而不是忙等待,会引起调用进程阻塞的系统调用,直到另一个进程将其唤醒。

信号量 Semaphore

即PV操作,信号量也就是该资源的有效数目,对信号量进行down操作,首先检查信号量的值是否大于0,如果大于0,则将其值减一然后继续。如果为0,那么进程将会睡眠。up操作将对信号量的值增加1,也就是释放操作,如果有一个或者多个进程在这个信号量上睡眠,那么up之后,信号量的值依然为0,但是有一个进程被唤醒了。

down和up操作都是原子的,down中检查数值,修改变量值以及可能发生的睡眠操作都是一个单一的原子操作,up的值增1和唤醒一个进程也是,这种原子性对于解决同步问题和避免竞争条件是绝对必要的。

通常是将up和down作为系统调用实现,操作系统需要在执行下面操作的时候屏蔽所有中断,测试信号量,更新信号量以及在需要时候使得某个进程睡眠。如果使用多个CPU,则每个信号量由一个锁变量来进行抱回,通过TSL来确保同一个时刻只有一个CPU在对信号量进行操作。

一个生产者和消费者的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#define N 100
typedef int semaphore;
semaphore mutex = 1;
semaphore empty = N;
semaphore full = 0;
void producer(void){
int item;
while (TRUE){
item = produce_item();
down(&empty);
down(&mutex); /* 进入临界区 */
insert_item(item);
up(&mutex); /* 离开临界区 */
up(&full);
}
}
void consumer(void){
int item;
while (TRUE){
down(&full);
down(&mutex);
item = remove_item();
up(&mutex);
up(&empty);
consume_item(item);
}
}

这样信号量就实现了临界区互斥操作,而消费者等待生产者放入item,这又是一种同步操作。同步是多个进程在执行次序上的协调,相互等待消息,而互斥是对临界资源的使用。所以信号量full和empty用来保证某种事件的顺序发生或者不发生,保证了缓冲区满的时候生产者停止运行,缓冲区空的时候消费者停止运行。

互斥量 mutex

互斥量是信号量的一个简化版本,仅仅适用于管理共享资源或者一小段代码,有两个状态,解锁和加锁,当一个线程或者进程需要访问临界区的时候,调用mutex_lock,如果该互斥量当前是解锁的,调用成功,则线程或者进程可以进入该临界区。如果已经加锁,那么线程将会被阻塞,直到在临界区的线程完成并且调用mutex_unlock,如果有多个线程阻塞,那么随机选择一个。

enter_region的区别在于,enter_region在阻塞的时候会一直运行,测试锁,忙等待,而这个线程也不会得到锁,因为不会切换给其他线程运行来释放锁。但是mutex_lock在没有获得锁的时候会thread_yield,所以没有忙等待,在该线程下一次运行的时候,再一次对锁进行测试。

这个方法和之前提到的加锁区别在于互斥量使用了TSL,保证了原子性。

但是有一个问题是,对于用户级线程,可以共用一个互斥量,因为所有的线程都在一个公共地址空间中操作。但是对于进程来说,就必须访问一些共享内存。两种方案,第一有些共享数据结构,比如信号量是可以放在内核中的,并且只能通过系统调用来访问。第二种,是进程和其他进程共享部分地址空间,或者共享文件。

Pthread中的互斥

  • pthread_mutex_init
  • pthread_mutex_destroy
  • pthread_mutex_lock
  • pthread_mutex_try_lock
  • pthread_mutex_unlock

另外一种同步机制:条件变量,其允许线程由于一些未达到的条件而阻塞。

  • pthread_cond_init
  • pthread_cond_destroy
  • pthread_cond_wait
  • pthread_cond_signal 向另一个线程发信号来唤醒它
  • pthread_cond_broadcast 向多个线程发信号来让他们全部唤醒

被阻塞的线程经常是在等待发信号的线程去做某些工作,释放某些资源或者是进行其他一些活动,只有完成后阻塞的线程才可以继续运行。

条件变量与互斥量一起使用,让一个线程锁住一个互斥量,然后当它不能获得期待的结果时候等待一个条件变量。最后另一个线程会向它发送信号,然后继续执行。pthread_cond_wait原子性调用并且解锁它持有的互斥量,所以互斥量是参数之一。

一起使用的原因是因为有临界区需要进行互斥操作,如果没有临界区就可以直接使用条件变量来进行资源的同步。比如aiossdb中需要获取可用连接时候等待其他协程释放连接,然后通知等待的条件变量。

一个使用信号量的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#include <stdio.h>
#include <pthread.h>
#define MAX 10000000
pthread_mutex_init the_mutex;
pthread_cond_init condc, condp;
int buffer = 0;
void *producer(void *ptr){
int i;
for(i=1;i<=MAX;i++){
pthread_mutex_lock(&the_mutex); /* 互斥使用缓冲区,在条件变量wait时候会解锁 */
while(buffer != 0) pthread_cond_wait(&condp, &the_mutex);
buffer = i;
pthread_cond_signal(&condc);
pthread_mutex_unlock(&the_mutex); /* 释放缓冲区 */
}
pthread_exit(0);
}
void *consumer(void *ptr){
int i;
for (i=1;i<=MAX;i++){
pthread_mutex_lock(&the_mutex);
while(buffer == 0) pthread_cond_wait(&condc,&the_mutex);
buffer = 0;
pthread_cond_signal(&condp);
pthread_mutex_unlock(&the_mutex);
}
pthread_exit(0);
}
int main(int argc, char **argv){
pthread_t pro, con;
pthread_mutex_init(&the_mutex, 0);
pthread_cond_init(&condc, 0);
pthread_cond_init(&condp, 0);
pthread_create(&con, 0, consumer, 0);
pthread_create(&pro, 0, producer, 0);
pthread_join(pro, 0);
pthread_join(con, 0);
pthread_cond_destroy(&condc);
pthread_cond_destroy(&condp);
pthread_mutex_destroy(&the_mutex);
}

管程 monitor

一个管程是一个由过程,变量以及数据结构等组成的一个集合,进程可以在任何需要的时候调用管程中的过程,但是不能在管程之外声明的过程中直接访问管程内的数据结构。任意时刻管程中只能由一个活跃进程,这样管程就可以实现互斥,也就是实现了一个集合,比如Java中的synchronized,Java会保证一旦某个线程执行该方法,就不允许其他线程执行该对象中的任何synchronized方法。

一个管程过程发现其无法继续运行的时候,会在某个条件变量上执行wait操作,会导致调用进程自身阻塞,并且将一个等待管程过程的进程调入管程,比如缓冲区满。消费者消费之后,可以唤醒正在睡眠的生产者进程,可以给正在等待的条件变量执行signal完成,一般会在最后执行signal,这样,执行后该进程就会退出管程。

对管程过程的自动互斥保证了进程完成wait操作不会被打断,也就是在wait完成之前切换进程,在wait执行完成把生产者标志为不可运行之前,就不会允许消费者进入管程。

新的问题是分布式系统中,每个CPU由自己的私有内存,这样目前使用的原语没有提供机器间的信息交换方法。

消息传递 message passing

进程间通信的方式,使用这两条原语sendreceive,如果没有消息可用,接收者可能被阻塞,直到一条消息到达。

消息传递系统的设计要点

  • 发送,确认。
  • 原始消息嵌入序号
  • 身份认证

消息传递解决生产者-消费者问题

首先消费者向生产者发送空消息,生产者拿走空消息,然后生产填充消息,然后发送给消费者。消费者拿走消息,消费并且向生产者发送空消息。

对消息进行编址的两种方式:

  • 为每个进程分配一个唯一地址,让消息按照进程地址编址
  • 使用信箱,对一定数量的消息进行缓冲,如果进程向满的信箱发送消息,将会被挂起,直到由空消息,类似于消息队列

通常在并行程序设计系统中使用消息传递。下面是一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define N 100
void producer(void){
int item;
message m;
while(TRUE){
item = produce_item();
receive(consumer, &m); /* 等待消费者发送空缓冲区 */
build_message(&m, item);
send(consumer, &m);
}
}
void consumer(void){
int item, i;
message m;
for (i=0;i<N;i++) send(producer, &m);
while(TRUE){
receive(producer, &m);
item = extrace_item(&m);
send(producer, &m);
consume_item(item);
}
}

屏障

对于划分了多个阶段的进程组来说,必须所有进程都完成了这一阶段的工作,才可以进入下一个阶段。使得每一个进程在完成当前迭代工作后执行一个barrier操作。具体实现,有可能是一个计数器,也可能是一个不断的轮询。

总结

进程通信,第一,进程如何将信息传递给另外一个,第二确保进程之间在临界区不会冲突,即互斥,第三个是进程之间的同步问题(顺序问题)。

第一个,管道,共享存储区,共享文件,共享数据结构(信号量),消息传递(消息队列)。其中线程本身就是共享一个地址空间(同一个进程的线程)

第二个,互斥,忙等待,信号量,互斥量,管程等

第三个,信号量实现同步