进程间通信之消息队列
这两天在复习Linux应用编程,感谢杨宗德老师的书《Linux高级程序设计》,写得非常通俗易懂又不乏严谨,一路看下去非常顺利,即便第三版的书中有些小的编辑错误,但不影响阅读。我这两天的文章的内容大部分来自杨老师的书本,整理了一下。
IPC:进程间通信(Inter Process Communication),指多个进程之间进行数据交换。
system V提供的IPC机制主要有消息队列,信号量和共享内存3种。使用IPC前必须先创建,每种IPC都有特定的生产者、所有者和访问权限。使用ipcs命令可以查看当前系统正在使用的IPC工具。一个IPC工具至少包含key值、ID值、拥有者、权限和使用的大小等关键信息。如果需要手工删除某个IPC机制,可以使用ipcrm命令。
ftok()函数创建key值:(key值为一个32位的整型数据)
exiern key_t ftok (__const char * __pathname,int __prok_id);
第一个参数 pathname:为文件路径,或目录,一般是当前目录。
第二个参数 id:为一个整形变量,是子序号,参与构成ftok()函数的返回值。虽然是int类型,但是只使用8bits(1-255)。
在ftok()函数创建key值过程中使用了该文件属性的st_dev 和st_ino。
key值构成:
key值的第31-24(共8位)为ftok()第二个参数的低8位。//第二个参数的用处在这里。
key值的第23-16(共8位)为该文件的st_dev属性的第8位。
key值的第15-0为该文件的st_ino属性的低16位。
因此,如果使用相同的文件路径及整数(第二个参数),得到的key值是唯一的,唯一的key值创建某类IPC机制时将得到同一个IPC机制。(但如果使用相同的key值分别创建一个消息队列和一个信号量,两者没有联系)而文件路径的访问对两个进程来说很容易统一,因此便捷的实现了两个进程通信机制ID的确定。
拥有者及权限:
要访问一个IPC工具需要对该工具拥有相应的权限,任何一个IPC对象,都和普通文件一样,具有创建者、创建者组、拥有者和拥有者组。这些属性在IPC工具的struct ipc_perm结构体中定义。
/* Data structure used to pass permission information to IPC operations. */struct ipc_perm{ __key_t __key; /* Key. */ __uid_t uid; /* Owner's user ID. */ __gid_t gid; /* Owner's group ID. */ __uid_t cuid; /* Creator's user ID. */ __gid_t cgid; /* Creator's group ID. */ unsigned short int mode; /* Read/write permission. */ unsigned short int __pad1; unsigned short int __seq; /* Sequence number. */ unsigned short int __pad2; unsigned long int __unused1; unsigned long int __unused2;};
消息队列:
消息队列组织图:
整个消息队列有两种类型的数据结构:
1、msqid_ds消息队列数据结构:描述整个消息队列的属性,主要包括整个消息队列的权限,拥有者、两个重要的指针分别指向消息队列的第一个消息和最后一个消息。
2、msg消息数据结构:整个消息队列的主体,一个消息队列有若干个消息,每个消息数据结构的基本成员包括消息类型、消息大小、消息内容指针和下一个消息数据结构位置。
/usr/include/linux/msg.h中定义了最大消息队列个数,消息最大值,默认消息队列大小这些系统数据。
消息队列的基本属性:
整个消息队列的基本属性由msqid_ds数据结构在文件/usr/include/msg.h中定义:
struct msqid_ds{struct ipc_perm msg_perm;//权限struct msg *msg_first;//指向消息头struct msg *msg_last;//指向消息尾__kernel_tiem_t msg_stime;//last msgsnd time 最近发送消息时间__kernel_tiem_t msg_rtime;//lsat msgrcv time 最近接受消息时间__kernel_tiem_t msg_ctime;//last change timeunsigned long msg_lcbytes;//Reuse junk fields for 32 bitunsigned long msg_lqbytes;//dittounsigned short msg_qnum;//current number of bytes on queue 当前队列大小unsigned short msg_qbytes;//max number of bytes on queue 队列最大值__kernel_ipc_pid_t msg_lspid;//最近msgsnd 的pid__kernel_ipc_pid_t msg_lrpid;//最近receive 的pid};
struct msg消息结构体:
struct msg_msg{struct list_head m_list;long m_type;//消息类型int m_ts;//消息大小struct msg_msgseg *next;//下一个消息位置void *security;//the actual message follows immediately 真正消息位置}
消息队列管理:
创建消息队列:
在使用一个消息对列前,需要使用msgget()函数创建该消息队列,其函数声明如下:
extern int msgget(key_t __key,int _msgflg);
第一个参数 key:为由ftok创建的key值,
第二个参数 msgflg:用来确定消息队列的访问权限。
消息队列属性控制:
创建消息队列后,可以对该消息队列的基本属性进行控制(修改),控制消息队列属性的函数为msgctl():
extern int msgctl (int __msgid,int __cmd,struct msqid_ds *buf);
第一个参数 msqid:为消息队列标识符,该值为使用msgget()函数创建消息队列的返回值。
第二个参数 cmd:为执行的控制命令,即要执行的操作。包括以下选项:
#define IPC_RMID 0 //remove
#define IPC_SET 1 //set ipc_perm options
#define IPC_STAT 2 //get ipc_perm options
#define IPC_INFO 3 //see ipcs
IPC_STAT:读取消息队列属性,取得此队列的msqid_ds 结构,并将其存放在buf指向的结构中。
IPC_SET:设置消息队列属性。按由buf指向的结构中的值,设置与此队列相关的结构中的下列4个字段:msg_perm.uid,msg_perm.gid,msg_perm,mode和msg_qbytes。此命令只能由下列两种进程执行:一种是其有效用户ID等于msg_perm.cuid 或msg_perm.uid的进程,另一种 是具有超级用户特权的进程。只有超级用户才能增加msg_qbytes的值。
IPC_RMID:删除消息队列,从系统中删除该消息队列以及人在该队列上的所有数据,这种删除立即生效。仍在使用者一消息队列的其他进程在他们下一次试图对此队列进行操作时,将出错返回EIDRM。此命令只能由下 列两种进程执行:一种是其有效用户ID等于msg_perm.cuid 或msg_perm.uid的进程,另一种是具有超级用户特权的进程。
IPC_INFO:读取消息队列基本情况。
这四条选项(IPC_STAT、IPC_SET、IPC_INFO和IPC_RMID)也可用于信号量和共享内存。
第三个参数 buf:是一个临时的msqid_ds 结构体类型的变量。用于存储读取的消息队列属性或者需要修改的消息队列属性。
发送信息到消息队列:
extern int msgnsd (int __msqid,__const void *__msgp,size_t __msgsz,int __msgflg);
第一个参 msqid:数由msgget()生成的消息队列标识符,即将消息添加到那个消息队列中。
第二个参数 msgp:指向用户定义的缓冲区msgbuf结构:
struct msgbuf{
long mtype; //消息类型
char mtext[1]; //消息的内容,在使用时自己重新定义此结构。
};
mtype是一个正整数,表示消息的类型,因此,接收进程可用来进行消息选择(消息队列在存储信息时是按发送的先后顺序放置的)。
第三个参数 msgsz:为接收信息的大小。
第四个参数 msgflg:用来指定在达到系统为消息队列所定的界限时应采取的操作。
成功调用后,此函数将返回0,否则返回-1,同时将对消息队列msqid以下成员执行下列操作:
msg_qnum:以 1 为增量递增。
smg_lspid:设置为调用进程的进程ID。
msg_stime:设置为当前时间。
从消息队列中接收消息:
extern int msgrcv(int __msqid,void *__msgp,size_t __msgsz,long int __msgtyp,int __msgflg);
此函数从msgid 指定的消息队列读取消息,并将其放置到由msgp指向的内存空间中。
第一个参数 msqid:为读的对象,即从哪个消息队列获取的消息。
第二个参数 msgp:为一个临时的消息数据结构,用来保存读取的消息。
struct msgbuf {
long mtype; //保存读取的消息类型
char mtext; //存储消息位置,需要重新定义
}
第三个参数 msgze:用于指定mtext的大小,如果收到的消息大于msgsz,并且msgflg&MSG_NOERROR为真,则将g该消息截至msgsz字节,截去部分丢失不提示。
第四个参数 msgtyp:用于指定提取什么类型的消息。
=0:接收队列中的第一条消息,任意类型。
>0: 接收第一条指定为msgtyp类型的消息。
<0:接收第一条最低类型(小于或等于msgtyp的绝对值)的消息。
第五个参数 msgflg:用于指定所需类型的消息不再队列上时将要采取的操作。如果设置IPC_NOWAIT,如果现在没有消息,调用进程立即返回,同时返回-1,并将errno设置为ENOMSG。如果未设置IPC_NOWAIT,则阻塞调用进程,直至出现以下任何一种情况发生:
a)某一所需类型的消息被放置到队列中;
b)msqid从系统中删除,当该情况发生时,将errno设置为I、EIDRM,并返回-1;
c)调用进程收到一个要捕获的信号,在这种情况下,未收到消息,并且调用进程按signal(SIGTRAP)中指定方式恢复执行。
消息接收成功完成后,该消息将自动从消息队列中删除,并返回接收到的消息大小,并将对整个消息队列msqid数据数据结构的成员执行下列操作:
msg_qnum:以 1 为减量递减。
msg_lrpid:设置为调用进程的进程ID。
msg_rtiem:设置为当前时间。
消息队列应用实例:
消息队列实现本机进程间双向通信:
这个例子实现两个程序(进程)通信,在两个终端分别运行这两个程序,可在任一终端输入信息发送,在另一终端会自动接收信息,实现实时聊天。这里用到了两个不同类型的消息,进程A仅添加类型为1的消息到消息队列,进程B仅从消息队列取类型为1的消息,这样就实现了进程A发送信息给进程B。同样,进程B仅添加类型为2的消息到消息队列,进程A仅从消息队列取类型为2的消息,这样就实现了进程B发送信息给进程A。
msg_receiver_sender_a.c:
#include<stdio.h>#include<stdlib.h>#include<sys/ipc.h>#include<sys/msg.h>#include<string.h>struct msgbuf{int type;char ptr[0];};int main (int argc,char *argv[]){key_t key;key = ftok(argv[1],100);int msgid;msgid = msgget(key,IPC_CREAT | 0600);pid_t pid;pid = fork();if (pid == 0){while(1){printf("pls input msg to send:");char buf[128];fgets(buf,128,stdin);struct msgbuf *ptr = malloc(sizeof(struct msgbuf) + strlen(buf) + 1);ptr->type = 1;memcpy(ptr->ptr,buf,strlen(buf) + 1);msgsnd(msgid,ptr,strlen(buf) + 1,0);free(ptr);}}else{struct msgbuf{int type;char ptr[1024];};while(1){struct msgbuf mybuf;memset(&mybuf,'\0',sizeof(mybuf));msgrcv(msgid,&mybuf,1024,2,0);printf("\nrecv msg:%s\n",mybuf.ptr);}}}
msg_receiver_sender_b.c:
#include<stdio.h>#include<stdlib.h>#include<sys/ipc.h>#include<sys/msg.h>#include<string.h>struct msgbuf{int type;char ptr[0];};int main (int argc,char *argv[]){key_t key;key = ftok(argv[1],100);int msgid;msgid = msgget (key,IPC_CREAT | 0600);pid_t pid;pid = fork();if (pid == 0){while (1){printf("pls input msg to send:");char buf[128];fgets(buf,128,stdin);struct msgbuf *ptr = malloc(sizeof(struct msgbuf) + strlen(buf) + 1);ptr->type = 2;memcpy(ptr->ptr,buf,strlen(buf) + 1);msgsnd(msgid,ptr,strlen(buf) + 1,0);free(ptr);}}else{struct msgbuf{int type;char ptr[1024];};while(1){struct msgbuf mybuf;memset(&mybuf,'\0',sizeof(mybuf));msgrcv(msgid,&mybuf,1024,1,0);printf("\nrecv msg:%s\n",mybuf.ptr);}}}