用于发送任意大小消息的实时 IPC 机制
消息队列是一种实时任务可以通过 rkernel 管理的消息队列交换或传递数据的方法。消息可以有不同的长度,并且可以分配不同的类型或用途。一个任务可以创建消息队列,并由多个任务使用,以发送和/或接收队列中的消息。
void * aori_queue_alloc (AORI_RT_QUEUE *queue, size_t size);
分配消息缓冲区。
此服务从队列的内部池中分配一个消息缓冲区。此缓冲区可以在调用 aori_queue_send() 之前填充有效载荷信息。当成对使用时,这些服务提供了一个零拷贝接口用于发送消息。
参数
返回值
成功时返回分配的缓冲区的地址,失败时返回 NULL。
标签
unrestricted, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_bind (AORI_RT_QUEUE *queue, const char *name, AORI_RT_TIME timeout);
绑定到消息队列。
此例程创建一个新的描述符,以引用由其符号名称标识的现有消息队列。如果在进入时对象不存在,调用者可能会阻塞,直到创建具有给定名称的队列。
参数
返回值
成功时返回零。否则:
标签
xthread-nowait, switch-primary
注释
timeout 值被解释为 Alchemy 时钟分辨率的倍数(参见 -alchemy-clock-resolution 选项,默认为 1 纳秒)。
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
AORI_RT_QUEUE queue_send;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 绑定到消息队列
ret = aori_queue_bind(&queue_send, "MyQueue", AORI_RT_TM_INFINITE);
if (ret < 0) {
fprintf(stderr, "Failed to bind queue\n");
return;
}
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue_send, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue_send, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
// 解除绑定
aori_queue_unbind(&queue_send);
}
void receiver_task_func(void *arg) {
int ret;
void *msg_buf;
ssize_t msg_size;
AORI_RT_QUEUE queue_recv;
printf("Receiver: Waiting for message\n");
// 绑定到消息队列
ret = aori_queue_bind(&queue_recv, "MyQueue", AORI_RT_TM_INFINITE);
if (ret < 0) {
fprintf(stderr, "Failed to bind queue\n");
return;
}
msg_size = aori_queue_receive(&queue_recv, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue_recv, msg_buf); // 读取完成要及时free掉内存
}
// 解除绑定
aori_queue_unbind(&queue_recv);
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_create (AORI_RT_QUEUE *queue, const char *name, size_t poolsize, size_t qlimit, AORI_RT_QUEUE_MODE mode);
创建消息队列。
创建一个消息队列对象,允许多个任务通过使用可变大小的消息来交换数据。消息队列在创建时为空。
参数
返回值
成功时返回零。否则:
标签
xthread-only, mode-unrestricted, switch-secondary
注释
队列可以由属于同一 rkernel 会话的多个进程共享。
每个挂起在队列中的消息消耗四个长字以及实际有效载荷大小,向下对齐到下一个长字边界。例如,在 32 位平台上,一个 6 字节的消息需要 24 字节的存储空间。
当 qlimit 被指定(即不同于 Q_UNLIMITED)时,此开销会自动计算,因此 poolsize / qlimit 字节的 qlimit 消息可以同时存储到池中。否则,poolsize 会在内部增加 5% 以应对这种开销。
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_delete (AORI_RT_QUEUE *queue);
删除消息队列。
此例程删除先前通过调用 aori_queue_create() 创建的队列对象。所有附加到该队列的资源将自动释放,包括所有挂起的消息。
参数
返回值
成功时返回零。否则:
标签
mode-unrestricted, switch-secondary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_flush (AORI_RT_QUEUE *queue);
刷新队列中的挂起消息。
此例程刷新当前队列中所有挂起的消息,适当地释放所有消息缓冲区。
参数
返回值
成功时返回零。否则:
标签
unrestricted, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <aori/nrt/clock.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
// 刷新队列并释放缓存消息,receiver会触发超时故障
aori_queue_flush(&queue);
printf("Sender: Flush message\n");
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
struct timespec abs_timeout;
aori_task_sleep(500000000); // 睡眠500ms,等待缓存消息被刷新掉
printf("Receiver: Waiting for message\n");
// 设置绝对超时时间为当前时间加上2秒
aori_nrt_clock_gettime(CLOCK_MONOTONIC, &abs_timeout);
abs_timeout.tv_sec += 2;
msg_size = aori_queue_receive_timed(&queue, &msg_buf, &abs_timeout);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_free (AORI_RT_QUEUE *queue, void *buf);
释放消息缓冲区。
此服务将消息缓冲区释放到队列的内部池中。
参数
返回值
成功时返回零;如果 buf 不是先前由 aori_queue_alloc() 服务分配的有效消息缓冲区,或者调用者未通过 aori_queue_receive() 成功返回获得消息的所有权,则返回 -EINVAL。
标签
unrestricted, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_inquire (AORI_RT_QUEUE *queue, RT_QUEUE_INFO *info);
查询队列状态。
此例程返回指定队列的状态信息。
参数
返回值
成功时返回零,并将状态信息写入 info 指向的结构体。否则:
标签
unrestricted, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
AORI_RT_QUEUE_INFO info;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
ret = aori_queue_inquire(&queue, &info);
if (ret < 0) {
fprintf(stderr, "Failed to inquire infomation from queue\n");
return;
}
printf("Get Current Queue Infomation:\n");
printf(" name:%s, usedmem:%d, poolsize:%d, qlimit:%d\n", info.name, info.usedmem, info.poolsize, info.qlimit);
printf(" mode:%d, nmessages:%d, nwaiters:%d\n", info.mode, info.nmessages, info.nwaiters);
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
ssize_t aori_queue_read (AORI_RT_QUEUE *q, void *buf, size_t size, AORI_RT_TIME timeout);
从队列中读取(带相对标量超时)。
此例程是 aori_queue_read_timed() 的变体,接受以标量值表示的相对超时规范。
参数
标签
xthread-nowait, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
char msg_buf[MAX_MSG_SIZE];
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_read(&queue, msg_buf, sizeof(msg_buf), AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
ssize_t aori_queue_read_timed (AORI_RT_QUEUE *q, void *buf, size_t size, const struct timespec *abs_timeout);
从队列中读取消息。
此服务从指定队列中读取下一个可用消息。
参数
返回值
成功时返回复制到 buf 的字节数。零是一个可能的值,对应于传递给 aori_queue_send() 或 aori_queue_write() 的零大小消息。否则:
标签
xthread-nowait, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <aori/nrt/clock.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
char msg_buf[MAX_MSG_SIZE];
ssize_t msg_size;
struct timespec abs_timeout;
printf("Receiver: Waiting for message\n");
// 设置绝对超时时间为当前时间加上2秒
aori_nrt_clock_gettime(CLOCK_MONOTONIC, &abs_timeout);
abs_timeout.tv_sec += 2;
msg_size = aori_queue_read_timed(&queue, msg_buf, sizeof(msg_buf), &abs_timeout);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
ssize_t aori_queue_read_until (AORI_RT_QUEUE *q, void *buf, size_t size, AORI_RT_TIME abs_timeout);
从队列中读取(带绝对标量超时)。
此例程是 aori_queue_read_timed() 的变体,接受以标量值表示的绝对超时规范。
参数
标签
xthread-nowait, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <aori/nrt/clock.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
char msg_buf[MAX_MSG_SIZE];
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
AORI_RT_TIME timeout = aori_timer_read() + 2000000000; // 2 seconds
msg_size = aori_queue_read_until(&queue, msg_buf, sizeof(msg_buf), timeout);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
ssize_t aori_queue_receive (AORI_RT_QUEUE *q, void **bufp, AORI_RT_TIME timeout);
从队列中接收消息(带相对标量超时)。
此例程是 aori_queue_receive_timed() 的变体,接受以标量值表示的相对超时规范。
参数
标签
xthread-nowait, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
ssize_t aori_queue_receive_timed (AORI_RT_QUEUE *q, void **bufp, const struct timespec *abs_timeout);
从队列中接收消息(带绝对超时日期)。
此服务从指定队列中接收下一个可用消息。
参数
返回值
成功时返回接收到的消息的字节数。零是一个可能的值,对应于传递给 aori_queue_send() 或 aori_queue_write() 的零大小消息。否则:
标签
xthread-nowait, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <aori/nrt/clock.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
struct timespec abs_timeout;
printf("Receiver: Waiting for message\n");
// 设置绝对超时时间为当前时间加上2秒
aori_nrt_clock_gettime(CLOCK_MONOTONIC, &abs_timeout);
abs_timeout.tv_sec += 2;
msg_size = aori_queue_receive_timed(&queue, &msg_buf, &abs_timeout);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
ssize_t aori_queue_receive_until (AORI_RT_QUEUE *q, void **bufp, AORI_RT_TIME abs_timeout);
从队列中接收消息(带绝对标量超时)。
此例程是 aori_queue_receive_timed() 的变体,接受以标量值表示的绝对超时规范。
参数
标签
xthread-nowait, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
AORI_RT_TIME timeout = aori_timer_read() + 2000000000; // 2 seconds
msg_size = aori_queue_receive_until(&queue, &msg_buf, timeout);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_send (AORI_RT_QUEUE *q, const void *buf, size_t size, AORI_RT_QUEUE_OPER_MODE mode);
发送消息到队列。
此服务将完整消息发送到指定队列。消息必须由之前调用 aori_queue_alloc() 分配。
参数
返回值
成功时,此服务返回因操作而被唤醒的接收者数量。如果返回零,则表示队列接收端没有任务在等待,消息已入队。错误时,返回以下错误代码之一:
标签
unrestricted, switch-primary
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
}
void receiver_task_func(void *arg) {
void *msg_buf;
ssize_t msg_size;
printf("Receiver: Waiting for message\n");
msg_size = aori_queue_receive(&queue, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue, msg_buf); // 读取完成要及时free掉内存
}
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}
int aori_queue_unbind (AORI_RT_QUEUE *q);
解除消息队列的绑定。
参数
此例程释放之前对消息队列的绑定。此调用返回后,描述符将不再有效,无法再引用该对象。
标签
thread-unrestricted
{{% details title="示例代码" closed="true" %}}
#include <aori/rt/queue.h>
#include <aori/rt/task.h>
#include <aori/rt/timer.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define TASK_PRIO 50
#define TASK_MODE AORI_RT_TASK_MODE_JOINABLE
#define TASK_STKSZ 0
#define QUEUE_SIZE 1024
#define MAX_MSG_SIZE 100
AORI_RT_QUEUE queue;
AORI_RT_TASK sender_task, receiver_task;
void sender_task_func(void *arg) {
int ret;
const char *message = "Hello from rkernel sender task!";
void *msg_buf;
AORI_RT_QUEUE queue_send;
aori_task_sleep(500000000); // 睡眠500ms,确保接收者任务已经开始等待
// 绑定到消息队列
ret = aori_queue_bind(&queue_send, "MyQueue", AORI_RT_TM_INFINITE);
if (ret < 0) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 分配消息缓冲区
msg_buf = aori_queue_alloc(&queue_send, strlen(message) + 1);
if (msg_buf == NULL) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
// 复制消息到缓冲区
strcpy(msg_buf, message); // NOLINT
printf("Sender: Sending message\n");
ret = aori_queue_send(&queue_send, msg_buf, strlen(message) + 1, AORI_RT_Q_NORMAL);
if (ret < 0) {
fprintf(stderr, "Sender: Failed to send message, error code: %d\n", ret);
} else {
printf("Sender: Message sent successfully\n");
}
// 解除绑定
aori_queue_unbind(&queue_send);
}
void receiver_task_func(void *arg) {
int ret;
void *msg_buf;
ssize_t msg_size;
AORI_RT_QUEUE queue_recv;
printf("Receiver: Waiting for message\n");
// 绑定到消息队列
ret = aori_queue_bind(&queue_recv, "MyQueue", AORI_RT_TM_INFINITE);
if (ret < 0) {
fprintf(stderr, "Failed to allocate message buffer\n");
return;
}
msg_size = aori_queue_receive(&queue_recv, &msg_buf, AORI_RT_TM_INFINITE);
if (msg_size < 0) {
if (msg_size == -ETIMEDOUT) {
printf("Receiver: Timeout while waiting for message\n");
} else {
fprintf(stderr, "Receiver: Failed to receive message, error code: %zd\n", msg_size);
}
} else {
printf("Receiver: Received message of size %zd: %s\n", msg_size, (char *)msg_buf); // NOLINT
aori_queue_free(&queue_recv, msg_buf); // 读取完成要及时free掉内存
}
// 解除绑定
aori_queue_unbind(&queue_recv);
}
int main(int argc, char *argv[]) {
int ret;
// 创建消息队列
ret = aori_queue_create(&queue, "MyQueue", QUEUE_SIZE, MAX_MSG_SIZE, AORI_RT_QUEUE_MODE_FIFO);
if (ret) {
fprintf(stderr, "Failed to create queue, error code: %d\n", ret);
return EXIT_FAILURE;
}
// 创建发送者任务
ret = aori_task_create(&sender_task, "SenderTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create sender task, error code: %d\n", ret);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 创建接收者任务
ret = aori_task_create(&receiver_task, "ReceiverTask", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret) {
fprintf(stderr, "Failed to create receiver task, error code: %d\n", ret);
aori_task_delete(&sender_task);
aori_queue_delete(&queue);
return EXIT_FAILURE;
}
// 启动任务
aori_task_start(&receiver_task, &receiver_task_func, NULL);
aori_task_start(&sender_task, &sender_task_func, NULL);
// 等待任务完成
aori_task_join(&sender_task);
aori_task_join(&receiver_task);
// 清理资源
aori_task_delete(&sender_task);
aori_task_delete(&receiver_task);
aori_queue_delete(&queue);
printf("Program completed\n");
return EXIT_SUCCESS;
}
{{% /details %}}