菜单

Cobalt/Buffer services

Macros

c 复制代码
#define 	B_PRIO   0x1 /* Pend by task priority order. */

详细描述

缓冲区是一种轻量级的IPC机制,实现了快速的单向生产者-消费者数据路径。所有写入的消息都严格按照FIFO顺序在单个内存区域中缓冲,直到以阻塞或非阻塞模式读取。

在写入端,消息总是以原子方式处理(即,没有交错,没有短写入),而在读取端,通常只返回完整的消息。然而,在定义明确的情况下可能会发生短读取(参见 rt_buffer_read() 中的注意事项),尽管通过正确使用缓冲区可以完全避免这种情况。


函数文档

rt_buffer_bind

int rt_buffer_bind (RT_BUFFER *bf, const char *name, RTIME timeout);

绑定到一个IPC缓冲区。

此例程创建一个新的描述符,用于引用由其符号名标识的现有IPC缓冲区。如果在入口处对象不存在,调用者可能会阻塞,直到创建了给定名称的缓冲区。

参数

  • bf 由操作填充的缓冲区描述符的地址。在失败时,此内存的内容是未定义的。
  • name 一个有效的NULL结尾的名称,用于标识要绑定的缓冲区。此字符串应与传递给 rt_buffer_create() 的对象名称参数匹配。
  • timeout 等待注册发生的时钟滴答数(参见注意)。传递 TM_INFINITE 会导致调用者无限期阻塞,直到对象被注册。传递 TM_NONBLOCK 会导致服务在对象在入口处未注册时立即返回,而不等待。

返回值

成功时返回零。否则:

  • 如果在检索完成之前为当前任务调用了 rt_task_unblock(),则返回 -EINTR。
  • 如果 timeout 等于 TM_NONBLOCK 并且在入口处搜索的对象未注册,则返回 -EWOULDBLOCK。
  • 如果无法在指定的时间内检索到对象,则返回 -ETIMEDOUT。
  • 如果此服务应阻塞,但未从xkernel线程调用,则返回 -EPERM。

标签

xthread-nowait, switch-primary

注意

超时值被解释为Alchemy时钟分辨率的倍数(参见 alchemy-clock-resolution 选项,默认为1纳秒)。

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    
    RT_BUFFER buffer_read;

    ret = rt_buffer_bind(&buffer_read,"my_buffer",TM_INFINITE);
    if (ret) {
        fprintf(stderr, "Failed to bind buffer: %s\n", strerror(-ret));
        return ;
    }

    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_read, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }

    ret = rt_buffer_unbind(&buffer_read);
    if (ret) {
        fprintf(stderr, "Failed to unbind buffer: %s\n", strerror(-ret));
        return ;
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_create

int rt_buffer_create (RT_BUFFER *bf, const char *name, size_t bufsz, int mode);

创建一个IPC缓冲区。

此例程创建一个IPC对象,允许任务通过内存缓冲区异步发送和接收数据。数据可以是任意长度,尽管这种IPC最适合小到中等大小的消息,因为数据在传输过程中总是需要复制到缓冲区。大型消息可能通过消息队列(RT_QUEUE)更有效地处理。

参数

  • bf 一个缓冲区描述符的地址,如果此调用成功,可以稍后用来唯一标识创建的对象。
  • name 一个ASCII字符串,代表缓冲区的符号名称。当非NULL且非空时,此字符串的副本用于将创建的缓冲区索引到对象注册表中。
  • bufsz 可用于存储数据的缓冲区空间的大小。所需的内存从主堆中获取。
  • mode 缓冲区创建模式。以下标志可以被OR到此位掩码中,每个标志都会影响新的缓冲区:

B_FIFO 使任务以FIFO顺序挂起,以从缓冲区读取数据。
B_PRIO 使任务以优先级顺序挂起,以从缓冲区读取数据。

此参数也适用于阻塞在缓冲区写侧的任务(参见 rt_buffer_write())。

返回值

成功时返回零。否则:

  • 如果 mode 无效或 bufsz 为零,则返回 -EINVAL。
  • 如果系统无法从主堆中获取内存以创建缓冲区,则返回 -ENOMEM。
  • 如果名称与已注册的缓冲区冲突,则返回 -EEXIST。
  • 如果此服务是从无效上下文中调用的,例如中断或非xkernel线程,则返回 -EPERM。

标签

xthread-only, mode-unrestricted, switch-secondary

注意

属于同一xkernel会话的多个进程可以共享缓冲区。

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_desc, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_delete

int rt_buffer_delete (RT_BUFFER *bf);

删除一个IPC缓冲区。

此例程删除由 rt_buffer_create() 创建的缓冲区对象。

参数

  • bf 缓冲区描述符。

返回值

成功时返回零。否则:

  • 如果 bf 不是有效的缓冲区描述符,则返回 -EINVAL。
  • 如果此服务是从异步上下文中调用的,则返回 -EPERM。

标签

mode-unrestricted, switch-secondary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_desc, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_inquire

int rt_buffer_inquire (RT_BUFFER *bf, RT_BUFFER_INFO *info);

查询缓冲区状态。

此例程返回指定缓冲区的状态信息。

参数

  • bf 缓冲区描述符。
  • info 一个指向返回缓冲区的指针,用于复制信息。

返回值

成功时返回零,并将状态信息写入由 info 指向的结构。否则:

  • 如果 bf 不是有效的缓冲区描述符,则返回 -EINVAL。

标签

unrestricted, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    struct RT_BUFFER_INFO info;

    rt_buffer_inquire(&buffer_desc,&info);
    printf("buffer_name %s,  totalmem %ld bytes, availablemem %ld bytes\n",info.name,
    info.totalmem, info.availmem);
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_desc, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE + 10, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_read

ssize_t rt_buffer_read (RT_BUFFER *bf, void *ptr, size_t size, RTIME timeout)

Read from an IPC buffer (with relative scalar timeout).

This routine is a variant of rt_buffer_read_timed() accepting a relative timeout specification expressed as a scalar value.

参数

  • bf The buffer descriptor.
  • ptr A pointer to a memory area which will be written upon success with the received data.
  • len The length in bytes of the memory area pointed to by ptr.
  • timeout A delay expressed in clock ticks. Passing TM_INFINITE causes the caller to block indefinitely until enough data is available. Passing TM_NONBLOCK causes the service to return immediately without blocking in case not enough data is available.

标签

xthread-nowait, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_desc, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_read_timed

ssize_t rt_buffer_read_timed (RT_BUFFER *bf, void *ptr, size_t size, const struct timespec *abs_timeout)

从IPC缓冲区读取。

此例程从指定的缓冲区读取下一条消息。如果在入口处没有消息可用,允许调用者阻塞,直到有足够的数据写入缓冲区,或者超时。

参数

  • bf 缓冲区描述符。
  • ptr 指向一个内存区域的指针,成功接收数据后将在此处写入。
  • len ptr指向的内存区域的字节长度。在正常情况下,rt_buffer_read_timed() 只返回由 len 参数指定的整个消息,或者一个错误值。然而,当检测到潜在的死锁情况时,允许进行短读取(见下面的注意事项)。
  • abs_timeout 基于 Alchemy 时钟,以秒/纳秒表示的绝对日期,指定等待从缓冲区获取消息的时间限制。传递 NULL 会导致调用者无限期阻塞,直到有足够的数据可用。传递 { .tv_sec = 0, .tv_nsec = 0 } 会导致服务在没有足够的数据可用时立即返回,而不会阻塞。

返回值

成功时返回从缓冲区读取的字节数。否则:

  • 如果在完整消息到达之前达到 abs_timeout,则返回 -ETIMEDOUT。
  • 如果 abs_timeout 是 { .tv_sec = 0, .tv_nsec = 0 } 并且在入口处没有足够的数据立即形成完整的消息,则返回 -EWOULDBLOCK。
  • 如果在有足够的数据形成完整的消息之前为当前任务调用了 rt_task_unblock(),则返回 -EINTR。
  • 如果 bf 不是有效的缓冲区描述符,或 len 大于实际的缓冲区长度,则返回 -EINVAL。
  • 如果在调用者等待数据时删除了 bf,则返回 -EIDRM。在这种情况下,此服务返回时 bf 不再有效。
  • 如果此服务应阻塞,但未从 xkernel 线程调用,则返回 -EPERM。

注意

当遇到缓冲区的病态使用时,可能会发生短读取(即,返回的字节数少于 len 请求的字节数)。只有当系统检测到一个或多个写入者正在等待发送数据,而读取者同时必须等待接收完整的消息时,才会出现这种情况。例如,考虑以下涉及1024字节缓冲区(bf)和两个线程的序列:

写入线程 > rt_write_buffer(&bf, ptr, 1, TM_INFINITE); (一个字节可读,1023字节可用于发送)
写入线程 > rt_write_buffer(&bf, ptr, 1024, TM_INFINITE); (写入者阻塞 - 没有空间用于另一个1024字节的消息)
读取线程 > rt_read_buffer(&bf, ptr, 1024, TM_INFINITE); (短读取 - 返回一个被截断的(1字节)消息)

为了防止两个线程无限期地等待彼此,允许进行短读取,可以通过后续调用 rt_buffer_read() 或 rt_buffer_read_until() 来完成。如果出现这种情况,应该修复线程优先级、缓冲区和/或消息长度,以消除这种情况。

标签

xthread-nowait, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    ts.tv_sec += 2;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write_timed(&buffer_desc, MESSAGE, sizeof(MESSAGE), &ts);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    ts.tv_sec += 2;
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read_timed(&buffer_desc, msg, sizeof(msg), &ts);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_read_until

ssize_t rt_buffer_read_until (RT_BUFFER *bf, void *ptr, size_t size, RTIME timeout);

从IPC缓冲区读取(具有绝对标量超时)。

此例程是 rt_buffer_read_timed() 的一个变体,接受以标量值表示的绝对超时规范。

参数

  • bf 缓冲区描述符。
  • ptr 指向一个内存区域的指针,成功接收数据后将在此处写入。
  • len ptr指向的内存区域的字节长度。
  • abs_timeout 以时钟滴答表示的绝对日期。传递 TM_INFINITE 会导致调用者无限期阻塞,直到有足够的数据可用。传递 TM_NONBLOCK 会导致服务在没有足够的数据可用时立即返回,而不会阻塞。

标签

xthread-nowait, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>
#include <alchemy/timer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

#define RT_TICK_PER_SECOND 1000000000LL // 1 秒

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    RTIME rt_time = rt_timer_read() + RT_TICK_PER_SECOND;

    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write_until(&buffer_desc, MESSAGE, sizeof(MESSAGE), rt_time);
    if (ret < 0) {
        fprintf(stderr, "Failed to rt_buffer_write: %s\n", strerror(-ret));
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    RTIME rt_time = rt_timer_read() + 2*RT_TICK_PER_SECOND;

    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read_until(&buffer_desc, msg, sizeof(msg), rt_time);
    if (ret < 0) {
        fprintf(stderr, "Failed to rt_buffer_read: %s\n", strerror(-ret));
        rt_task_yield();
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_unbind

int rt_buffer_unbind (RT_BUFFER *bf);

解除与IPC缓冲区的绑定。

参数

  • bf 缓冲区描述符。

此例程释放之前对IPC缓冲区的绑定。此调用返回后,描述符将不再有效地引用此对象。

标签

thread-unrestricted

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    
    RT_BUFFER buffer_read;

    ret = rt_buffer_bind(&buffer_read,"my_buffer",TM_INFINITE);
    if (ret) {
        fprintf(stderr, "Failed to bind buffer: %s\n", strerror(-ret));
        return ;
    }

    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_read, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }

    ret = rt_buffer_unbind(&buffer_read);
    if (ret) {
        fprintf(stderr, "Failed to unbind buffer: %s\n", strerror(-ret));
        return ;
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_write

ssize_t rt_buffer_write (RT_BUFFER *bf, const void *ptr, size_t size, RTIME timeout);

Write to an IPC buffer (with relative scalar timeout).

This routine is a variant of rt_buffer_write_timed() accepting a relative timeout specification expressed as a scalar value.

参数

  • bf The buffer descriptor.
  • ptr The address of the message data to be written to the buffer.
  • len The length in bytes of the message data.
  • timeout A delay expressed in clock ticks. Passing TM_INFINITE causes the caller to block indefinitely until enough buffer space is available. Passing TM_NONBLOCK causes the service to return immediately without blocking in case of buffer space shortage.

标签

xthread-nowait, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write(&buffer_desc, MESSAGE, sizeof(MESSAGE), TM_INFINITE);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read(&buffer_desc, msg, sizeof(msg), TM_INFINITE);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_write_timed

ssize_t rt_buffer_write_timed (RT_BUFFER *bf, const void *ptr, size_t size, const struct timespec *abs_timeout);

向IPC缓冲区写入。

此例程向指定的缓冲区写入一条消息。如果在入口处没有足够的缓冲区空间来容纳消息,允许调用者阻塞,直到有足够的空间被释放,或者超时,以先到者为准。

参数

  • bf 缓冲区描述符。
  • ptr 要写入缓冲区的消息数据的地址。
  • len 消息数据的字节长度。零是一个有效值,在这种情况下,缓冲区保持不变,并向调用者返回零。
  • abs_timeout 基于 Alchemy 时钟,以秒/纳秒表示的绝对日期,指定等待足够的缓冲区空间可用来容纳消息的时间限制。传递 NULL 会导致调用者无限期阻塞,直到有足够的缓冲区空间可用。传递 { .tv_sec = 0, .tv_nsec = 0 } 会导致服务在缓冲区空间不足时立即返回,而不会阻塞。

返回值

成功时返回写入缓冲区的字节数。否则:

  • 如果在有足够的缓冲区空间可用来容纳消息之前达到 abs_timeout,则返回 -ETIMEDOUT。
  • 如果 abs_timeout 是 { .tv_sec = 0, .tv_nsec = 0 } 并且在入口处没有立即可用的缓冲区空间来容纳消息,则返回 -EWOULDBLOCK。
  • 如果在有足够的缓冲区空间可用来容纳消息之前为当前任务调用了 rt_task_unblock(),则返回 -EINTR。
  • 如果 bf 不是有效的缓冲区描述符,或 len 大于实际的缓冲区长度,则返回 -EINVAL。
  • 如果在调用者等待缓冲区空间时删除了 bf,则返回 -EIDRM。在这种情况下,此服务返回时 bf 不再有效。
  • 如果此服务应阻塞,但未从 xkernel 线程调用,则返回 -EPERM。

标签

xthread-nowait, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    ts.tv_sec += 2;
    
    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write_timed(&buffer_desc, MESSAGE, sizeof(MESSAGE), &ts);
    if (ret < 0) {
        perror("rt_buffer_write");
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    ts.tv_sec += 2;
    
    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read_timed(&buffer_desc, msg, sizeof(msg), &ts);
    if (ret < 0) {
        printf("rt_buffer_read Error: %d\n", ret);
        perror("rt_buffer_read");
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

rt_buffer_write_until

ssize_t rt_buffer_write_until (RT_BUFFER *bf, const void *ptr, size_t size, RTIME timeout);

向IPC缓冲区写入(具有绝对标量超时)。

此例程是 rt_buffer_write_timed() 的一个变体,接受以标量值表示的绝对超时规范。

参数

  • bf 缓冲区描述符。
  • ptr 要写入缓冲区的消息数据的地址。
  • len 消息数据的字节长度。
  • abs_timeout 以时钟滴答表示的绝对日期。传递 TM_INFINITE 会导致调用者无限期阻塞,直到有足够的缓冲区空间可用。传递 TM_NONBLOCK 会导致服务在缓冲区空间不足时立即返回,而不会阻塞。

标签

xthread-nowait, switch-primary

示例代码

c{filename="app.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <alchemy/task.h>
#include <alchemy/buffer.h>
#include <alchemy/timer.h>

#define MESSAGE "Hello, Sinsegye!"
#define BUFFER_SIZE sizeof(MESSAGE)

#define RT_TICK_PER_SECOND 1000000000LL // 1 秒

static RT_BUFFER buffer_desc;

void writer_task(void *arg)
{
    int ret;
    RTIME rt_time = rt_timer_read() + RT_TICK_PER_SECOND;

    printf("Writer task: Writing message to buffer\n");
    ret = rt_buffer_write_until(&buffer_desc, MESSAGE, sizeof(MESSAGE), rt_time);
    if (ret < 0) {
        fprintf(stderr, "Failed to rt_buffer_write: %s\n", strerror(-ret));
    } else {
        printf("Writer task: Wrote %d bytes to buffer\n", ret);
    }
}

void reader_task(void *arg)
{
    int ret;
    char msg[BUFFER_SIZE];
    RTIME rt_time = rt_timer_read() + 2*RT_TICK_PER_SECOND;

    printf("Reader task: Reading message from buffer\n");
    ret = rt_buffer_read_until(&buffer_desc, msg, sizeof(msg), rt_time);
    if (ret < 0) {
        fprintf(stderr, "Failed to rt_buffer_read: %s\n", strerror(-ret));
        rt_task_yield();
    } else {
        printf("Reader task: Read %d bytes from buffer: %s\n", ret, msg);
    }
}

int main(int argc, char *argv[])
{
    int ret;
    RT_TASK writer, reader;

    ret = rt_buffer_create(&buffer_desc, "my_buffer", BUFFER_SIZE, B_PRIO);
    if (ret) {
        fprintf(stderr, "Failed to create buffer: %s\n", strerror(-ret));
        return EXIT_FAILURE;
    }

    printf("Buffer created successfully\n");

    ret = rt_task_create(&writer, "writer", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create writer task: %s\n", strerror(-ret));
        goto out_buffer_delete;
    }

    ret = rt_task_create(&reader, "reader", 0, 50, T_JOINABLE);
    if (ret) {
        fprintf(stderr, "Failed to create reader task: %s\n", strerror(-ret));
        goto out_task_delete;
    }

    rt_task_start(&writer, &writer_task, NULL);
    rt_task_start(&reader, &reader_task, NULL);

    // Wait for tasks to complete
    rt_task_join(&writer);
    rt_task_join(&reader);
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;

out_task_delete:
    rt_task_delete(&writer);
out_buffer_delete:
    rt_buffer_delete(&buffer_desc);

    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

最近修改: 2025-09-30