菜单

Cobalt/Message pipe services

c 复制代码
#define 	P_MINOR_AUTO   XNPIPE_MINOR_AUTO
#define 	P_URGENT   XNPIPE_URGENT

详细描述

Xenomai 与 Linux 域之间的双向通信通道。

消息管道是 Xenomai 线程和普通 Linux 线程之间使用伪设备上的常规文件 I/O 操作的双向通信通道。管道可以以面向消息的方式操作,以便保留消息边界,也可以从实时线程到普通 Linux 线程以字节为导向的流模式操作,以实现最佳吞吐量。

Xenomai 线程使用 rt_pipe_create() 服务打开管道的一端;普通 Linux 线程通过打开 /dev/rtpN 特殊设备之一来完成相同的操作,其中 N 是双方商定的每个管道的次设备号。

此外,通过注册表支持可以使用命名管道,它会自动创建从 /proc/xenomai/registry/rtipc/xddp/ 下的条目到相应特殊设备文件的符号链接。

注意 Alchemy 的消息管道完全基于 RTDM/ipc 驱动程序提供的 XDDP 协议。

编程建议:测试例程中xenomai.c需要通过cobalt编译成实时程序;linux.c使用gcc编译,不能使用cobalt编译。


函数文档

rt_pipe_bind

int rt_pipe_bind (RT_PIPE *pipe, const char *name, RTIME timeout);

绑定到消息管道。

此例程创建一个新的描述符来引用由其符号名称标识的现有消息管道。如果在入口处对象不存在,调用者可能会阻塞,直到创建了给定名称的管道。

参数

  • pipe 由操作填充的管道描述符的地址。失败时,此内存的内容是未定义的。
  • name 一个有效的以 NULL 结尾的名称,用于标识要绑定的管道。此字符串应与传递给 rt_pipe_create() 的对象名称参数匹配。
  • timeout 等待注册发生的时钟刻度数(见注)。传递 TM_INFINITE 会导致调用者无限期阻塞,直到对象被注册。传递 TM_NONBLOCK 会导致服务在对象在入口处未注册时立即返回,而不等待。

返回值

成功时返回零。否则:

  • 如果在检索完成之前为当前任务调用了 rt_task_unblock(),则返回 -EINTR。
  • 如果 timeout 等于 TM_NONBLOCK 并且在入口处未注册搜索的对象,则返回 -EWOULDBLOCK。
  • 如果无法在指定的时间内检索到对象,则返回 -ETIMEDOUT。
  • 如果此服务应阻塞,但未从 Xenomai 线程调用,则返回 -EPERM。

标签

xthread-nowait

注意

超时值被解释为 Alchemy 时钟分辨率的倍数(参见 -alchemy-clock-resolution 选项,默认为 1 纳秒)。

xthread-nowait, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;
    RT_PIPE pipe_writer;

    ret = rt_pipe_bind(&pipe_writer, PIPE_NAME, TM_INFINITE);
    if(ret < 0){
        printf("Writer: Bind pipe error: %d\n",ret);
        return;
    }

    while (cycle) {
        ret = rt_pipe_write(&pipe_writer, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }

    rt_pipe_unbind(&pipe_writer);
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;
    RT_PIPE pipe_reader;

    ret = rt_pipe_bind(&pipe_reader, PIPE_NAME, TM_INFINITE);
    if(ret < 0){
        printf("Reader: Bind pipe error: %d\n",ret);
        return;
    }

    while (cycle) {
        ret = rt_pipe_read(&pipe_reader, buffer, sizeof(buffer), TM_INFINITE);
        if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }

    rt_pipe_unbind(&pipe_reader);
}

int main(int argc, char *argv[])
{
    int ret;
    RT_PIPE pipe;
    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_create

int rt_pipe_create (RT_PIPE *pipe, const char *name, int minor, size_t poolsize);

创建一个消息管道。

此服务打开一个双向通信通道,用于在 Xenomai 线程和普通 Linux 线程之间交换消息。管道本身保留消息边界,但也可以在从 Xenomai 到 Linux 的字节流模式下使用。

rt_pipe_create() 总是立即返回,即使没有线程打开关联的特殊设备文件。相反,非实时端在尝试打开特殊设备文件时可能会阻塞,直到 Xenomai 线程在同一管道上调用 rt_pipe_create(),除非在 open(2) 系统调用中使用了 O_NONBLOCK。

参数

  • pipe 管道描述符的地址,成功创建后可用于唯一标识该对象。
  • name 代表管道符号名称的 ASCII 字符串。当非 NULL 和非空时,此字符串的副本用于将创建的管道索引到对象注册表中。

通过注册表支持命名管道。创建消息管道时传递有效的 name 参数会导致从 /proc/xenomai/registry/rtipc/xddp/name 到关联的特殊设备(即 /dev/rtp*)创建符号链接,因此这些进程不需要知道特定的次设备信息即可打开正确的设备文件。在这种情况下,管道的两端只需同意一个符号名称来引用相同的数据路径,这在使用自适应算法动态选择次设备号(例如传递 P_MINOR_AUTO 作为次设备值)时特别有用。

  • minor 与管道关联的设备的次设备号。传递 P_MINOR_AUTO 会导致次设备号自动分配。在这种情况下,将自动从 /proc/xenomai/registry/rtipc/xddp/name 到分配的管道设备条目创建符号链接。有效的次设备号范围是 0 到 CONFIG_XENO_OPT_PIPE_NRDEV-1。
  • poolsize 指定管道的专用缓冲池大小。传递 0 表示所有消息分配都在 Cobalt 核心堆上执行。

返回值

成功时返回零。否则:

  • 如果系统无法从主堆获取内存以创建管道,则返回 -ENOMEM。
  • 如果 minor 不同于 P_MINOR_AUTO 并且不是有效的次设备号,则返回 -ENODEV。
  • 如果名称与已注册的管道冲突,则返回 -EEXIST。
  • 如果次设备号已被打开,则返回 -EBUSY。
  • 如果此服务从无效的上下文调用,例如中断或非 Xenomai 线程,则返回 -EPERM。

标签

xthread-only, mode-unrestricted, switch-secondary

xthread-nowait, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;

    while (cycle) {
        ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
        if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_delete

int rt_pipe_delete (RT_PIPE *pipe);

删除一个消息管道。

此例程删除先前通过调用 rt_pipe_create() 创建的管道对象。所有附加到该管道的资源将自动释放,所有挂起的数据将被刷新。

参数

  • pipe 管道描述符。

返回值

成功时返回零。否则:

  • 如果 pipe 不是有效的管道描述符,则返回 -EINVAL。
  • 如果 pipe 是一个已关闭的管道描述符,则返回 -EIDRM。
  • 如果此服务从异步上下文调用,则返回 -EPERM。

标签

mode-unrestricted, switch-secondary

xthread-nowait, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;

    while (cycle) {
        ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
        if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_read

ssize_t rt_pipe_read (RT_PIPE *pipe, void *buf, size_t size, RTIME timeout);

从管道读取(具有相对标量超时)。

该例程是 rt_queue_read_timed() 的变体,接受以标量值表示的相对超时规范。

参数

  • pipe 管道描述符。
  • buf 指向内存区域的指针,成功接收消息后该区域将被写入。
  • size 从接收的消息中读取到 buf 的字节数。如果 size 小于实际消息大小,则返回 -ENOBUFS,因为未完全接收的消息将会丢失。如果 size 为零,此调用将立即返回,不执行任何其他操作。
  • timeout 以时钟滴答表示的延迟。传递 TM_INFINITE 会导致调用者无限期阻塞,直到有消息可用。传递 TM_NONBLOCK 会导致服务在没有消息可用的情况下立即返回而不阻塞。

标签

xthread-nowait, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;

    while (cycle) {
        ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
        if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_read_timed

ssize_t rt_pipe_read_timed (RT_PIPE *pipe, void *buf, size_t size, const struct timespec *abs_timeout);

从管道读取消息。

此服务从给定的管道读取下一个可用的消息。

参数

  • pipe 管道描述符。
  • buf 一个指向内存区域的指针,成功时将接收到的消息写入该区域。
  • size 要读取到 buf 中的接收消息的字节数。如果 size 小于实际消息大小,则返回 -ENOBUFS,因为不完整接收的消息将丢失。如果 size 为零,此调用立即返回且不执行其他操作。
  • abs_timeout 基于 Alchemy 时钟,以秒/纳秒表示的绝对时间,指定等待消息从管道可用的时间限制。传递 NULL 会导致调用者无限期阻塞,直到消息可用。传递 { .tv_sec = 0, .tv_nsec = 0 } 会导致服务立即返回而不阻塞,如果没有消息可用。

返回值

成功时返回接收消息的字节数。否则:

  • 如果在消息到达之前达到 abs_timeout,则返回 -ETIMEDOUT。
  • 如果 abs_timeout 为 { .tv_sec = 0, .tv_nsec = 0 } 且在调用时没有消息立即可用,则返回 -EWOULDBLOCK。
  • 如果在消息可用之前调用了 rt_task_unblock(),则返回 -EINTR。
  • 如果 pipe 不是有效的管道描述符,则返回 -EINVAL。
  • 如果在调用者等待消息时删除了 pipe,则返回 -EIDRM。在这种情况下,返回此服务时 pipe 不再有效。
  • 如果此服务应阻塞,但不是从 Xenomai 线程调用的,则返回 -EPERM。

标签

xthread-nowait, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;
    struct timespec ts;

    while (cycle) {
        clock_gettime(CLOCK_MONOTONIC, &ts);
        ts.tv_sec += 2;  // 超时时间 2 秒
        ret = rt_pipe_read_timed(&pipe, buffer, sizeof(buffer), &ts);
        if(ret == -ETIMEDOUT){
            printf("Reader: Timeout\n");
        }
        else if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_read_until

ssize_t rt_pipe_read_until (RT_PIPE *pipe, void *buf, size_t size, RTIME timeout);

从管道读取(具有绝对标量超时)。

此例程是 rt_queue_read_timed() 的变体,接受以标量值表示的绝对超时规范。

参数

  • pipe 管道描述符。
  • buf 指向内存区域的指针,成功接收消息后该区域将被写入。
  • size 从接收的消息中读取到 buf 的字节数。如果 size 小于实际消息大小,则返回 -ENOBUFS,因为未完全接收的消息将会丢失。如果 size 为零,此调用将立即返回,不执行任何其他操作。
  • abs_timeout 以时钟刻度表示的绝对日期。传递 TM_INFINITE 会导致调用者无限期阻塞,直到有消息可用。传递 TM_NONBLOCK 会导致服务在没有消息可用的情况下立即返回而不阻塞。

标签

xthread-nowait, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;
    RTIME timeout;

    while (cycle) {
        timeout = rt_timer_read() + 2000000000;  // 超时时间 2 秒
        ret = rt_pipe_read_until(&pipe, buffer, sizeof(buffer), timeout);
        if(ret == -ETIMEDOUT){
            printf("Reader: Timeout\n");
        }
        else if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_stream

ssize_t rt_pipe_stream (RT_PIPE *pipe, const void *buf, size_t size);

通过管道流式传输字节。

此服务写入一系列字节,以便从关联的特殊设备接收。与 rt_pipe_send() 不同,此服务不保留消息边界。相反,数据会即时填充到内部缓冲区中,并在接收方唤醒时立即消费。

由 rt_pipe_stream() 服务发送的数据缓冲区始终以 FIFO 顺序传输(即 P_NORMAL 模式)。

参数

  • pipe 管道描述符。
  • buf 要发送的第一个数据字节的地址。数据将在传输前复制到内部缓冲区。
  • size 缓冲区的字节大小。零是一个有效值,在这种情况下,服务会立即返回而不发送任何数据。

返回值

成功时返回发送的字节数;此值可能小于 size,具体取决于内部缓冲区中的可用空间。否则:

  • 如果 mode 无效或 pipe 不是管道描述符,则返回 -EINVAL。
  • 如果没有足够的缓冲区空间来完成操作,则返回 -ENOMEM。
  • 如果 pipe 是一个已关闭的管道描述符,则返回 -EIDRM。

注意

在任何对等方打开关联的特殊设备之前向管道写入数据是允许的。输出将被缓冲,直到此时,仅受关联缓冲池中的可用内存限制(参见 rt_pipe_create())。

标签

xcontext, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_stream(&pipe, message, strlen(message) + 1);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;
    RTIME timeout;

    while (cycle) {
        timeout = rt_timer_read() + 2000000000;  // 超时时间 2 秒
        ret = rt_pipe_read_until(&pipe, buffer, sizeof(buffer), timeout);
        if(ret == -ETIMEDOUT){
            printf("Reader: Timeout\n");
        }
        else if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_unbind

int rt_pipe_unbind (RT_PIPE *pipe);

解除消息管道的绑定。

参数

  • pipe 管道描述符。

此例程释放先前对消息管道的绑定。此调用返回后,描述符将不再有效,无法引用该对象。

标签

thread-unrestricted

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;
    RT_PIPE pipe_writer;

    ret = rt_pipe_bind(&pipe_writer, PIPE_NAME, TM_INFINITE);
    if(ret < 0){
        printf("Writer: Bind pipe error: %d\n",ret);
        return;
    }

    while (cycle) {
        ret = rt_pipe_write(&pipe_writer, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }

    rt_pipe_unbind(&pipe_writer);
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;
    RT_PIPE pipe_reader;

    ret = rt_pipe_bind(&pipe_reader, PIPE_NAME, TM_INFINITE);
    if(ret < 0){
        printf("Reader: Bind pipe error: %d\n",ret);
        return;
    }

    while (cycle) {
        ret = rt_pipe_read(&pipe_reader, buffer, sizeof(buffer), TM_INFINITE);
        if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }

    rt_pipe_unbind(&pipe_reader);
}

int main(int argc, char *argv[])
{
    int ret;
    RT_PIPE pipe;
    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

rt_pipe_write

ssize_t rt_pipe_write (RT_PIPE *pipe, const void *buf, size_t size, int mode);

向管道写入消息。

此服务写入一条完整的消息,以便从关联的特殊设备接收。rt_pipe_write() 始终保留消息边界,这意味着通过此服务的一次调用发送的所有数据将在特殊设备的单次 read(2) 操作中收集。

此服务与 rt_pipe_send() 的不同之处在于它接受指向要发送的原始数据的指针,而不是预定义的消息缓冲区。

参数

  • pipe 管道描述符。
  • buf 要发送的第一个数据字节的地址。数据将在传输前复制到内部缓冲区。
  • size 消息的字节大小(仅有效载荷数据)。零是一个有效值,在这种情况下,服务会立即返回而不发送任何消息。
  • mode 影响操作的一组标志:
    • P_URGENT 将消息添加到输出队列的前面,确保 LIFO 顺序。
    • P_NORMAL 将消息添加到输出队列的末尾,确保 FIFO 顺序。

返回值

成功时,此服务返回 size。错误时,返回以下错误代码之一:

  • 如果 mode 无效或 pipe 不是管道描述符,则返回 -EINVAL。
  • 如果没有足够的缓冲区空间来完成操作,则返回 -ENOMEM。
  • 如果 pipe 是一个已关闭的管道描述符,则返回 -EIDRM。

注意

在任何对等方打开关联的特殊设备之前向管道写入数据是允许的。输出将被缓冲,直到此时,仅受关联缓冲池中的可用内存限制(参见 rt_pipe_create())。

标签

xcontext, switch-primary

示例代码

c{filename="xenomai.c"} 复制代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>

#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"

RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;

void writer_function(void *arg)
{
    const char *message = "Hello from Xenomai!";
    int ret;

    while (cycle) {
        ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
        if (ret < 0) {
            printf("Writer: Error writing to pipe: %d\n", ret);
        } else {
            printf("Writer: Wrote %d bytes to pipe\n", ret);
        }
        rt_task_sleep(1000000000); // 睡眠1秒
    }
}

void reader_function(void *arg)
{
    char buffer[100];
    int ret;

    while (cycle) {
        ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
        if (ret < 0) {
            printf("Reader: Error reading from pipe: %d\n", ret);
        } else {
            printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
            cycle--;
        }
    }
}

int main(int argc, char *argv[])
{
    int ret;

    // 创建管道
    ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
    if (ret < 0) {
        printf("Error creating pipe: %d\n", ret);
        return 1;
    }

    // 创建写入任务
    ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating writer task: %d\n", ret);
        return 1;
    }

    // 创建读取任务
    ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
    if (ret < 0) {
        printf("Error creating reader task: %d\n", ret);
        return 1;
    }

    // 启动任务
    rt_task_start(&writer_task, &writer_function, NULL);
    rt_task_start(&reader_task, &reader_function, NULL);

    // 等待任务结束
    rt_task_join(&writer_task);
    rt_task_join(&reader_task);

    // 清理
    rt_pipe_delete(&pipe);

    return 0;
}
c{filename="linux.c"} 复制代码
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>

#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024

int main() {
    int fd;
    char buffer[BUFFER_SIZE];
    ssize_t bytes_read, bytes_written;
    const char *message = "Hello from Linux!";
    int i;

    // 打开管道设备
    fd = open(PIPE_DEVICE, O_RDWR);
    if (fd < 0) {
        perror("Failed to open pipe device");
        return 1;
    }

    printf("Pipe device opened successfully.\n");

    // 读写循环
    for (i = 0; i < 5; i++) {
        // 从管道读取
        bytes_read = read(fd, buffer, BUFFER_SIZE);
        if (bytes_read < 0) {
            perror("Failed to read from pipe");
        } else if (bytes_read == 0) {
            printf("No data available to read.\n");
        } else {
            buffer[bytes_read] = '\0';  // 确保字符串以null结尾
            printf("Read from pipe: %s\n", buffer);
        }

        // 向管道写入
        bytes_written = write(fd, message, strlen(message));
        if (bytes_written < 0) {
            perror("Failed to write to pipe");
        } else {
            printf("Wrote to pipe: %s\n", message);
        }

        sleep(1);  // 等待1秒
    }

    // 关闭管道设备
    close(fd);
    printf("Pipe device closed.\n");

    return 0;
}

最近修改: 2025-07-24