消息队列



当我们已经拥有共享内存时,为什么还需要消息队列?这可能有几个原因,让我们尝试将其分解成多个要点以简化理解:

  • 如我们所知,一旦一个进程接收了消息,该消息将不再可供任何其他进程使用。而在共享内存中,多个进程都可以访问数据。

  • 如果我们希望使用小型消息格式进行通信。

  • 当多个进程同时通信时,需要使用同步机制来保护共享内存数据。

  • 如果使用共享内存进行读写操作的频率很高,那么实现该功能将非常复杂。在这种情况下,使用它并不划算。

  • 如果并非所有进程都需要访问共享内存,而只有少数几个进程需要访问,那么使用消息队列来实现会更好。

  • 如果我们希望使用不同的数据包进行通信,例如进程 A 向进程 B 发送消息类型 1,向进程 C 发送消息类型 10,向进程 D 发送消息类型 20。在这种情况下,使用消息队列实现更简单。为了简化给定的消息类型(如 1、10、20),它可以是 0 或正数或负数,如下所述。

  • 当然,消息队列的顺序是 FIFO(先进先出)。队列中插入的第一条消息将是第一个被检索到的消息。

使用共享内存或消息队列取决于应用程序的需求以及如何有效地利用它们。

使用消息队列进行通信可以以下列方式进行:

  • 一个进程写入共享内存,另一个进程从共享内存读取。我们知道,多个进程也可以进行读取。

Message Queue
  • 一个进程使用不同的数据包写入共享内存,多个进程从中读取,即根据消息类型读取。

Multiple Message Queue

在了解了有关消息队列的某些信息后,现在是时候检查支持消息队列的系统调用 (System V) 了。

要使用消息队列进行通信,需要执行以下步骤:

步骤 1 - 创建消息队列或连接到已存在的队列 (msgget())

步骤 2 - 写入消息队列 (msgsnd())

步骤 3 - 从消息队列读取 (msgrcv())

步骤 4 - 对消息队列执行控制操作 (msgctl())

现在,让我们检查以上调用的语法和一些信息。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

此系统调用创建或分配一个 System V 消息队列。需要传递以下参数:

  • 第一个参数 key 用于识别消息队列。key 可以是任意值,也可以是从库函数 ftok() 派生的值。

  • 第二个参数 shmflg 指定所需的消息队列标志,例如 IPC_CREAT(如果不存在则创建消息队列)或 IPC_EXCL(与 IPC_CREAT 一起使用以创建消息队列,如果消息队列已存在则调用失败)。还需要传递权限。

注意 - 有关权限的详细信息,请参考前面的章节。

如果成功,此调用将返回一个有效的消息队列标识符(用于后续的消息队列调用),如果失败则返回 -1。要了解失败的原因,请检查 errno 变量或 perror() 函数。

与此调用相关的各种错误包括 EACCESS(权限被拒绝)、EEXIST(队列已存在,无法创建)、ENOENT(队列不存在)、ENOMEM(内存不足以创建队列)等。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此系统调用将消息发送/追加到消息队列 (System V)。需要传递以下参数:

  • 第一个参数 msgid 用于识别消息队列,即消息队列标识符。标识符值在 msgget() 成功后接收。

  • 第二个参数 msgp 是指向发送到调用者的消息的指针,以以下形式定义的结构形式定义:

struct msgbuf {
   long mtype;
   char mtext[1];
};

变量 mtype 用于与不同的消息类型进行通信,在 msgrcv() 调用中详细解释。变量 mtext 是一个数组或其他结构,其大小由 msgsz(正值)指定。如果未提及 mtext 字段,则将其视为零大小消息,这是允许的。

  • 第三个参数 msgsz 是消息的大小(消息应以空字符结尾)。

  • 第四个参数 msgflg 指示某些标志,例如 IPC_NOWAIT(在队列中找不到消息时立即返回)或 MSG_NOERROR(如果超过 msgsz 字节则截断消息文本)。

如果成功,此调用将返回 0,如果失败则返回 -1。要了解失败的原因,请检查 errno 变量或 perror() 函数。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

此系统调用从消息队列 (System V) 中检索消息。需要传递以下参数:

  • 第一个参数 msgid 用于识别消息队列,即消息队列标识符。标识符值在 msgget() 成功后接收。

  • 第二个参数 msgp 是从调用者接收的消息的指针。它以以下形式定义的结构形式定义:

struct msgbuf {
   long mtype;
   char mtext[1];
};

变量 mtype 用于与不同的消息类型进行通信。变量 mtext 是一个数组或其他结构,其大小由 msgsz(正值)指定。如果未提及 mtext 字段,则将其视为零大小消息,这是允许的。

  • 第三个参数 msgsz 是接收的消息的大小(消息应以空字符结尾)。

  • 第四个参数 msgtype 指示消息的类型:

    • 如果 msgtype 为 0 - 读取队列中接收到的第一个消息。

    • 如果 msgtype 为正数 - 读取队列中类型为 msgtype 的第一个消息(如果 msgtype 为 10,则只读取类型为 10 的第一个消息,即使在队列开头可能存在其他类型)。

    • 如果 msgtype 为负数 - 读取类型小于或等于消息类型绝对值的第一个消息(例如,如果 msgtype 为 -5,则读取类型小于 5 的第一个消息,即类型为 1 到 5 的消息)。

  • 第五个参数 msgflg 指示某些标志,例如 IPC_NOWAIT(在队列中找不到消息时立即返回)或 MSG_NOERROR(如果超过 msgsz 字节则截断消息文本)。

如果成功,此调用将返回实际接收到的 mtext 数组中的字节数,如果失败则返回 -1。要了解失败的原因,请检查 errno 变量或 perror() 函数。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

此系统调用执行消息队列 (System V) 的控制操作。需要传递以下参数:

  • 第一个参数 msgid 用于识别消息队列,即消息队列标识符。标识符值在 msgget() 成功后接收。

  • 第二个参数 cmd 是要对消息队列执行所需控制操作的命令。cmd 的有效值为:

IPC_STAT - 将 struct msqid_ds 中每个成员的当前值的副本复制到 buf 指向的传递的结构中。此命令需要对消息队列具有读取权限。

IPC_SET - 设置所有者的用户 ID、组 ID、权限等,这些内容由 buf 指向的结构指定。

IPC_RMID - 立即删除消息队列。

IPC_INFO - 返回有关消息队列限制和参数的信息,这些信息存储在 buf 指向的结构中,该结构的类型为 struct msginfo。

MSG_INFO - 返回一个 msginfo 结构,其中包含有关消息队列消耗的系统资源的信息。

  • 第三个参数 buf 是指向名为 struct msqid_ds 的消息队列结构的指针。此结构的值将根据 cmd 用于设置或获取。

此调用将根据传递的命令返回值。IPC_INFO 和 MSG_INFO 或 MSG_STAT 的成功返回消息队列的索引或标识符,其他操作返回 0,如果失败则返回 -1。要了解失败的原因,请检查 errno 变量或 perror() 函数。

在了解了有关消息队列的基本信息和系统调用后,现在是时候检查一下程序了。

在查看程序之前,让我们先了解一下描述:

步骤 1 - 创建两个进程,一个用于发送到消息队列 (msgq_send.c),另一个用于从消息队列中检索 (msgq_recv.c)。

步骤 2 - 使用 ftok() 函数创建密钥。为此,首先创建文件 msgq.txt 以获取唯一的密钥。

步骤 3 - 发送进程执行以下操作。

  • 从用户读取字符串输入。

  • 如果存在,则删除换行符。

  • 发送到消息队列。

  • 重复此过程,直到输入结束 (CTRL + D)。

  • 接收到输入结束信号后,发送消息“end”以表示进程结束。

步骤 4 - 在接收进程中,执行以下操作。

  • 从队列读取消息。
  • 显示输出。
  • 如果接收到的消息为“end”,则完成进程并退出。

为了简化起见,在此示例中我们不使用消息类型。此外,一个进程写入队列,另一个进程从队列读取。这可以根据需要扩展,即理想情况下,一个进程写入队列,多个进程从队列读取。

现在,让我们检查一下进程(将消息发送到队列) - 文件:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */
   
   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");
   
   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

编译和执行步骤

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是来自消息接收进程(从队列中检索消息)的代码 - 文件:msgq_recv.c

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;
   
   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }
   
   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");
   
   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

编译和执行步骤

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.
广告