首页 > 学院 > 开发设计 > 正文

APUE读书笔记--POSIX消息队列

2019-11-06 06:48:57
字体:
来源:转载
供稿:网友

APUE读书笔记–POSIX消息队列

1. 概述

消息队列可以认为是一个消息链表,有足够的写权限的线程可以往队列中放置消息,有足够的读权限的线程可以从队列中取走消息。每个消息都是一个记录(record),有一个长度和优先级。消息队列具有随内核的持续性

随内核持续性:ipC对象一直存在到内核重新自举或显式删除该对象为止。

2. mq_open、mq_close、mq_unlink函数

2.1 mq_open函数

mq_open函数创建一个新的消息队列或打开一个已存在的消息队列。

#include <fcntl.h> /* For O_* constants */#include <sys/stat.h> /* For mode constants */#include <mqueue.h>mqd_t mq_open(const char *name, int oflag);mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);Link with -lrt.//返回值:若成功,则为消息队列描述符,若出错返回-1

返回值称为消息队列描述符,不必是向文件描述符或套接字这样的短整形。

linux内核将mqd_t 定义为 int类型: typedef __kernel_mqd_t mqd_t; typedef int __kernel_mqd_t;

name命名规则

必须符合已有的路径名规则(必须最多由PATH_MAX个字节构成,包括结尾的空字节)如果它以斜杠开头,那么对这些函数的不同调用将访问同一个队列。名字中额外的斜杠符的解释由实现定义。为了可移植性,IPC名字必须以一个斜杠开头,并且不能包含其他的斜杠符。oflag参数必须指定或O_WRONLY或O_RDONLY或O_RDWR标志,还可以指定O_CREAT、O_EXCL、O_TRUNC标志。如果指定O_CREAT标志,则需要指定mode和attr值,attr值为NULL则使用默认属性。

2.2 mq_close函数

mq_close函数关闭已打开的消息队列

#include <mqueue.h>int mq_close(mqd_t mqdes);//返回值:若成功则为0,若出错则为-1调用进程可以不在使用该描述符,但是其消息队列并不从系统中删除。一个进程终止时,它的所有打开着的消息队列都关闭。这与close函数类似。

2.3 mq_unlink函数

要从系统中删除用作mq_open第一个参数的name,必须调用mq_unlink

#include <mqueue.h>int mq_unlink(const char *name);//返回值:若成功,返回0,若出错则为-1当一个消息队列的引用计数仍大于0时,其name就能删除。与unlink函数删除一个文件的机制类似。

2.4 函数例子

创建一个消息队列

#include "unpipc.h"int main(int argc, char *argv[]){ int c, flags; mqd_t mqd; flags = O_CREAT | O_RDWR; while((c = getopt(argc, argv, "e")) != -1) { //getopt函数读主函数参数,从第一个开始读,如果有-e选项,则flags|= O_EXCL switch (c) { case 'e': flags |= O_EXCL; break; } } if(optind != argc - 1) sys_err("usage: ./a.out [ -e ] <name>"); mqd = mq_open(argv[optind], flags, FILE_MODE, NULL); if(mqd == -1) { sys_err("mq_open err"); } PRintf("mqd = %d/n", mqd); mq_close(mqd); return 0;}

删除一个消息队列

#include "unpipc.h"int main(int argc, char *argv[]){ if(argc != 2) sys_err("usage: mqunlink <name>"); if(mq_unlink(argv[1]) == -1) sys_err("mq_unlink err"); printf("删除成功/n"); return 0;}

运行结果:

➜ MQ ./mqcreatel /test.mqmqd = 3➜ MQ ./mqcreatel /test.mq -emq_open err: File exists➜ MQ ls -l /dev/mqueue 总用量 0-rw-r--r-- 1 menwen menwen 80 3月 5 18:01 test.mq➜ MQ ./mqunlink /test.mq 删除成功

3. mq_getattr和mq_setattr函数

每个消息队列有四个属性,mq_getattr返回这些属性,mq_setattr则设置其中某些属性。

#include <mqueue.h>int mq_getattr(mqd_t mqdes, struct mq_attr *attr);int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);//返回值:若成功,返回0,若出错则为-1

mq_attr含有以下属性:

struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */};mq_getattr把所指定队列的当前属性填入到attr指向的结构。mq_setattr给所指定的队列设置属性,但是只能使用attr指向的mq_attr结构的mq_flags成员,以设置或清除非阻塞标志。另外三个成员被忽略:每个队列的最大消息数和每个消息的最大字节数只能在创建队列时设置,队列中的当前消息数只能获取而不能设置。oattr指针非空,则会保存先前的消息队列属性。

3.1 消息队列的默认属性

#include "unpipc.h"int main(int argc, char *argv[]){ mqd_t mqd; struct mq_attr attr; if(argc != 2) sys_err("usage: mqgetattr <name>"); mqd = mq_open(argv[1], O_RDWR | O_CREAT, 0777, NULL); mq_getattr(mqd, &attr); printf("maxmsg #msgs = %ld/nmsgsize #bytes/msg= %ld/ncurmsg = %ld/n", attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); mq_close(mqd); return 0;}

运行结果:

➜ MQ_POSIX ./mqgetattr /test.mqmaxmsg #msgs = 10msgsize #bytes/msg= 8192curmsg = 0

3.2 修改消息队列属性

#include "unpipc.h"struct mq_attr attr; //attr成员被初始化为0int main(int argc, char *argv[]){ mqd_t mqd; int flags, c; flags = O_CREAT | O_RDWR; while( (c = getopt(argc, argv, "em:z:")) != -1) { switch (c) { case 'e': flags |= O_EXCL; break; case 'm': attr.mq_maxmsg = atol(optarg); break; case 'z': attr.mq_msgsize = atol(optarg); break; } } if(optind != argc-1) sys_err("usage:mqcreat [-e] [-m maxmsg -z msgsize] <name>"); if((attr.mq_maxmsg != 0 && attr.mq_msgsize == 0) || (attr.mq_maxmsg == 0 && attr.mq_msgsize != 0)) sys_err("must specify both -m maxmsg and -z msgsize"); mqd = mq_open(argv[optind], flags, FILE_MODE, &attr); if(mqd == -1) mq_close(mqd); return 0;}

运行结果:

➜ MQ_POSIX ./mqcreate -e -m 5 -z 8192 /testmqd = 3➜ MQ_POSIX ls -l /dev/mqueue/test-rw-r--r-- 1 menwen menwen 80 3月 5 20:52 /dev/mqueue/test

mq_open错误提示Invalid argument问题总结

4. mq_send和mq_receive函数

这两个函数分别用于往一个队列中放置一个消息和从一个队列中取走一个消息,每个消息有一个优先级,它是小于MQ_PRIO_MAX的无符号整数,POSIX要求其上限至少为32。

通过man mq_overview查看

On Linux, sysconf(_SC_MQ_PRIO_MAX) returns 32768, but POSIX.1 requires only that an implementation support at least priorities in the range 0 to 31; some implementations provide only this range 在Linux系统中, sysconf(_SC_MQ_PRIO_MAX)返回值为32768,但是POSIX.1要求优先级至少支持0-31的范围,一些实现值提供这个范围.

mq_receive总是返回优先级最高的最早消息,而且该优先级能随该消息的内容及长度一同返回。

#include <mqueue.h>int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);//返回值:若成功,返回0,若出错返回-1ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);//返回值:若成功,返回消息中字节数,若出错,返回-1这两个函数的前三个参数类似write和read的前三个参数。mq_receive的len参数必须不能小于加到所指定队列中的消息的最大大小(mq_attr的mq_msgsize成员),否则函数返回EMSGSIZE。mq_send的prio参数是待发送消息的优先级,其值必须小于MQ_PROI_MAX。非空的msg_prio指针可以存放优先级。

4.1 函数例子

mqsend程序#include "unpipc.h"int main(int argc, char *argv[]){ mqd_t mqd; int len, prio; void *ptr; if(argc != 4) sys_err("usage: mqsend <name> <#byte> <priority>"); len = atoi(argv[2]); prio = atoi(argv[3]); mqd = mq_open(argv[1], O_RDWR | O_CREAT, FILE_MODE, NULL); ptr = calloc(len, sizeof(char)); mq_send(mqd, ptr, len, prio); return 0;}mqreceive程序#include "unpipc.h"int main(int argc, char *argv[]){ int flags, c; ssize_t n; unsigned int prio; mqd_t mqd; struct mq_attr attr; void * buf; flags = O_RDONLY; while((c = getopt(argc, argv, "n")) != -1) { switch (c) { case 'n': flags |= O_NONBLOCK; break; } } if(optind != argc -1) sys_err("usage: mqreceive [-n] <name>"); mqd = mq_open(argv[optind], flags); mq_getattr(mqd, &attr); buf = malloc(attr.mq_msgsize); n = mq_receive(mqd, buf,attr.mq_msgsize, &prio); if(n == -1) sys_err("mq_receive err"); printf("read %ld bytes/npriority = %u/n", (long)n, prio); return 0;}

运行结果:

➜ MQ_POSIX ./mqsend /test.mq 100 18 //100字节,18优先级➜ MQ_POSIX ./mqsend /test.mq 17 30 //17字节,30优先级➜ MQ_POSIX ./mqsend /test.mq 43 5 //43字节,5优先级➜ MQ_POSIX ./mqsend /test.mq 27 27 //27字节,27优先级➜ MQ_POSIX ./mqreceive /test.mqread 17 bytespriority = 30➜ MQ_POSIX ./mqreceive /test.mqread 27 bytespriority = 27➜ MQ_POSIX ./mqreceive /test.mqread 100 bytespriority = 18➜ MQ_POSIX ./mqreceive /test.mqread 43 bytespriority = 5➜ MQ_POSIX ./mqreceive -n /test.mqmq_receive err: Resource temporarily unavailable//-n参数非阻塞属性可以看出,mq_receive返回优先级最高的最早消息

5. 消息队列的限制

在建立消息队列时的两个限制: mq_maxmsg 队列中的最大消息数mq_msgsize 给定消息的最大字节数

Linux系统对两者都有限制,查看/proc/sys/fs/mqueue/中的文件

msg_default 10msg_max 10msgsize_default 8192msgsize_max 8192queues_max 256消息队列的实现定义了另外两个限制 MQ_OPEN_MAX :一个进程能够同时打开者消息队列的最大数目。(POSIX要求至少为8)MQ_PRIO_MAX :任意消息的最大优先级加1(POSIX要求至少为32,Linux中是0-32767)

5.1 例子:

#include "unpipc.h"int main(void){ printf("MQ_OPEN_MAX = %ld/nMQ_PRIO_MAX = %ld/n", sysconf(_SC_MQ_OPEN_MAX), sysconf(_SC_MQ_PRIO_MAX)); return 0;}

运行结果:

MQ_OPEN_MAX = -1 //-1代表无限制MQ_PRIO_MAX = 32768

6. mq_notify函数

POSIX消息队列允许异步事件通知,以告知何时有一个消息放置到某个控队列中,这种通知有两种方式可供选择:

产生一个信号创建一个线程来执行一个指定的函数

函数mq_notify为指定队列建立或删除异步事件通知。

#include <mqueue.h>int mq_notify(mqd_t mqdes, const struct sigevent *sevp);//返回值:若成功则为0,若出错则为-1

sigevent函数结构如下:

typedef struct sigevent { sigval_t sigev_value; int sigev_signo; int sigev_notify; union { int _pad[SIGEV_PAD_SIZE]; int _tid; struct { void (*_function)(sigval_t); void *_attribute; /* really pthread_attr_t */ } _sigev_thread; } _sigev_un;} sigevent_t;typedef union sigval { int sival_int; void *sival_ptr;} sigval_t;#define sigev_notify_function _sigev_un._sigev_thread._function#define sigev_notify_attributes _sigev_un._sigev_thread._attribute#define sigev_notify_thread_id _sigev_un._tid

使用于该函数的若干规则

如果sevp参数非空,那么当前进程希望有一个消息到达所指定的先前为空的队列时得到通知。说“该进程被注册为接收该队列的通知”如何sevp参数为空指针,而且当前进程目前被注册为接收所指定队列的通知,那么已存在的注册将被撤销。任意时刻只有一个进程可以被注册为接收某个给定队列的通知。当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接受该队列的通知时,只有在没有任何线程阻塞在该队列的mq_reveive调用中的前提下,通知才会发出,这就是说,在mq_reveive调用中的阻塞比任何通知的注册都优先。当该通知被发送给它的注册进程时,其注册即被撤销,如果需要,该进程必须再次调用mq_notify以重新注册。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表