#define P_MINOR_AUTO XNPIPE_MINOR_AUTO
#define P_URGENT XNPIPE_URGENT
Xenomai 与 Linux 域之间的双向通信通道。
消息管道是 Xenomai 线程和普通 Linux 线程之间使用伪设备上的常规文件 I/O 操作的双向通信通道。管道可以以面向消息的方式操作,以便保留消息边界,也可以从实时线程到普通 Linux 线程以字节为导向的流模式操作,以实现最佳吞吐量。
Xenomai 线程使用 rt_pipe_create() 服务打开管道的一端;普通 Linux 线程通过打开 /dev/rtpN 特殊设备之一来完成相同的操作,其中 N 是双方商定的每个管道的次设备号。
此外,通过注册表支持可以使用命名管道,它会自动创建从 /proc/xenomai/registry/rtipc/xddp/ 下的条目到相应特殊设备文件的符号链接。
注意 Alchemy 的消息管道完全基于 RTDM/ipc 驱动程序提供的 XDDP 协议。
编程建议:测试例程中xenomai.c需要通过cobalt编译成实时程序;linux.c使用gcc编译,不能使用cobalt编译。
int rt_pipe_bind (RT_PIPE *pipe, const char *name, RTIME timeout);
绑定到消息管道。
此例程创建一个新的描述符来引用由其符号名称标识的现有消息管道。如果在入口处对象不存在,调用者可能会阻塞,直到创建了给定名称的管道。
参数
返回值
成功时返回零。否则:
标签
xthread-nowait
注意
超时值被解释为 Alchemy 时钟分辨率的倍数(参见 -alchemy-clock-resolution 选项,默认为 1 纳秒)。
xthread-nowait
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
RT_PIPE pipe_writer;
ret = rt_pipe_bind(&pipe_writer, PIPE_NAME, TM_INFINITE);
if(ret < 0){
printf("Writer: Bind pipe error: %d\n",ret);
return;
}
while (cycle) {
ret = rt_pipe_write(&pipe_writer, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
rt_pipe_unbind(&pipe_writer);
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
RT_PIPE pipe_reader;
ret = rt_pipe_bind(&pipe_reader, PIPE_NAME, TM_INFINITE);
if(ret < 0){
printf("Reader: Bind pipe error: %d\n",ret);
return;
}
while (cycle) {
ret = rt_pipe_read(&pipe_reader, buffer, sizeof(buffer), TM_INFINITE);
if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
rt_pipe_unbind(&pipe_reader);
}
int main(int argc, char *argv[])
{
int ret;
RT_PIPE pipe;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
int rt_pipe_create (RT_PIPE *pipe, const char *name, int minor, size_t poolsize);
创建一个消息管道。
此服务打开一个双向通信通道,用于在 Xenomai 线程和普通 Linux 线程之间交换消息。管道本身保留消息边界,但也可以在从 Xenomai 到 Linux 的字节流模式下使用。
rt_pipe_create() 总是立即返回,即使没有线程打开关联的特殊设备文件。相反,非实时端在尝试打开特殊设备文件时可能会阻塞,直到 Xenomai 线程在同一管道上调用 rt_pipe_create(),除非在 open(2) 系统调用中使用了 O_NONBLOCK。
参数
通过注册表支持命名管道。创建消息管道时传递有效的 name 参数会导致从 /proc/xenomai/registry/rtipc/xddp/name 到关联的特殊设备(即 /dev/rtp*)创建符号链接,因此这些进程不需要知道特定的次设备信息即可打开正确的设备文件。在这种情况下,管道的两端只需同意一个符号名称来引用相同的数据路径,这在使用自适应算法动态选择次设备号(例如传递 P_MINOR_AUTO 作为次设备值)时特别有用。
返回值
成功时返回零。否则:
标签
xthread-only
, mode-unrestricted
, switch-secondary
xthread-nowait
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
while (cycle) {
ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
int rt_pipe_delete (RT_PIPE *pipe);
删除一个消息管道。
此例程删除先前通过调用 rt_pipe_create() 创建的管道对象。所有附加到该管道的资源将自动释放,所有挂起的数据将被刷新。
参数
返回值
成功时返回零。否则:
标签
mode-unrestricted
, switch-secondary
xthread-nowait
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
while (cycle) {
ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
ssize_t rt_pipe_read (RT_PIPE *pipe, void *buf, size_t size, RTIME timeout);
从管道读取(具有相对标量超时)。
该例程是 rt_queue_read_timed() 的变体,接受以标量值表示的相对超时规范。
参数
标签
xthread-nowait
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
while (cycle) {
ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
ssize_t rt_pipe_read_timed (RT_PIPE *pipe, void *buf, size_t size, const struct timespec *abs_timeout);
从管道读取消息。
此服务从给定的管道读取下一个可用的消息。
参数
返回值
成功时返回接收消息的字节数。否则:
标签
xthread-nowait
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
struct timespec ts;
while (cycle) {
clock_gettime(CLOCK_MONOTONIC, &ts);
ts.tv_sec += 2; // 超时时间 2 秒
ret = rt_pipe_read_timed(&pipe, buffer, sizeof(buffer), &ts);
if(ret == -ETIMEDOUT){
printf("Reader: Timeout\n");
}
else if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
ssize_t rt_pipe_read_until (RT_PIPE *pipe, void *buf, size_t size, RTIME timeout);
从管道读取(具有绝对标量超时)。
此例程是 rt_queue_read_timed() 的变体,接受以标量值表示的绝对超时规范。
参数
标签
xthread-nowait
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
RTIME timeout;
while (cycle) {
timeout = rt_timer_read() + 2000000000; // 超时时间 2 秒
ret = rt_pipe_read_until(&pipe, buffer, sizeof(buffer), timeout);
if(ret == -ETIMEDOUT){
printf("Reader: Timeout\n");
}
else if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
ssize_t rt_pipe_stream (RT_PIPE *pipe, const void *buf, size_t size);
通过管道流式传输字节。
此服务写入一系列字节,以便从关联的特殊设备接收。与 rt_pipe_send() 不同,此服务不保留消息边界。相反,数据会即时填充到内部缓冲区中,并在接收方唤醒时立即消费。
由 rt_pipe_stream() 服务发送的数据缓冲区始终以 FIFO 顺序传输(即 P_NORMAL 模式)。
参数
返回值
成功时返回发送的字节数;此值可能小于 size,具体取决于内部缓冲区中的可用空间。否则:
注意
在任何对等方打开关联的特殊设备之前向管道写入数据是允许的。输出将被缓冲,直到此时,仅受关联缓冲池中的可用内存限制(参见 rt_pipe_create())。
标签
xcontext
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_stream(&pipe, message, strlen(message) + 1);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
RTIME timeout;
while (cycle) {
timeout = rt_timer_read() + 2000000000; // 超时时间 2 秒
ret = rt_pipe_read_until(&pipe, buffer, sizeof(buffer), timeout);
if(ret == -ETIMEDOUT){
printf("Reader: Timeout\n");
}
else if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
int rt_pipe_unbind (RT_PIPE *pipe);
解除消息管道的绑定。
参数
此例程释放先前对消息管道的绑定。此调用返回后,描述符将不再有效,无法引用该对象。
标签
thread-unrestricted
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
RT_PIPE pipe_writer;
ret = rt_pipe_bind(&pipe_writer, PIPE_NAME, TM_INFINITE);
if(ret < 0){
printf("Writer: Bind pipe error: %d\n",ret);
return;
}
while (cycle) {
ret = rt_pipe_write(&pipe_writer, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
rt_pipe_unbind(&pipe_writer);
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
RT_PIPE pipe_reader;
ret = rt_pipe_bind(&pipe_reader, PIPE_NAME, TM_INFINITE);
if(ret < 0){
printf("Reader: Bind pipe error: %d\n",ret);
return;
}
while (cycle) {
ret = rt_pipe_read(&pipe_reader, buffer, sizeof(buffer), TM_INFINITE);
if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
rt_pipe_unbind(&pipe_reader);
}
int main(int argc, char *argv[])
{
int ret;
RT_PIPE pipe;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}
ssize_t rt_pipe_write (RT_PIPE *pipe, const void *buf, size_t size, int mode);
向管道写入消息。
此服务写入一条完整的消息,以便从关联的特殊设备接收。rt_pipe_write() 始终保留消息边界,这意味着通过此服务的一次调用发送的所有数据将在特殊设备的单次 read(2) 操作中收集。
此服务与 rt_pipe_send() 的不同之处在于它接受指向要发送的原始数据的指针,而不是预定义的消息缓冲区。
参数
返回值
成功时,此服务返回 size。错误时,返回以下错误代码之一:
注意
在任何对等方打开关联的特殊设备之前向管道写入数据是允许的。输出将被缓冲,直到此时,仅受关联缓冲池中的可用内存限制(参见 rt_pipe_create())。
标签
xcontext
, switch-primary
示例代码
#include <stdio.h>
#include <string.h>
#include <alchemy/task.h>
#include <alchemy/pipe.h>
#define TASK_PRIO 20
#define TASK_MODE T_JOINABLE
#define TASK_STKSZ 0
#define PIPE_SIZE 1024
#define PIPE_NAME "my_pipe"
RT_TASK writer_task, reader_task;
RT_PIPE pipe;
int cycle = 5;
void writer_function(void *arg)
{
const char *message = "Hello from Xenomai!";
int ret;
while (cycle) {
ret = rt_pipe_write(&pipe, message, strlen(message) + 1, P_NORMAL);
if (ret < 0) {
printf("Writer: Error writing to pipe: %d\n", ret);
} else {
printf("Writer: Wrote %d bytes to pipe\n", ret);
}
rt_task_sleep(1000000000); // 睡眠1秒
}
}
void reader_function(void *arg)
{
char buffer[100];
int ret;
while (cycle) {
ret = rt_pipe_read(&pipe, buffer, sizeof(buffer), TM_INFINITE);
if (ret < 0) {
printf("Reader: Error reading from pipe: %d\n", ret);
} else {
printf("Reader: Read %d bytes from pipe: %s\n", ret, buffer);
cycle--;
}
}
}
int main(int argc, char *argv[])
{
int ret;
// 创建管道
ret = rt_pipe_create(&pipe, PIPE_NAME, P_MINOR_AUTO, PIPE_SIZE);
if (ret < 0) {
printf("Error creating pipe: %d\n", ret);
return 1;
}
// 创建写入任务
ret = rt_task_create(&writer_task, "writer", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating writer task: %d\n", ret);
return 1;
}
// 创建读取任务
ret = rt_task_create(&reader_task, "reader", TASK_STKSZ, TASK_PRIO, TASK_MODE);
if (ret < 0) {
printf("Error creating reader task: %d\n", ret);
return 1;
}
// 启动任务
rt_task_start(&writer_task, &writer_function, NULL);
rt_task_start(&reader_task, &reader_function, NULL);
// 等待任务结束
rt_task_join(&writer_task);
rt_task_join(&reader_task);
// 清理
rt_pipe_delete(&pipe);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#define PIPE_DEVICE "/proc/xenomai/registry/rtipc/xddp/my_pipe"
#define BUFFER_SIZE 1024
int main() {
int fd;
char buffer[BUFFER_SIZE];
ssize_t bytes_read, bytes_written;
const char *message = "Hello from Linux!";
int i;
// 打开管道设备
fd = open(PIPE_DEVICE, O_RDWR);
if (fd < 0) {
perror("Failed to open pipe device");
return 1;
}
printf("Pipe device opened successfully.\n");
// 读写循环
for (i = 0; i < 5; i++) {
// 从管道读取
bytes_read = read(fd, buffer, BUFFER_SIZE);
if (bytes_read < 0) {
perror("Failed to read from pipe");
} else if (bytes_read == 0) {
printf("No data available to read.\n");
} else {
buffer[bytes_read] = '\0'; // 确保字符串以null结尾
printf("Read from pipe: %s\n", buffer);
}
// 向管道写入
bytes_written = write(fd, message, strlen(message));
if (bytes_written < 0) {
perror("Failed to write to pipe");
} else {
printf("Wrote to pipe: %s\n", message);
}
sleep(1); // 等待1秒
}
// 关闭管道设备
close(fd);
printf("Pipe device closed.\n");
return 0;
}