进程间通信 - 信号量
首先想到的问题是,为什么我们需要信号量?简单来说,是为了保护多个进程共享的关键/公共区域。
假设多个进程使用相同的代码区域,如果所有进程都想要并行访问,则结果会发生重叠。例如,多个用户仅使用一台打印机(公共/关键部分),假设有3个用户,同时提交3个作业,如果所有作业都并行启动,则一个用户的输出会与另一个用户的输出重叠。因此,我们需要使用信号量来保护它,即在一个进程运行时锁定关键部分,并在完成后解锁。这将对每个用户/进程重复执行,以确保一个作业不会与另一个作业重叠。
基本上,信号量分为两种类型:
二元信号量 - 只有两种状态 0 和 1,即锁定/解锁或可用/不可用,互斥实现。
计数信号量 - 允许任意资源计数的信号量称为计数信号量。
假设我们有5台打印机(为了理解,假设一台打印机只接受一个作业),我们有3个作业需要打印。现在,3个作业将分配给3台打印机(每台一台)。在此过程中,又来了4个作业。现在,在2台可用的打印机中,已经安排了2个作业,我们还剩下2个作业,只有在其中一个资源/打印机可用后才能完成。这种根据资源可用性进行的调度可以视为计数信号量。
要使用信号量执行同步,请执行以下步骤:
步骤1 - 创建信号量或连接到已存在的信号量 (semget())
步骤2 - 对信号量执行操作,即分配、释放或等待资源 (semop())
步骤3 - 对消息队列执行控制操作 (semctl())
现在,让我们用我们拥有的系统调用来检查一下。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semget(key_t key, int nsems, int semflg)
此系统调用创建或分配 System V 信号量集。需要传递以下参数:
第一个参数 key 识别消息队列。key 可以是任意值,也可以是从库函数 ftok() 派生的值。
第二个参数 nsems 指定信号量的数量。如果是二元信号量,则为 1,表示需要 1 个信号量集,否则根据所需的信号量集数量而定。
第三个参数 semflg 指定所需的信号量标志,例如 IPC_CREAT(如果信号量不存在则创建)或 IPC_EXCL(与 IPC_CREAT 一起使用以创建信号量,如果信号量已存在则调用失败)。还需要传递权限。
注意 - 请参阅前面的部分了解权限的详细信息。
此调用成功时将返回有效的信号量标识符(用于进一步调用信号量),失败时返回 -1。要了解失败的原因,请检查 errno 变量或 perror() 函数。
此调用相关的各种错误包括 EACCESS(权限被拒绝)、EEXIST(队列已存在,无法创建)、ENOENT(队列不存在)、ENOMEM(没有足够的内存来创建队列)、ENOSPC(超过最大集合限制)等。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semop(int semid, struct sembuf *semops, size_t nsemops)
此系统调用对 System V 信号量集执行操作,例如分配资源、等待资源或释放资源。需要传递以下参数:
第一个参数 semid 指示由 semget() 创建的信号量集标识符。
第二个参数 semops 是指向要对信号量集执行的操作数组的指针。结构如下:
struct sembuf { unsigned short sem_num; /* Semaphore set num */ short sem_op; /* Semaphore operation */ short sem_flg; /* Operation flags, IPC_NOWAIT, SEM_UNDO */ };
上述结构中的元素 sem_op 指示需要执行的操作:
如果 sem_op 为负数,则分配或获取资源。阻塞调用进程,直到其他进程释放足够的资源,以便此进程可以分配。
如果 sem_op 为零,则调用进程等待或休眠,直到信号量值达到 0。
如果 sem_op 为正数,则释放资源。
例如:
struct sembuf sem_lock = { 0, -1, SEM_UNDO };
struct sembuf sem_unlock = {0, 1, SEM_UNDO };
第三个参数 nsemops 是该数组中的操作数。
#include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> int semctl(int semid, int semnum, int cmd, …)
此系统调用对 System V 信号量执行控制操作。需要传递以下参数:
第一个参数 semid 是信号量的标识符。此 ID 是信号量标识符,它是 semget() 系统调用的返回值。
第二个参数 semnum 是信号量的编号。信号量从 0 开始编号。
第三个参数 cmd 是执行所需信号量控制操作的命令。
第四个参数的类型是 union semun,取决于 cmd。对于某些情况,第四个参数不适用。
让我们检查一下 union semun:
union semun { int val; /* val for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT and IPC_SET */ unsigned short *array; /* Buffer for GETALL and SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO and SEM_INFO*/ };
在 sys/sem.h 中定义的 semid_ds 数据结构如下:
struct semid_ds { struct ipc_perm sem_perm; /* Permissions */ time_t sem_otime; /* Last semop time */ time_t sem_ctime; /* Last change time */ unsigned long sem_nsems; /* Number of semaphores in the set */ };
注意 - 请参阅手册页了解其他数据结构。
union semun arg; cmd 的有效值为:
IPC_STAT - 将 struct semid_ds 的每个成员的当前值的信 息复制到 arg.buf 指向的结构中。此命令需要对信号量具有读取权限。
IPC_SET - 设置所有者的用户 ID、组 ID、权限等,这些都由 semid_ds 结构指向。
IPC_RMID - 删除信号量集。
IPC_INFO - 返回有关信号量限制和参数的信息,这些信息包含在 arg.__buf 指向的 semid_ds 结构中。
SEM_INFO - 返回一个 seminfo 结构,其中包含有关信号量消耗的系统资源的信息。
此调用将返回一个值(非负值),具体取决于传递的命令。成功时,IPC_INFO 和 SEM_INFO 或 SEM_STAT 返回最高使用条目的索引或标识符(根据信号量),或者 GETNCNT 的 semncnt 值,或者 GETPID 的 sempid 值,或者 GETVAL 的 semval 值,其他操作成功时返回 0,失败时返回 -1。要了解失败的原因,请检查 errno 变量或 perror() 函数。
在查看代码之前,让我们了解一下它的实现:
创建两个进程,例如子进程和父进程。
创建共享内存,主要用于存储计数器和其他标志,以指示共享内存中读/写进程的结束。
计数器由父进程和子进程递增。计数器值可以作为命令行参数传递,也可以作为默认值(如果未作为命令行参数传递或值小于 10000)。调用具有特定休眠时间,以确保父进程和子进程同时访问共享内存,即并行访问。
由于计数器由父进程和子进程以 1 为步长递增,因此最终值应该是计数器的两倍。由于父进程和子进程同时执行操作,因此计数器未按要求递增。因此,我们需要确保一个进程完成之后才能执行另一个进程。
所有上述实现都在文件 shm_write_cntr.c 中执行。
检查计数器值是否在文件 shm_read_cntr.c 中实现。
为了确保完成,信号量程序在文件 shm_write_cntr_with_sem.c 中实现。在整个进程完成后(读取完成后)删除信号量。
由于我们有单独的文件来读取共享内存中的计数器值,并且写入不会对其产生任何影响,因此读取程序保持不变 (shm_read_cntr.c)
最好在一个终端中执行写入程序,在另一个终端中执行读取程序。由于程序只有在写入和读取过程完成后才能完成执行,因此在完全执行写入程序后运行程序是可以的。写入程序将等待读取程序运行,并且只有在读取程序完成后才会结束。
无信号量的程序。
/* Filename: shm_write_cntr.c */ #include<stdio.h> #include<sys/ipc.h> #include<sys/shm.h> #include<sys/types.h> #include<string.h> #include<errno.h> #include<stdlib.h> #include<unistd.h> #include<string.h> #define SHM_KEY 0x12345 struct shmseg { int cntr; int write_complete; int read_complete; }; void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count); int main(int argc, char *argv[]) { int shmid; struct shmseg *shmp; char *bufptr; int total_count; int sleep_time; pid_t pid; if (argc != 2) total_count = 10000; else { total_count = atoi(argv[1]); if (total_count < 10000) total_count = 10000; } printf("Total Count is %d\n", total_count); shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT); if (shmid == -1) { perror("Shared memory"); return 1; } // Attach to the segment to get a pointer to it. shmp = shmat(shmid, NULL, 0); if (shmp == (void *) -1) { perror("Shared memory attach"); return 1; } shmp->cntr = 0; pid = fork(); /* Parent Process - Writing Once */ if (pid > 0) { shared_memory_cntr_increment(pid, shmp, total_count); } else if (pid == 0) { shared_memory_cntr_increment(pid, shmp, total_count); return 0; } else { perror("Fork Failure\n"); return 1; } while (shmp->read_complete != 1) sleep(1); if (shmdt(shmp) == -1) { perror("shmdt"); return 1; } if (shmctl(shmid, IPC_RMID, 0) == -1) { perror("shmctl"); return 1; } printf("Writing Process: Complete\n"); return 0; } /* Increment the counter of shared memory by total_count in steps of 1 */ void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) { int cntr; int numtimes; int sleep_time; cntr = shmp->cntr; shmp->write_complete = 0; if (pid == 0) printf("SHM_WRITE: CHILD: Now writing\n"); else if (pid > 0) printf("SHM_WRITE: PARENT: Now writing\n"); //printf("SHM_CNTR is %d\n", shmp->cntr); /* Increment the counter in shared memory by total_count in steps of 1 */ for (numtimes = 0; numtimes < total_count; numtimes++) { cntr += 1; shmp->cntr = cntr; /* Sleeping for a second for every thousand */ sleep_time = cntr % 1000; if (sleep_time == 0) sleep(1); } shmp->write_complete = 1; if (pid == 0) printf("SHM_WRITE: CHILD: Writing Done\n"); else if (pid > 0) printf("SHM_WRITE: PARENT: Writing Done\n"); return; }
编译和执行步骤
Total Count is 10000 SHM_WRITE: PARENT: Now writing SHM_WRITE: CHILD: Now writing SHM_WRITE: PARENT: Writing Done SHM_WRITE: CHILD: Writing Done Writing Process: Complete
现在,让我们检查共享内存读取程序。
/* Filename: shm_read_cntr.c */ #include<stdio.h> #include<sys/ipc.h> #include<sys/shm.h> #include<sys/types.h> #include<string.h> #include<errno.h> #include<stdlib.h> #include<unistd.h> #define SHM_KEY 0x12345 struct shmseg { int cntr; int write_complete; int read_complete; }; int main(int argc, char *argv[]) { int shmid, numtimes; struct shmseg *shmp; int total_count; int cntr; int sleep_time; if (argc != 2) total_count = 10000; else { total_count = atoi(argv[1]); if (total_count < 10000) total_count = 10000; } shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT); if (shmid == -1) { perror("Shared memory"); return 1; } // Attach to the segment to get a pointer to it. shmp = shmat(shmid, NULL, 0); if (shmp == (void *) -1) { perror("Shared memory attach"); return 1; } /* Read the shared memory cntr and print it on standard output */ while (shmp->write_complete != 1) { if (shmp->cntr == -1) { perror("read"); return 1; } sleep(3); } printf("Reading Process: Shared Memory: Counter is %d\n", shmp->cntr); printf("Reading Process: Reading Done, Detaching Shared Memory\n"); shmp->read_complete = 1; if (shmdt(shmp) == -1) { perror("shmdt"); return 1; } printf("Reading Process: Complete\n"); return 0; }
编译和执行步骤
Reading Process: Shared Memory: Counter is 11000 Reading Process: Reading Done, Detaching Shared Memory Reading Process: Complete
如果观察上面的输出,计数器应该是 20000,但是,由于在一个进程任务完成之前,另一个进程也在并行处理,因此计数器值并非预期值。输出会因系统而异,并且每次执行都会有所不同。为了确保两个进程在完成一个任务后执行任务,应该使用同步机制来实现。
现在,让我们使用信号量检查相同的应用程序。
注意 - 读取程序保持不变。
/* Filename: shm_write_cntr_with_sem.c */ #include<stdio.h> #include<sys/types.h> #include<sys/ipc.h> #include<sys/shm.h> #include<sys/sem.h> #include<string.h> #include<errno.h> #include<stdlib.h> #include<unistd.h> #include<string.h> #define SHM_KEY 0x12345 #define SEM_KEY 0x54321 #define MAX_TRIES 20 struct shmseg { int cntr; int write_complete; int read_complete; }; void shared_memory_cntr_increment(int, struct shmseg*, int); void remove_semaphore(); int main(int argc, char *argv[]) { int shmid; struct shmseg *shmp; char *bufptr; int total_count; int sleep_time; pid_t pid; if (argc != 2) total_count = 10000; else { total_count = atoi(argv[1]); if (total_count < 10000) total_count = 10000; } printf("Total Count is %d\n", total_count); shmid = shmget(SHM_KEY, sizeof(struct shmseg), 0644|IPC_CREAT); if (shmid == -1) { perror("Shared memory"); return 1; } // Attach to the segment to get a pointer to it. shmp = shmat(shmid, NULL, 0); if (shmp == (void *) -1) { perror("Shared memory attach: "); return 1; } shmp->cntr = 0; pid = fork(); /* Parent Process - Writing Once */ if (pid > 0) { shared_memory_cntr_increment(pid, shmp, total_count); } else if (pid == 0) { shared_memory_cntr_increment(pid, shmp, total_count); return 0; } else { perror("Fork Failure\n"); return 1; } while (shmp->read_complete != 1) sleep(1); if (shmdt(shmp) == -1) { perror("shmdt"); return 1; } if (shmctl(shmid, IPC_RMID, 0) == -1) { perror("shmctl"); return 1; } printf("Writing Process: Complete\n"); remove_semaphore(); return 0; } /* Increment the counter of shared memory by total_count in steps of 1 */ void shared_memory_cntr_increment(int pid, struct shmseg *shmp, int total_count) { int cntr; int numtimes; int sleep_time; int semid; struct sembuf sem_buf; struct semid_ds buf; int tries; int retval; semid = semget(SEM_KEY, 1, IPC_CREAT | IPC_EXCL | 0666); //printf("errno is %d and semid is %d\n", errno, semid); /* Got the semaphore */ if (semid >= 0) { printf("First Process\n"); sem_buf.sem_op = 1; sem_buf.sem_flg = 0; sem_buf.sem_num = 0; retval = semop(semid, &sem_buf, 1); if (retval == -1) { perror("Semaphore Operation: "); return; } } else if (errno == EEXIST) { // Already other process got it int ready = 0; printf("Second Process\n"); semid = semget(SEM_KEY, 1, 0); if (semid < 0) { perror("Semaphore GET: "); return; } /* Waiting for the resource */ sem_buf.sem_num = 0; sem_buf.sem_op = 0; sem_buf.sem_flg = SEM_UNDO; retval = semop(semid, &sem_buf, 1); if (retval == -1) { perror("Semaphore Locked: "); return; } } sem_buf.sem_num = 0; sem_buf.sem_op = -1; /* Allocating the resources */ sem_buf.sem_flg = SEM_UNDO; retval = semop(semid, &sem_buf, 1); if (retval == -1) { perror("Semaphore Locked: "); return; } cntr = shmp->cntr; shmp->write_complete = 0; if (pid == 0) printf("SHM_WRITE: CHILD: Now writing\n"); else if (pid > 0) printf("SHM_WRITE: PARENT: Now writing\n"); //printf("SHM_CNTR is %d\n", shmp->cntr); /* Increment the counter in shared memory by total_count in steps of 1 */ for (numtimes = 0; numtimes < total_count; numtimes++) { cntr += 1; shmp->cntr = cntr; /* Sleeping for a second for every thousand */ sleep_time = cntr % 1000; if (sleep_time == 0) sleep(1); } shmp->write_complete = 1; sem_buf.sem_op = 1; /* Releasing the resource */ retval = semop(semid, &sem_buf, 1); if (retval == -1) { perror("Semaphore Locked\n"); return; } if (pid == 0) printf("SHM_WRITE: CHILD: Writing Done\n"); else if (pid > 0) printf("SHM_WRITE: PARENT: Writing Done\n"); return; } void remove_semaphore() { int semid; int retval; semid = semget(SEM_KEY, 1, 0); if (semid < 0) { perror("Remove Semaphore: Semaphore GET: "); return; } retval = semctl(semid, 0, IPC_RMID); if (retval == -1) { perror("Remove Semaphore: Semaphore CTL: "); return; } return; }
编译和执行步骤
Total Count is 10000 First Process SHM_WRITE: PARENT: Now writing Second Process SHM_WRITE: PARENT: Writing Done SHM_WRITE: CHILD: Now writing SHM_WRITE: CHILD: Writing Done Writing Process: Complete
现在,我们将通过读取进程检查计数器值。
执行步骤
Reading Process: Shared Memory: Counter is 20000 Reading Process: Reading Done, Detaching Shared Memory Reading Process: Complete