Cobalt/POSIX 消息队列服务。
消息队列允许实时线程之间交换数据。对于 POSIX 消息队列,最大消息长度和最大消息数量在使用 mq_open()
创建时被固定。
int mq_timedsend(mqd_t mqdes, const char *msg_ptr,
size_t msg_len, unsigned int msg_prio,
const struct timespec *abs_timeout);
在限定时间内尝试向消息队列发送消息。
该服务等同于 mq_send()
,不同之处在于如果消息队列已满且描述符 fd
未设置 O_NONBLOCK
标志,则调用线程仅在指定的 abs_timeout
超时时间到期前被挂起。
参数:
mqdes
: 消息队列描述符;msg_ptr
: 指向要发送的消息的指针;msg_len
: 消息的长度;msg_prio
: 消息的优先级;abs_timeout
: 超时时间,以 CLOCK_REALTIME
时钟的绝对值表示。返回值:
errno
:
EBADF
,fd
不是一个有效的用于写入的消息队列描述符;EMSGSIZE
,消息长度超过消息队列的 mq_msgsize
属性;EAGAIN
,描述符 fd
设置了 O_NONBLOCK
标志且消息队列已满;EPERM
,调用者上下文无效;ETIMEDOUT
,指定的超时时间已到期;EINTR
,服务被信号中断。标签:
xthread-only
,switch-primary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char *message = "Hello, World!";
struct timespec timeout;
// 设置消息队列的属性
attr.mq_flags = 0;
attr.mq_maxmsg = 10;
attr.mq_msgsize = 128;
attr.mq_curmsgs = 0;
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 设置超时时间,例如5秒
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
// 发送消息,设置超时时间
if (mq_timedsend(mq, message, strlen(message) + 1, 1, &timeout) == -1) {
if (errno == ETIMEDOUT) {
perror("Message send timed out");
} else {
perror("mq_timedsend");
}
mq_close(mq);
exit(1);
}
printf("Message sent\n");
// 关闭消息队列
mq_close(mq);
return 0;
}
ssize_t mq_timedreceive(mqd_t q, char *buffer,
size_t len, unsigned int *prio,
const struct timespec *timeout);
在限定时间内尝试从消息队列接收消息。
该服务等同于 mq_receive()
,不同之处在于如果描述符 fd
未设置 O_NONBLOCK
标志且消息队列为空,则调用线程仅在指定的 abs_timeout
超时时间到期前被挂起。
参数:
q
: 队列描述符;buffer
: 成功时用于存储接收消息的地址;len
: 缓冲区长度;prio
: 成功时用于存储接收消息优先级的地址;timeout
: 超时时间,以 CLOCK_REALTIME
时钟的绝对值表示。返回值:
buffer
地址;errno
:
EBADF
,fd
不是一个有效的用于读取的描述符;EMSGSIZE
,长度 len
小于消息队列的 mq_msgsize
属性;EAGAIN
,队列为空,且描述符 fd
设置了 O_NONBLOCK
标志;EPERM
,调用者上下文无效;EINTR
,服务被信号中断;ETIMEDOUT
,指定的超时时间已到期。标签:
xthread-only
,switch-primary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char buffer[128];
unsigned int priority;
struct timespec timeout;
unsigned int max_cycles = 10;
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_RDONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 设置超时时间,例如5秒
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
for (int i = 0; i < max_cycles; i++) {
// 接收消息,设置超时时间
ssize_t bytes_received = mq_timedreceive(mq, buffer, sizeof(buffer), &priority, &timeout);
if (bytes_received == -1) {
if (errno == ETIMEDOUT) {
perror("Message receive timed out");
} else {
perror("mq_timedreceive");
mq_close(mq);
exit(1);
}
}
// 打印接收到的消息
buffer[bytes_received] = '\0'; // 确保字符串以NULL结尾
printf("Received message[%d]: %s\n", priority, buffer);
}
// 关闭消息队列
mq_close(mq);
return 0;
}
int mq_close(mqd_t mqd)
关闭消息队列。
该服务关闭消息队列描述符 mqd
。消息队列仅在所有打开的描述符都关闭并通过调用 mq_unlink()
服务取消链接后才会被销毁。
参数:
mqd
: 消息队列描述符。返回值:
0
;-1
并设置 errno
:
EBADF
,mqd
是无效的消息队列描述符;EPERM
,调用者上下文无效。标签:
thread-unrestricted
,switch-secondary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char *message = "Hello, World!";
struct timespec timeout;
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 设置超时时间,例如5秒
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
// 发送消息,设置超时时间
if (mq_timedsend(mq, message, strlen(message) + 1, 1, &timeout) == -1) {
if (errno == ETIMEDOUT) {
perror("Message send timed out");
} else {
perror("mq_timedsend");
}
mq_close(mq);
exit(1);
}
printf("Message sent\n");
// 关闭消息队列
mq_close(mq);
return 0;
}
int mq_getattr(mqd_t mqd, struct mq_attr *attr)
获取消息队列属性。
该服务将消息队列描述符 mqd
的属性存储在 attr
指定的地址。
以下属性将被设置:
mq_flags
: 消息队列描述符 mqd
的标志;mq_maxmsg
: 消息队列中的最大消息数;mq_msgsize
: 最大消息大小;mq_curmsgs
: 当前队列中的消息数。参数:
mqd
: 消息队列描述符;attr
: 成功时用于存储消息队列属性的地址。返回值:
0
;-1
并设置 errno
:
EBADF
,mqd
不是一个有效的描述符。标签:
thread-unrestricted
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr, newattr;
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 打开消息队列,如果不存在则创建
mq = mq_open("/my_queue", O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 获取当前的消息队列属性
if (mq_getattr(mq, &newattr) == -1) {
perror("mq_getattr");
mq_close(mq);
exit(1);
}
// 打印当前属性
printf("Current message queue attributes:\n");
printf(" mq_flags: %d\n", attr.mq_flags);
printf(" mq_maxmsg: %lu\n", attr.mq_maxmsg);
printf(" mq_msgsize: %lu\n", attr.mq_msgsize);
printf(" mq_curmsgs: %lu\n", attr.mq_curmsgs);
// 关闭消息队列
if (mq_close(mq) == -1) {
perror("mq_close");
exit(1);
}
// 删除消息队列
if (mq_unlink("/my_queue") == -1) {
perror("mq_unlink");
exit(1);
}
return 0;
}
int mq_notify(mqd_t mqd, const struct sigevent *evp)
启用消息到达通知。
如果 evp
不是 NULL
且是一个 sigevent
结构体的地址,并且其 sigev_notify
成员设置为 SIGEV_SIGNAL
,当消息发送到消息队列 mqd
时,如果队列为空且没有线程阻塞在 mq_receive()
或 mq_timedreceive()
调用中,当前线程将通过信号通知。通知后,线程将被注销。
如果 evp
为 NULL
或 sigev_notify
成员为 SIGEV_NONE
,当前线程将被注销。
一次只能注册一个线程。
如果当前线程不是 Cobalt 线程(通过 pthread_create()
创建),此服务将失败。
参数:
mqd
: 消息队列描述符;evp
: 指向事件通知结构的指针。返回值:
0
;-1
并设置 errno
:
EINVAL
:evp
无效;EPERM
:调用者上下文无效;EBADF
:mqd
不是有效的消息队列描述符;EBUSY
:另一个线程已注册。标签:
xthread-only
,switch-primary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#define QUEUE_NAME "/my_queue"
#define MAX_MSG_SIZE 1024
#define QUEUE_PERMISSIONS 0660
int get_new_msg = 0;
void notify_fun(union sigval sv){
printf("Get Notify Message\n");
get_new_msg++;
}
void *receiver(void *arg)
{
mqd_t mq = *((mqd_t *)arg);
int read_msg = get_new_msg;
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = notify_fun;
sev.sigev_value.sival_ptr = &mq;
if(mq_notify(mq, &sev) == -1){
perror("mq_notify");
mq_close(mq);
mq_unlink(QUEUE_NAME);
exit(1);
}
char buffer[MAX_MSG_SIZE];
while(1){
if(get_new_msg > read_msg){
ssize_t bytes_read = mq_receive(mq, buffer, MAX_MSG_SIZE, NULL);
if (bytes_read >= 0) {
buffer[bytes_read] = '\0';
printf("Received: %s\n", buffer);
mq_notify(mq, &sev); // Re-register notification
} else {
perror("mq_receive");
}
read_msg++;
}
usleep(1000);
}
return NULL;
}
int main() {
struct mq_attr attr;
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
mqd_t mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0664, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
pthread_t tid;
pthread_create(&tid, NULL, receiver, &mq);
usleep(100*1000);
char *messages[] = {"Hello", "World", "Goodbye"};
for (int i = 0; i < 3; i++) {
mq_send(mq, messages[i], strlen(messages[i]) + 1, 0);
sleep(1);
}
pthread_cancel(tid);
mq_close(mq);
mq_unlink(QUEUE_NAME);
return 0;
}
mqd_t mq_open (const char* name, int oflags, ...)
打开消息队列。
该服务打开名为 name
的消息队列。
oflags
应设置为以下值之一:
O_RDONLY
:表示返回的队列描述符只能用于接收消息;O_WRONLY
:表示返回的队列描述符只能用于发送消息;O_RDWR
:表示返回的队列描述符可用于发送和接收消息。如果不存在名为 name
的消息队列,并且 oflags
设置了 O_CREAT
位,则该函数会创建消息队列,并接受另外两个参数:
mode
参数,类型为 mode_t
,当前被忽略;attr
参数,指向 mq_attr
结构体的指针,指定新消息队列的属性。如果 oflags
设置了 O_CREAT
和 O_EXCL
两个位,并且消息队列已经存在,则该服务会失败。
如果 oflags
设置了 O_NONBLOCK
位,则 mq_send()
、mq_receive()
、mq_timedsend()
和 mq_timedreceive()
服务会返回 -1
并将 errno
设置为 EAGAIN
,而不是阻塞调用者。
在创建消息队列时,会使用 attr
地址处的 mq_attr
结构体的以下参数:
mq_maxmsg
:队列中的最大消息数(默认128);mq_msgsize
:每条消息的最大大小(默认128)。name
可以是任意字符串,其中斜杠没有特别意义。然而,为了便于移植,建议使用以斜杠开头且不包含其他斜杠的名称。
参数:
name
:要打开的消息队列的名称;oflags
:标志。返回值:
成功时返回消息队列描述符;
失败时返回 -1
并设置 errno
:
ENAMETOOLONG
:name
参数的长度超过64个字符;EXIST
:oflags
中设置了 O_CREAT
和 O_EXCL
位,并且消息队列已经存在;ENOENT
:oflags
中未设置 O_CREAT
位,并且消息队列不存在;ENOSPC
:系统内存分配失败,或系统堆中可用内存不足以创建队列,请尝试增加 CONFIG_XENO_OPT_SYS_HEAPSZ
;EPERM
:尝试从无效上下文创建消息队列;EINVAL
:attr
参数无效;EMFILE
:当前打开的描述符过多;EAGAIN
:没有可用的注册槽,请检查/增加 CONFIG_XENO_OPT_REGISTRY_NRSLOTS
。标签:
thread-unrestricted
,switch-secondary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char *message = "Hello, World!";
struct timespec timeout;
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 设置超时时间,例如5秒
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
// 发送消息,设置超时时间
if (mq_timedsend(mq, message, strlen(message) + 1, 1, &timeout) == -1) {
if (errno == ETIMEDOUT) {
perror("Message send timed out");
} else {
perror("mq_timedsend");
}
mq_close(mq);
exit(1);
}
printf("Message sent\n");
// 关闭消息队列
mq_close(mq);
return 0;
}
ssize_t mq_receive (mqd_t q, char* buffer, size_t len, unsigned* prio)
从消息队列接收消息。
如果消息队列 fd
不为空,并且 len
大于消息队列的 mq_msgsize
,该服务会将优先级最高的排队消息复制到 buffer
地址。
如果队列为空且描述符 fd
未设置 O_NONBLOCK
标志,调用线程将被挂起,直到有消息发送到队列。如果队列为空且描述符 fd
设置了 O_NONBLOCK
标志,该服务会立即返回 -1
并将 errno
设置为 EAGAIN
。
参数:
q
: 队列描述符;buffer
: 成功时用于存储接收消息的地址;len
: 缓冲区长度;prio
: 成功时用于存储接收消息优先级的地址。返回值:
成功时返回消息长度,并将消息复制到 buffer
地址;
失败时返回 -1
,未出队消息并设置 errno
:
- EBADF
:fd
不是一个有效的用于读取的描述符;
- EMSGSIZE
:长度 len
小于消息队列的 mq_msgsize
属性;
- EAGAIN
:队列为空,且描述符 fd
设置了 O_NONBLOCK
标志;
- EPERM
:调用者上下文无效;
- EINTR
:服务被信号中断。
标签:
xthread-only
,switch-primary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char buffer[128];
unsigned int priority;
unsigned int max_cycles = 10;
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_RDONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
for(int i = 0; i < max_cycles; i++) {
// 接收消息
ssize_t bytes_received = mq_receive(mq, buffer, sizeof(buffer), &priority);
if (bytes_received == -1) {
perror("mq_timedreceive");
mq_close(mq);
exit(1);
}
// 打印接收到的消息
buffer[bytes_received] = '\0'; // 确保字符串以NULL结尾
printf("Received message[%d]: %s\n", priority, buffer);
}
// 关闭消息队列
mq_close(mq);
return 0;
}
int mq_send (mqd_t q, const char* buffer, size_t len, unsigned prio)
发送消息到消息队列。
如果消息队列 fd
未满,该服务会发送由参数 buffer
指向的长度为 len
的消息,并设置优先级 prio
。优先级较高的消息会插入到优先级较低的消息之前。
如果消息队列已满且未设置 O_NONBLOCK
标志,调用线程将被挂起,直到队列不满。如果消息队列已满且设置了 O_NONBLOCK
标志,消息不会被发送,服务会立即返回 -1
并将 errno
设置为 EAGAIN
。
参数:
q
: 消息队列描述符;buffer
: 指向要发送的消息的指针;len
: 消息长度;prio
: 消息的优先级。返回值:
成功时返回 0
并发送消息;
失败时返回 -1
并未发送消息,同时设置 errno
:
EBADF
:fd
不是一个有效的用于写入的消息队列描述符;EMSGSIZE
:消息长度 len
超过消息队列的 mq_msgsize
属性;EAGAIN
:描述符 fd
设置了 O_NONBLOCK
标志且消息队列已满;EPERM
:调用者上下文无效;EINTR
:服务被信号中断。标签:
xthread-only
,switch-primary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char *message = "Hello, World!";
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 发送消息
if (mq_send(mq, message, strlen(message) + 1, 1) == -1) {
perror("mq_send");
mq_close(mq);
exit(1);
}
printf("Message sent\n");
// 关闭消息队列
mq_close(mq);
return 0;
}
int mq_setattr (mqd_t mqd, const struct mq_attr *restrict attr, struct mq_attr *restrict oattr)
设置消息队列属性。
该服务将 mqd
描述符的标志设置为 attr
指向的 mq_attr
结构体的 mq_flags
成员的值。
如果 oattr
不为 NULL
,则先前的消息队列属性值将存储在 oattr
指定的地址。
只有设置或清除 O_NONBLOCK
标志才会生效。
参数:
mqd
: 消息队列描述符;attr
: 指向新属性的指针(仅使用 mq_flags
);oattr
: 如果不为 NULL
,成功时用于存储先前消息队列属性的地址。返回值:
0
;-1
并设置 errno
:
EBADF
:mqd
不是有效的消息队列描述符。标签:
thread-unrestricted
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr, oattr ,newattr;
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 打开消息队列,如果不存在则创建
mq = mq_open("/my_queue", O_CREAT | O_RDWR, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 获取当前的消息队列属性
if (mq_getattr(mq, &attr) == -1) {
perror("mq_getattr");
mq_close(mq);
exit(1);
}
// 打印当前flag
printf("Current message queue: mq_flags: %ld\n", attr.mq_flags);
// 修改消息队列属性
attr.mq_flags = attr.mq_flags | O_NONBLOCK;
// 设置消息队列属性
if (mq_setattr(mq, &attr, &oattr) == -1) {
perror("mq_setattr");
mq_close(mq);
exit(1);
}
if (mq_getattr(mq, &newattr) == -1) {
perror("mq_getattr");
mq_close(mq);
exit(1);
}
// 打印修改后的属性
printf("Updated message queue : mq_flags: %ld\n", newattr.mq_flags);
// 关闭消息队列
if (mq_close(mq) == -1) {
perror("mq_close");
exit(1);
}
// 删除消息队列
if (mq_unlink("/my_queue") == -1) {
perror("mq_unlink");
exit(1);
}
return 0;
}
int mq_unlink (const char * name)
取消链接消息队列。
该服务取消链接名为 name
的消息队列。消息队列在使用 mq_open()
服务获得的所有队列描述符通过 mq_close()
服务关闭之前不会被销毁。然而,调用该服务后,取消链接的队列将无法通过 mq_open()
服务访问。
参数:
name
: 要取消链接的消息队列的名称。返回值:
0
;-1
并设置 errno
:
EPERM
:调用者上下文无效;ENAMETOOLONG
:name
参数的长度超过 64 个字符;ENOENT
:消息队列不存在。标签:
thread-unrestricted
,switch-secondary
示例代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>
int main() {
mqd_t mq;
struct mq_attr attr;
char *message = "Hello, World!";
// 设置消息队列的属性
attr.mq_flags = 0; // 消息队列标志
attr.mq_maxmsg = 10; // 最大消息数
attr.mq_msgsize = 128; // 最大消息大小
attr.mq_curmsgs = 0; // 当前消息数
// 创建或打开消息队列
mq = mq_open("/my_queue", O_CREAT | O_WRONLY, 0644, &attr);
if (mq == (mqd_t)-1) {
perror("mq_open");
exit(1);
}
// 发送消息
if (mq_send(mq, message, strlen(message) + 1, 1) == -1) {
perror("mq_send");
mq_close(mq);
exit(1);
}
printf("Message sent\n");
// 关闭消息队列
mq_close(mq);
if (mq_unlink("/my_queue") == -1) {
perror("mq_unlink");
exit(1);
}
return 0;
}