【IPC】Posix消息队列 太过爱你忘了你带给我的痛 2022-08-21 02:27 172阅读 0赞 消息队列是一种IPC机制,有Posix消息队列和System V消息队列两种类型,它们有许多相似之处,但也有一些差别:对Posix消息队列的读总是返回优先级最高的最早的消息,而对System V消息队列的读则可以返回任意指定优先级的消息;当往空队列放置消息时,Posix消息队列允许产生一个信号或启动一个线程,而System V消息队列则不能提供类似的机制。消息队列可认为是一个消息链表,队列中的每个消息都有几个共同的属性:一个无符号整数优先级(Posix)或一个长整数类型(System V)、消息的数据部分长度(可以为0)、数据本身(如果长度大于0)。Posix消息队列的使用有一些注意事项,可在shell终端通过”man 7 mq\_overview”查看。 ## 1、消息队列的创建与关闭 ## 创建消息队列使用mq\_open函数: #include <mqueue.h> mqd_t mq_open(const char *name, int oflag); mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ }; mq\_open成功时返回一个消息队列描述符,类型为mqd\_t,失败时返回-1并设置相应的errno。参数name、oflag、mode的用法和常见的open函数的几个参数的用法相同,attr为消息队列的属性,当创建一个新的消息队列时要指定mode和attr参数,attr可以是NULL,这时消息队列的属性为默认值。参数name的格式比较特殊,它是个C风格的字符串且以斜线”/”开始但后面的字符不能包括斜线,长度不能超过NAME\_MAX即255。mq\_open有一个限制MQ\_OPEN\_MAX,一个进程同时拥有的打开的消息队列描述符不能超过这个值。 关闭打开的消息队列使用mq\_close函数: #include <mqueue.h> int mq_close(mqd_t mqdes); int mq_unlink(const char *name); mq\_close成功时返回0,失败时返回-1并设置相应的errno,功能同普通的close函数关闭一个不用的文件描述符一样,但它并不从系统中删除,从系统中删除要使用mq\_unlink函数。mq\_unlink成功时返回0,失败时返回-1并设置相应的errno。 消息队列内部维护着一个打开的消息队列描述符的引用计数,mq\_close和mq\_unlink执行成功后,都会使引用计数减1,当引用计数为0时,消息队列才会被真正的清除,但对于mq\_unlink则不同,即使引用计数不为0,消息队列在系统中的name也会被删除,只是消息队列并没有被真正的清除,直到最后一个mq\_close将引用计数减为0。由于Posix消息队列具有随内核的持续性,如果不调用mq\_unlink,消息队列在系统关闭时才会消失。 消息队列在编译链接时用到了librt库,使用-lrt选项。 消息队列的位置—— 在Linux上初次使用mq\_open创建消息队列时,在本地文件系统很可能找不到消息队列的文件位置,原因是消息队列在虚拟系统中创建,以root权限使用如下mount命令后就可以在”/dev/mqueue”下找到了。 $ mkdir /dev/mqueue $ mount -t mquque none /dev/mqueue 下面是创建与关闭消息队列的几个例子: // mqcreate1.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) int main(int argc, char **argv) { int c, flags; mqd_t mqd; flags = O_RDWR | O_CREAT; while (-1 != (c = getopt(argc, argv, "e"))) { switch (c) { case 'e': flags |= O_EXCL; break; } } if (optind != argc - 1) { printf("usage: %s [-e] <name>\n", argv[0]); exit(EXIT_FAILURE); } mqd = mq_open(argv[optind], flags, FILE_MODE, NULL); if (-1 == mqd) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_close(mqd)) { printf("mq_close error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } // mq_unlink(argv[optind]); exit(EXIT_SUCCESS); } // mqcreate.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) // mq_maxmsg and mq_msgsize both init to 0 struct mq_attr attr; int main(int argc, char **argv) { int c, flags; mqd_t mqd; flags = O_RDWR | O_CREAT; while (-1 != (c = getopt(argc, argv, "em:z:"))) { switch (c) { case 'e': flags |= O_EXCL; break; case 'm': attr.mq_maxmsg = atol(optarg); break; case 'z': attr.mq_msgsize = atol(optarg); break; } } if (optind != argc - 1) { printf("usage: %s [-e] [-m maxmsg -z msgsize] <name>\n", argv[0]); exit(EXIT_FAILURE); } if ((attr.mq_maxmsg != 0 && attr.mq_msgsize == 0) || (attr.mq_maxmsg == 0 && attr.mq_msgsize != 0)) { printf("must specify both -m maxmsg and -z msgsize\n"); exit(EXIT_FAILURE); } mqd = mq_open(argv[optind], flags, FILE_MODE, (attr.mq_maxmsg != 0) ? &attr : NULL); if (-1 == mqd) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_close(mqd)) { printf("mq_close error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } // mq_unlink(argv[optind]); exit(EXIT_SUCCESS); } // mqunlink.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> int main(int argc, char **argv) { if (argc != 2) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } if (-1 == mq_unlink(argv[1])) { printf("mq_unlink error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); } // mqsysconf.c #include <stdio.h> #include <stdlib.h> #include <unistd.h> int main(int argc, char **argv) { printf("MQ_OPEN_MAX = %ld, MQ_PRIO_MAX = %ld\n", sysconf(_SC_MQ_OPEN_MAX), sysconf(_SC_MQ_PRIO_MAX)); exit(0); } ## 2、消息队列的属性 ## 获取与设置消息队列的属性,有如下两个函数: #include <mqueue.h> int mq_getattr(mqd_t mqdes, struct mq_attr *attr); int mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); struct mq_attr { long mq_flags; /* Flags: 0 or O_NONBLOCK */ long mq_maxmsg; /* Max. # of messages on queue */ long mq_msgsize; /* Max. message size (bytes) */ long mq_curmsgs; /* # of messages currently in queue */ }; mq\_getatt与mq\_setattr成功时返回0,失败时返回-1并设置相应的errno。mq\_getatt用于获取消息队列的属性。mq\_getatt用于设置消息队列的属性,但是只能设置mq\_flags,其它三个属性将被忽略,mq\_maxmsg和mq\_msgsize由mq\_open创建时设置,mq\_curmsgs只能获取而不能设置。mq\_attr结构指针可作为mq\_open的参数,在创建新的消息队列时指定mq\_maxmsg和mq\_msgsize,但其它两个属性将被mq\_open忽略。 关于消息队列结构成员的几个值可在下列目录查看(它们的值分别为10、10、8192、8192、256): /proc/sys/fs/mqueue/msg\_default /proc/sys/fs/mqueue/msg\_max /proc/sys/fs/mqueue/msgsize\_default /proc/sys/fs/mqueue/msgsize\_max /proc/sys/fs/mqueue/queues\_max 下面是消息队列属性相关例子: // mqgetattr.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> int main(int argc, char **argv) { mqd_t mqd; struct mq_attr attr; if (argc != 2) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } if (-1 == (mqd = mq_open(argv[1], O_RDONLY))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } printf("maxmsg = %ld, msgsize = %ld, curmsgs = %ld\n", attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs); if (-1 == mq_close(mqd)) { printf("mq_close error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); } ## 3、消息的发送与获取 ## 消息的发送与获取使用下面两个函数: #include <mqueue.h> int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); mq\_send成功时返回0,失败时返回-1并设置errno,它往消息队列中放置一个消息,每个消息有一个优先级,它是一个小于MQ\_PRIO\_MAX的无符号整数。mq\_receive成功时返回消息长度,失败时返回-1并设置errno,它从消息队列中取走一个消息,但不能设置优先级(这点不同于System V消息队列的msgrcv),总是返回指定消息队列的最高优先级的最早消息,这个优先级可随消息的内容和长度一并返回。mq\_receive函数的msg\_len参数需要保证能容纳消息的最大长度,即消息队列属性的msgsize,否则发生错误EMSGSIZE。 下面是消息队列的收发消息的例子: // mqsend.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> int main(int argc, char **argv) { mqd_t mqd; void *ptr; size_t len; unsigned prio; if (4 != argc) { printf("usage: %s <name> <bytes> <priority>\n", argv[0]); exit(EXIT_FAILURE); } len = atoi(argv[2]); prio = atoi(argv[3]); mqd = mq_open(argv[1], O_WRONLY); if (-1 == mqd) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } ptr = calloc(len, sizeof(char)); if (NULL == ptr) { printf("calloc error\n"); exit(EXIT_FAILURE); } if (-1 == mq_send(mqd, ptr, len, prio)) { printf("mq_send error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); } // mqreceive.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> int main(int argc, char **argv) { int c, flags; mqd_t mqd; ssize_t n; unsigned prio; void *buff; struct mq_attr attr; flags = O_RDWR; while (-1 != (c = getopt(argc, argv, "n"))) { switch (c) { case 'n': flags |= O_NONBLOCK; break; } } if (optind != argc - 1) { printf("usage: %s [-n] <name>\n", argv[0]); exit(EXIT_FAILURE); } mqd = mq_open(argv[optind], flags); if (-1 == mqd) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } buff = malloc(attr.mq_msgsize); if (NULL == buff) { printf("malloc error\n"); exit(EXIT_FAILURE); } if (-1 == (n = mq_receive(mqd, buff, attr.mq_msgsize, &prio))) { printf("mq_receive error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } printf("read %ld bytes, priority = %u\n", (long)n, prio); exit(EXIT_SUCCESS); } ## 4、消息通知 ## 在使用消息队列时会遇到这么一种情况,从空队列获取消息时,如果设置为阻塞,那么将一直阻塞在mq\_receive上,期间不能做其它事情,如果为非阻塞, mq\_receive将马上出错返回,为了能够获取后续发送来的消息,要循环去获取消息或者叫做轮询poll,这又会浪费CPU时间,也是不值当的,一个可行的方法是使用消息队列的异步事件通知mq\_notify,它可以告诉我们何时有一个消息放到了空的消息队列。 #include <mqueue.h> int mq_notify(mqd_t mqdes, const struct sigevent *sevp); mq\_notify成功时返回0,失败时返回-1并设置errno。mq\_notify通知有两种形式,产生一个信号,或者创建一个线程并执行指定的函数,使用时有如下一般规则。 如果sevp参数非空,那么当前进程希望在有个消息到达所指定队列而且该队列先前为空时得到通知,该进程被注册为接收该队列的通知。如果sevp参数为空指针,而且当前进程被注册为接收所指定队列的通知,那么现有注册将被撤销。任意时刻只有一个进程可以被注册为接收某个给定队列的通知。当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该消息队列的通知时,只有任何线程没有阻塞在该队列mq\_receive调用的前提下,通知才会发出,在mq\_recieve调用中的阻塞比任何注册的通知都优先。当该通知被发送给注册它的进程时,其注册即被撤销,如果需要的话,该进程必须再次调用mq\_notify以重新注册。 下面是消息队列的几个例子: // mqnotifysig1.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #include <pthread.h> #include <signal.h> #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH) mqd_t mqd; void *buff; struct mq_attr attr; struct sigevent sigev; static void sig_usr1(int); int main(int argc, char **argv) { if (2 != argc) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } // open queue, get attributes, allocate read buffer if (-1 == (mqd = mq_open(argv[1], O_RDONLY))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (NULL == (buff = malloc(attr.mq_msgsize))) { printf("malloc error\n"); exit(EXIT_FAILURE); } // establish signal handler, enable notifation if (SIG_ERR == signal(SIGUSR1, sig_usr1)) { printf("signal error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; if (-1 == mq_notify(mqd, &sigev)) { printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (;;) { pause(); // signal handler does everything } exit(EXIT_SUCCESS); } // wrong with mq_notify/mq_receive/printf // not async-signal-safe functions called from sig_urr1 static sig_usr1(int signo) { ssize_t n; if (-1 == mq_notify(mqd, &sigev)) { // register first printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } n = mq_receive(mqd, buff, attr.mq_msgsize, NULL); printf("SIGUSR1 received, read %ld bytes\n", (long)n); return; } // mqnotifysig2.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #include <pthread.h> #include <signal.h> // set nonzero by signal handler volatile sig_atomic_t mqflag; static void sig_usr1(int); // problem: two msgs arrived, one maybe ignored int main(int argc, char **argv) { mqd_t mqd; void *buff; ssize_t n; sigset_t zeromask, newmask, oldmask; struct mq_attr attr; struct sigevent sigev; if (2 != argc) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } // open queue, get attributes, allocate read buffer if (-1 == (mqd = mq_open(argv[1], O_RDONLY))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (NULL == (buff = malloc(attr.mq_msgsize))) { printf("malloc error\n"); exit(EXIT_FAILURE); } sigemptyset(&zeromask); // no signals blocked sigemptyset(&newmask); sigemptyset(&oldmask); sigaddset(&newmask, SIGUSR1); // establish signal handler, enable notifation if (SIG_ERR == signal(SIGUSR1, sig_usr1)) { printf("signal error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; if (-1 == mq_notify(mqd, &sigev)) { printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (;;) { sigprocmask(SIG_BLOCK, &newmask, &oldmask); // block SIGUSR1 while (mqflag == 0) { sigsuspend(&zeromask); } mqflag = 0; // reset flag if (-1 == mq_notify(mqd, &sigev)) { // register first printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } n = mq_receive(mqd, buff, attr.mq_msgsize, NULL); printf("SIGUSR1 received, read %ld bytes\n", (long)n); sigprocmask(SIG_UNBLOCK, &newmask, NULL); // unblock SIGUSR1 } exit(EXIT_SUCCESS); } // avoid function called from signal handler // just set a global mark checked by some thread static sig_usr1(int signo) { mqflag = 1; return; } // mqnotifysig3.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #include <pthread.h> #include <signal.h> // set nonzero by signal handler volatile sig_atomic_t mqflag; static void sig_usr1(int); // read msg with nonblock while mq_notify int main(int argc, char **argv) { mqd_t mqd; void *buff; ssize_t n; sigset_t zeromask, newmask, oldmask; struct mq_attr attr; struct sigevent sigev; if (2 != argc) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } // open queue, get attributes, allocate read buffer if (-1 == (mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (NULL == (buff = malloc(attr.mq_msgsize))) { printf("malloc error\n"); exit(EXIT_FAILURE); } sigemptyset(&zeromask); // no signals blocked sigemptyset(&newmask); sigemptyset(&oldmask); sigaddset(&newmask, SIGUSR1); // establish signal handler, enable notifation if (SIG_ERR == signal(SIGUSR1, sig_usr1)) { printf("signal error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; if (-1 == mq_notify(mqd, &sigev)) { printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (;;) { sigprocmask(SIG_BLOCK, &newmask, &oldmask); // block SIGUSR1 while (mqflag == 0) { sigsuspend(&zeromask); } mqflag = 0; // reset flag if (-1 == mq_notify(mqd, &sigev)) { // register first printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } while (0 <= (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL))) { printf("SIGUSR1 received, read %ld bytes\n", (long)n); } if (EAGAIN == errno) { printf("mq_receive error"); // no msg for reading } sigprocmask(SIG_UNBLOCK, &newmask, NULL); // unblock SIGUSR1 } exit(EXIT_SUCCESS); } // avoid function called from signal handler // just set a global mark checked by some thread static sig_usr1(int signo) { mqflag = 1; return; } // mqnotifysig4.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #include <pthread.h> #include <signal.h> volatile sig_atomic_t mqflag; // using sigwait // sigwait for multi-thread usually // replaces sigprocmask with pthread_sigmask if multi-thread // sigwaitinfo // sigtimedwait // sigwait better than async-signal-handler int main(int argc, char **argv) { int signo; mqd_t mqd; void *buff; ssize_t n; sigset_t newmask; struct mq_attr attr; struct sigevent sigev; if (2 != argc) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } // open queue, get attributes, allocate read buffer if (-1 == (mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (NULL == (buff = malloc(attr.mq_msgsize))) { printf("malloc error\n"); exit(EXIT_FAILURE); } sigemptyset(&newmask); sigaddset(&newmask, SIGUSR1); sigprocmask(SIG_BLOCK, &newmask, NULL); // block SIGUSR1 // establish signal handler, enable notifation sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; if (-1 == mq_notify(mqd, &sigev)) { printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (;;) { if (0 !=sigwait(&newmask, &signo)) { printf("sigwait error\n"); } if (signo == SIGUSR1) { if (-1 == mq_notify(mqd, &sigev)) { // register first printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } while (0 <= (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL))) { printf("SIGUSR1 received, read %ld bytes\n", (long)n); } if (EAGAIN == errno) { printf("mq_receive error"); // no msg for reading } } } exit(EXIT_SUCCESS); } // avoid function called from signal handler // just set a global mark checked by some thread static sig_usr1(int signo) { mqflag = 1; return; } // mqnotifysig5.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #include <sys/select.h> #include <pthread.h> #include <signal.h> int pipefd[2]; static void sig_usr1(int); // using pipe and select int main(int argc, char **argv) { int nfds; char c; fd_set rset; mqd_t mqd; void *buff; ssize_t n; struct mq_attr attr; struct sigevent sigev; if (2 != argc) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } // open queue, get attributes, allocate read buffer if (-1 == (mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (NULL == (buff = malloc(attr.mq_msgsize))) { printf("malloc error\n"); exit(EXIT_FAILURE); } if (0 > pipe(pipefd)) { printf("pipe error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } // establish signal handler, enable notifation if (SIG_ERR == signal(SIGUSR1, sig_usr1)) { printf("signal error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; if (-1 == mq_notify(mqd, &sigev)) { printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (;;) { FD_SET(pepefd[0], &rset); nfds = select(pipefd[0] + 1, &ret, NULL, NULL, NULL); // ? if (-1 == nfds) { printf("select error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (FD_ISSET(pipefd[0], &rset)) { read(pipefd[0], &c, 1); if (-1 == mq_notify(mqd, &sigev)) { // register first printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } while (0 <= (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL))) { printf("SIGUSR1 received, read %ld bytes\n", (long)n); } if (EAGAIN == errno) { printf("mq_receive error"); // no msg for reading } } } exit(EXIT_SUCCESS); } static sig_usr1(int signo) { write(pipefd[1], "", 1); // one byte of 0 return; } // mqnotifysig6.c #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <mqueue.h> #include <sys/types.h> #include <sys/select.h> #include <pthread.h> #include <signal.h> mqd_t mqd; struct mq_attr attr; struct sigevent sigev; static void notify_thread(union sigval); // our thread function // using pthread int main(int argc, char **argv) { if (2 != argc) { printf("usage: %s <name>\n", argv[0]); exit(EXIT_FAILURE); } if (-1 == (mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK))) { printf("mq_open error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } if (-1 == mq_getattr(mqd, &attr)) { printf("mq_getattr error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } sigev.sigev_notify = SIGEV_THREAD; sigev.sigev_value.sival_ptr = NULL; sigev.sigev_notify_function = notify_thread; sigev.sigev_notify_attributes = NULL; if (-1 == mq_notify(mqd, &sigev)) { printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } for (;;) { pause(); // each new thread does everything } exit(EXIT_SUCCESS); } static void notify_thread(union sigval arg) { ssize_t n; void *buff; printf("notify_thread started\n"); buff = malloc(attr.mq_msgsize); if (-1 == mq_notify(mqd, &sigev)) { // register printf("mq_notify error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } while (0 <= (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL))) { printf("read %ld bytes\n", (long)n); } if (EAGAIN == errno) { printf("mq_receive error"); } free(buff); pthread_exit(NULL); } ## 5、实时信号 ## 最初的信号模型是不可靠的,信号可能丢失,而且进程难以在执行临界代码段时关掉选中的若干信号,后来逐步提供了可靠信号、实时信号。信号可划分为两个大组,\[SIGRTMIN, SIGRTMAX\]实时信号和SIGALARM/SIGINT/SIGKILL等等其它信号。实时信号意味着信号是排队的,先进先出,同一信号产生多少次就递交多少次,信号解阻塞时实时信号值较小的优先递交,而且实时信号通过SA\_SIGINFO标记可以比其它信号传递更多的信息。 下面例子使用了实时信号。 #include <stdio.h> #include <string.h> #include <errno.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <signal.h> typedef void sigfunc_rt(int, siginfo_t*, void*); static void sig_rt(int, siginfo_t*, void*); sigfunc_rt* signal_rt(int, sigfunc_rt*); int main(int argc, char **argv) { int i, j; pid_t pid; sigset_t newset; union sigval val; printf("SIGRTMIN = %d, SIGRTMAX = %d\n", (int)SIGRTMIN, (int)SIGRTMAX); if (0 == (pid = fork())) { // child: block three realtime signals if (0 != sigemptyset(&newset)) { printf("sigemptyset error\n"); } if (0 != sigaddset(&newset, SIGRTMAX)) { printf("sigaddset error: SIGRTMAX\n"); } if (0 != sigaddset(&newset, SIGRTMAX - 1)) { printf("sigaddset error: SIGRTMAX - 1\n"); } if (0 != sigaddset(&newset, SIGRTMAX - 2)) { printf("sigaddset error: SIGRTMAX - 2\n"); } if (0 != sigprocmask(SIG_BLOCK, &newset, NULL)) { printf("sigprocmask error: %s\n", strerror(errno)); } // establish signal handler with SA_SIGINFO set signal_rt(SIGRTMAX, sig_rt); signal_rt(SIGRTMAX - 1, sig_rt); signal_rt(SIGRTMAX - 2, sig_rt); sleep(6); // let parent send all the signals if (0 != sigprocmask(SIG_UNBLOCK, &newset, NULL)) { // unblock printf("sigprocmask error: %s\n", strerror(errno)); } sleep(3); // let all queued signals be delivered exit(0); } // parent sends nine signals to child sleep(3); // let child block all signals for (i = SIGRTMAX; i >= SIGRTMAX - 2; i--) { for (j = 0; j <= 2; j++) { val.sival_int = j; if (0 != sigqueue(pid, i, val)) { printf("sigqueue error: %s\n", strerror(errno)); } printf("sent signal %d, val = %d\n", i, j); } } exit(0); } static void sig_rt(int signo, siginfo_t *info, void *context) { printf("received signal %d, code = %d, ival = %d\n", signo, info->si_code, info->si_value.sival_int); } sigfunc_rt* signal_rt(int signo, sigfunc_rt *func) { struct sigaction act, oact; act.sa_sigaction = func; // must store function addr here sigemptyset(&act.sa_mask); act.sa_flags = SA_SIGINFO; if (signo == SIGALRM) { #ifdef SA_INTERRUPT act.sa_flags |= SA_INTERRUPT; // SunOS 4.x #endif } else { #ifdef SA_RESTART act.sa_flags |= SA_RESTART; // SVR4, 44BSD #endif } if (sigaction(signo, &act, &oact) < 0) { return (sigfunc_rt*)SIG_ERR; } return oact.sa_sigaction; }
相关 消息队列 :场景 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0... 蔚落/ 2024年04月18日 23:53/ 0 赞/ 84 阅读
相关 消息队列 一、消息队列MQ(Message Queue): 1)消息队列是一种先进先出的数据结构; 2)消息队列使用的“协议”不是具体的通讯协议,而是更高层次通讯模型。它定义 「爱情、让人受尽委屈。」/ 2023年10月11日 11:11/ 0 赞/ 55 阅读
相关 消息队列? 对于 MQ 来说,其实不管是 RocketMQ、Kafka 还是其他消息队列,它们的本质都是:一发一存一消费。下面我们以这个本质作为根,一起由浅入深地聊聊 MQ。 01 从 Bertha 。/ 2023年10月10日 12:49/ 0 赞/ 33 阅读
相关 消息队列 一、什么是消息队列 以下为虚构的小故事: 有一天,产品跑来跟小王说:“我们要做一个用户实名的功能,需要在用户实名成功后给用户发一条短信。” 小王(攻城狮leve 缺乏、安全感/ 2023年10月06日 17:00/ 0 赞/ 57 阅读
相关 消息队列 1. 消息队列在项目中的使用 背景:在分布式系统中是如何处理高并发的。 由于在高并发的环境下,来不及同步处理用户发送的请求,则会导致请求发生阻塞。比如说,大量的ins ﹏ヽ暗。殇╰゛Y/ 2022年12月15日 03:23/ 0 赞/ 278 阅读
相关 消息队列 https://www.cnblogs.com/457248499-qq-com/p/7392678.html 来源 消息队列:在消息的传输过程中保存消息的容器。 迈不过友情╰/ 2022年05月30日 07:42/ 0 赞/ 293 阅读
相关 消息队列 消息队列是啥?我觉得大家都心知肚明,已经众所周知到不用解释的程度。不过,但凡学习、解释一样东西,都应该遵循 “它是什么?”、 “做什么用?”、 “为啥要用它” ╰半橙微兮°/ 2022年04月23日 08:42/ 0 赞/ 364 阅读
相关 消息队列 消息队列介绍 维基百科上的描述:在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通 红太狼/ 2022年01月26日 10:51/ 0 赞/ 356 阅读
相关 消息队列 为什么写这篇文章? 博主有两位朋友分别是小A和小B: 1. 小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑。再不然就 小咪咪/ 2021年12月20日 06:51/ 0 赞/ 446 阅读
还没有评论,来说两句吧...