开源软件社区主要采用两种方法将实时需求引入Linux
Linux内核本身,使其符合实时需求,提供有界延迟、实时API等。主线Linux内核和PREEMPT_RT项目采用这一方法。Linux内核下方添加一层(例如,OS Real-time extension)来处理所有实时需求,从而使Linux的行为不影响实时任务。xkernel项目采用这一方法。这两种方法的目标都是在Linux多CPU的实时和非实时软件执行环境下,实现最低线程调度延迟。

中断可以描述为对硬件事件的立即响应。执行这种响应的过程通常称为中断服务例程(ISR)。在处理 ISR 的过程中,可能会产生多种延迟。这些延迟根据其来源分为两个部分:


用户空间进程 (User-space process):这是通过调用 POSIX 的 fork() 命令创建的,包含以下内容:
用户线程 (User-thread):可以通过 POSIX 的 pthread_create() 命令在现有进程中创建或添加用户线程。
pthread_create() 作为参数的函数。内核线程 (Kernel-thread):可以通过 POSIX 的 kthread_create() 命令在内核模块中创建或添加内核线程。
等时应用程序旨在确保任务在精确定义的时间点完成。然而,Linux 标准定时器通常无法满足所需的循环周期期限的分辨率或精度,甚至两者都不满足。
例如,Linux 中的典型定时器函数(如 gettimeofday() 系统调用)返回微秒级精度的时钟时间,而许多情况下需要的是纳秒级的定时器精度。
为了解决这一限制,创建了提供更高精度计时能力的额外 POSIX API:
PREEMPT_RT 调度上下文中,可以使用 POSIX 的 timer_create() 命令,在给定的时钟域中创建循环任务定时器。该定时器具有以下特点:
POSIX 定时器到期时无法在高精度定时器中断的硬中断上下文中传递信号。根据DIN 44300标准,实时操作被定义为系统能够在严格的时间要求内响应外部事件的能力。在实时系统中,处理数据和响应的时间是预先确定的,这对于确保系统可以及时地对关键事件做出反应非常关键。
这种类型的系统特别适用于那些对时间反应有严格要求的应用,例如自动控制系统、医疗监测设备和交通控制系统。在这些应用中,延迟或失败及时响应可能导致严重后果。
COBALT 任务调度上下文中,可以使用 POSIX 的 clock_nanosleep() 命令在给定的时钟域中创建循环任务定时器。该定时器具有以下特征:
clock_nanosleep() 命令不依赖信号机制,因此不会受到信号机制带来的延迟问题的影响。clock_nanosleep(),以提高效率。先进先出(FIFO)策略,固定优先级,抢占式调度策略。如果你需要关于这一点的学习,你可以查阅这篇背景知识。
当使用SCHED_FIFO时,调度器按优先级顺序扫描所有SCHED_FIFO线程的列表,并调度准备运行的最高优先级线程。
PREEMPT_RT 项目是由 Linux 内核开发者领导的开源框架,遵循 GPLv2 许可证
该项目的目标是逐步提升 Linux 内核对实时性要求的支持,并将这些改进合并到主线内核中。PREEMPT_RT 的开发与主线开发紧密协作,旨在实现更高效的实时性能。
多年来,在 PREEMPT_RT 项目中设计、开发和调试的许多改进现在已经成为主线 Linux 内核的一部分。这个项目是 Linux 内核的一个长期分支,最终目标是在所有改进都合并到主线后使其消失。
PREEMPT_RT 通过在内核代码和众多驱动/模块代码库中推广 No non-threaded IRQ nesting 开发实践,强制执行基本的软件设计规则,以实现完全抢占和低延迟的调度。
APIC、MSI 等)接收到来自硬件的事件,触发中断。tasklet 或工作队列任务的形式启动,且应由 ISR 执行来完成:

Top Half (上半部): 在上半部处理期间,Preempt(抢占)和 IRQ(中断)均为关闭状态,意味着系统不会响应其他中断,也不会进行任务调度。
其他中断处理: 这是一个过渡阶段,处理其他优先级的中断或任务调度,PREEMPT_RT 允许在某些条件下发生抢占,以提高系统的实时响应能力。
Bottom Half (下半部): 下半部负责处理较为耗时的任务,例如处理缓冲区或设备队列。这个阶段的中断和抢占功能是开启的,因此可以允许其他中断和任务的执行。
多线程调度抢占可能发生的情况包括:
多线程调度抢占不能发生的情况包括内核代码关键区(critical section)时:
spinlock 的关键区,除非使用了抢占式自旋锁(preemptive spinlocks)PREEMPT_RT 的优先级调度策略标准的 Linux 内核包含多种调度策略,如 sched 的 manpage 中所描述的。以下三种策略与实时任务相关:
SCHED_FIFO 实现了先进先出的调度算法:
SCHED_FIFO 任务开始运行时,它会一直运行,直到被更高优先级的线程抢占、被 I/O 请求阻塞,或者调用了 yield 函数。SCHED_FIFO 任务释放 CPU 之前都不会被调度。SCHED_FIFO 任务不能互相抢占。SCHED_RR 与 SCHED_FIFO 调度几乎相同,唯一的区别在于它如何处理具有相同优先级的进程:
SCHED_RR 任务分配一个时间片。当进程耗尽它的时间片时,调度器会将它移到该优先级的进程列表末尾。SCHED_RR 任务以轮转方式(Round-Robin)在它们之间调度。SCHED_RR 调度与 SCHED_FIFO 调度是相同的。SCHED_DEADLINE 使用最早截止时间优先(Earliest Deadline First, EDF)的调度算法,并结合了恒定带宽服务器(Constant Bandwidth Server, CBS)的机制:
SCHED_DEADLINE 策略使用三个参数来调度任务:Runtime(运行时间)、Deadline(截止时间)和 Period(周期)。SCHED_DEADLINE 任务在每个周期内会获得 运行时间 的纳秒数,该时间应在周期开始后的 截止时间 纳秒内可用。SCHED_DEADLINE 线程是系统中优先级最高(用户可控)的线程。SCHED_DEADLINE 线程是可运行状态,它将抢占在其他策略下调度的任何线程。优先级继承假设锁(例如,spin_lock、mutex 等)会继承等待该锁的最高优先级进程线程的优先级。
PREEMPT_RT 为 rtmutex、spin_lock 和 mutex 代码提供了优先级继承功能。一个低优先级的进程可能持有一个高优先级进程所需的锁,从而有效地降低了高优先级进程的优先级。
在 Linux 上,chrt 命令可用于设置进程的实时属性,例如策略和优先级:
FIFO,其中 SCHED_FIFO 的优先级值可以在 1 到 99 之间:chrt --fifo --pid <priority> <pid>
        以下命令将为 PID 为 9527 的进程设置调度属性为 SCHED_FIFO,并将优先级设为 99:
chrt --fifo --pid 99 9527
        chrt -rr --pid <priority> <pid>
        以下命令将为 PID 为 9527 的进程设置调度属性为 SCHED_RR,并将优先级设为 99:
chrt -rr --pid 99 1823
        chrt --deadline --sched-runtime <nanoseconds> \
                  --sched-period <nanoseconds> \
                  --sched-deadline <nanoseconds> \
                  --pid <priority> <pid>
        以下示例将为 PID 为 9527 的进程设置调度属性为 SCHED_DEADLINE。运行时间、截止时间和周期的单位为纳秒:
chrt --deadline --sched-runtime 1000000 \
                  --sched-period 5000000 \
                  --sched-deadline 2000000 \
                  --pid 0 9527
        ps f -g 0 -o pid,policy,rtprio,cmd
        将此信息汇总成一个表格:
| 优先级 | 名称 | 
|---|---|
| 99 | posixcputmr, migration | 
| 50 | 所有 IRQ 处理程序,除了 39-s-mmc0 和 42-s-mmc1。例如,367-enp2s0 处理一个网络接口 | 
| 49 | IRQ 处理程序 39-s-mmc0 和 42-s-mmc1 | 
| 1 | i915/signal, ktimersoftd, rcu_preempt, rcu_sched, rcub, rcuc | 
| 0 | 当前运行的其他任务 | 
sched_setscheduler 函数可用于更改线程的调度策略。以下值可用于设置实时调度策略
SCHED_FIFOSCHED_RR{{< callout type="warning" >}}
注意:非实时调度策略如 SCHED_OTHER、SCHED_BATCH 和 SCHED_IDLE 也是可用的。sched_setscheduler 函数不支持 SCHED_DEADLINE 调度策略。
{{< /callout >}}
sched_setscheduler 函数为实时线程策略设置 SCHED_FIFO 或 SCHED_RR 调度策略及其优先级。
int sched_setscheduler(pid_t pid, int policy, const struct sched_param *param);
// 以下代码将配置正在运行的进程,以使用优先级为99的SCHED_RR调度:
struct sched_param param_rr;
memset(¶m_rr, 0, sizeof(param_rr));
param_rr.sched_priority = 99;
pid_t pid = getpid();
if (sched_setscheduler(pid, SCHED_RR, ¶m_rr))
  perror("sched_setscheduler error:");
// 以下代码将配置正在运行的进程,以使用优先级为99的SCHED_FIFO调度:
struct sched_param param_fifo;
memset(¶m_fifo, 0, sizeof(param_fifo));
param_fifo.sched_priority = 99;
pid_t pid = getpid();
if (sched_setscheduler(pid, SCHED_FIFO, ¶m_fifo))
  perror("sched_setscheduler error:");
        要使用FIFO调度创建线程,请使用pthread_attr_init函数初始化pthread_attr_t(线程属性对象)对象:
pthread_attr_t attr_fifo;
pthread_attr_init(&attr_fifo);
        初始化后,使用pthread_attr_setschedpolicy将attr_fifo引用的线程属性对象设置为SCHED_FIFO(FIFO调度策略):
pthread_attr_setschedpolicy(&attr_fifo, SCHED_FIFO);
        使用sched_param对象设置线程的优先级(可以为FIFO调度取1到99之间的值),并使用pthread_attr_setschedparam将参数值复制到线程属性:
struct sched_param param_fifo;
param_fifo.sched_priority = 92;
pthread_attr_setschedparam(&attr_fifo, ¶m_fifo);
        设置线程属性的继承调度器属性。继承调度器属性决定新线程是从调用线程继承调度属性,还是从 attr 继承。要使用 attr 中定义的调度属性,需通过调用 pthread_attr_setinheritsched 函数并使用 PTHREAD_EXPLICIT_SCHED
pthread_attr_setinheritsched(&attr_fifo, PTHREAD_EXPLICIT_SCHED);
        通过调用 pthread_create 函数创建线程:
pthread_t thread_fifo;
pthread_create(&thread_fifo, &attr_fifo, thread_function_fifo, NULL);
        以下代码有助于在 FIFO 调度策略下实现最简单的可抢占多线程应用:
  #include <pthread.h>
  #include <stdio.h>
  void *thread_function_fifo(void *data) {
        printf("Inside Thread\n");
        return NULL;
}
int main(int argc, char* argv[]) {
        struct sched_param param_fifo;
        pthread_attr_t attr_fifo;
        pthread_t thread_fifo;
        int status = -1;
        memset(¶m_fifo, 0, sizeof(param_fifo));
        status = pthread_attr_init(&attr_fifo);
        if (status) {
                printf("pthread_attr_init failed\n");
                return status;
        }
        status = pthread_attr_setschedpolicy(&attr_fifo, SCHED_FIFO);
        if (status) {
                printf("pthread_attr_setschedpolicy failed\n");
                return status;
        }
        param_fifo.sched_priority = 92;
        status = pthread_attr_setschedparam(&attr_fifo, ¶m_fifo);
        if (status) {
                printf("pthread_attr_setschedparam failed\n");
                return status;
        }
        status = pthread_attr_setinheritsched(&attr_fifo, PTHREAD_EXPLICIT_SCHED);
        if (status) {
                printf("pthread_attr_setinheritsched failed\n");
                return status;
        }
        status = pthread_create(&thread_fifo, &attr_fifo, thread_function_fifo, NULL);
        if (status) {
                printf("pthread_create failed\n");
                return status;
        }
        pthread_join(thread_fifo, NULL);
        return status;
}
        Read-Copy Update (RCU) API 在 Linux 代码中被广泛用于在线程同步中避免使用锁。这些 API 的特点如下:
在 RCU 读取端关键区中使用轻量级原语可以保证 RCU 保护的对象指针存在。

所有 RCU 写操作必须等待 RCU 的宽限期(grace period)结束,才能在将某些对象变为不可被读者访问后进行释放,并在此之后回收资源。
spinlock(&updater_lock);
q = cptr;
rcu_assign_pointer(cptr, new_p);
spin_unlock(&updater_lock);
synchronize_rcu(); /* Wait for grace period. */
kfree(q);
        RCU 宽限期是为了让所有已存在的读取器完成它们的 RCU 读取端关键区操作。宽限期从调用 synchronize_rcu() 开始,直到所有 CPU 执行一次上下文切换后结束。

与标准 Linux 运行时相比,Linux 进程的内存管理在 PREEMPT_RT Linux 运行时中被视为一个重要且关键的方面。从内核调度的角度来看,进程和线程没有区别,它们都以类型为 running 的 task_struct 内核结构表示为任务。然而,从调度延迟的角度来看,进程上下文切换的时间显著长于同一进程内的用户线程上下文切换,因为进程切换需要刷新 TLB。

有不同的内存管理算法旨在优化可运行的进程并提高系统性能。例如,如果内核分配的 mmap() 返回的进程需要完整的内存页或仅需要部分内存页,内存管理将与调度器协同工作,以优化资源的使用。
探讨内存管理的三个主要领域:
内存锁定
内存锁定是程序初始化的一部分,尤其在实时进程中非常关键。大多数实时进程会在其执行期间锁定内存。内存锁定 API mlock 和 mlockall 函数可用于应用程序锁定内存,而 munlock 和 munlockall 则用于解锁应用程序的内存页(虚拟地址空间)到主内存中。
MCL_CURRENT、MCL_FUTURE 和 MCL_ONFAULT。内存锁定确保在关键时刻应用程序的内存页不会被从主内存中移除,这也能确保在实时关键操作中不会发生页面错误(page-fault),这一点非常重要。
应用程序中的每个线程都有自己的栈。可以通过 pthread 函数 pthread_attr_setstacksize() 来指定栈的大小。
pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize) 的语法:
如果栈的大小没有显式设置,则会分配默认的栈大小。如果应用程序使用大量的实时线程,建议使用比默认大小更小的栈,以节省资源并优化性能。
在实时(RT)线程执行时,不建议在实时关键路径中进行动态内存分配,因为这会增加发生页面错误的可能性。建议在实时执行开始之前分配所需的内存,并使用 mlock 或 mlockall 函数锁定内存。在以下示例中,线程函数尝试为线程的局部变量动态分配内存,并尝试访问存储在这些随机位置的数据。
在实时系统中,提前分配和锁定内存可以避免在关键操作期间由于内存分页引发的延迟,确保系统的实时性。
#define BUFFER_SIZE 1048576
void *thread_function_fifo(void *data) {
    double sum = 0.0;
    double* tempArray = (double*)calloc(BUFFER_SIZE, sizeof(double));
    size_t randomIndex;
    int i = 50000;
    while(i--)
    {
             randomIndex =  rand() % BUFFER_SIZE;
             sum += tempArray[randomIndex];
    }
             return NULL;
}
        Linux.org 社区逐步改进了定时器的精度,以提供一种更精确的方式唤醒系统并以更准确的时间间隔处理数据:
你可以通过下面的命令查看系统内定时器分辨率:
cat /proc/timer_list | grep 'cpu:\|resolution\|hres_active\|clock\|event_handler'
        等时应用程序会在固定的时间间隔后重复执行:
以下步骤概述了开发一个简单的等时实时线程(isoch-rt-thread)的基本过程,用于进行健全性检查:
这类应用主要用于需要严格时间控制的场景,如音视频处理或工业自动化。
定义一个结构,该结构将包含时间段信息以及时钟的当前时间。此结构将用于在多个任务之间传递数据:
/*Data format to be passed between tasks*/
struct time_period_info {
        struct timespec next_period;
        long period_ns;
        将循环线程的时间周期定义为1毫秒,并获取系统的当前时间:
/*Initialize the periodic task with 1ms time period*/
static void initialize_periodic_task(struct time_period_info *tinfo)
{
        /* keep time period for 1ms */
        tinfo->period_ns = 1000000;
        clock_gettime(CLOCK_MONOTONIC, &(tinfo->next_period));
}
        使用计时器增量模块进行纳米睡眠,以完成真实线程的时间段:
/*Increment the timer until the time period elapses and the Real time task will execute*/
static void inc_period(struct time_period_info *tinfo)
{
      tinfo->next_period.tv_nsec += tinfo->period_ns;
      while(tinfo->next_period.tv_nsec >= 1000000000){
        tinfo->next_period.tv_sec++;
        tinfo->next_period.tv_nsec -=1000000000;
      }
}
        使用循环等待时间段完成。假设与时间段相比,线程执行时间更短:
/*Assumption: Real time task requires less time to complete task as compared to period length, so wait till period completes*/
static void wait_for_period_complete(struct period_info *pinfo)
{
        inc_period(pinfo);
        /* Ignore possibilities of signal wakes */
        clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &pinfo->next_period, NULL);
}
        定义一个实时线程。为了简单起见,包括一个打印声明:
static void real_time_task()
{
        printf("Real-Time Task executing\n");
        return NULL;
}
        初始化并触发实时线程循环执行。这将等待时间段的完成。此线程将作为POSIX线程从主线程创建。
void *realtime_isochronous_task(void *data)
{
        struct time_period_info tpinfo;
        periodic_task_init(&tpinfo);
        while (1) {
                real_time_task();
                wait_for_period_complete(&tpinfo);
        }
        return NULL;
}
        一个非实时的主线程将在这里生成一个实时的等时应用程序线程。此外,它会设置抢占式调度的优先级和策略。
创建一个POSIX主线程,以创建和初始化具有属性的所有线程:
int main(int argc, char* argv[]) {
        struct sched_param param_fifo;
        pthread_attr_t attr_fifo;
        pthread_t thread_fifo;
        int status = -1;
        memset(¶m_fifo, 0, sizeof(param_fifo));
        status = pthread_attr_init(&attr_fifo);
        if (status) {
                printf("pthread_attr_init failed\n");
                return status;
        }
        接下来,使用FIFO调度策略设置实时线程:
status = pthread_attr_setschedpolicy(&attr_fifo, SCHED_FIFO);
if (status) {
  printf("pthread_attr_setschedpolicy failed\n");
  return status;
}
        实时任务优先级设置为92。优先级可以在1到99之间:
param_fifo.sched_priority = 92;
status = pthread_attr_setschedparam(&attr_fifo, ¶m_fifo);
if (status) {
        printf("pthread_attr_setschedparam failed\n");
        return status;
}
        设置线程属性的inherit-scheduler属性。inherit-scheduler属性决定了新线程是从调用线程还是从attr中取调度属性:
status = pthread_attr_setinheritsched(&attr_fifo, PTHREAD_EXPLICIT_SCHED);
if (status) {
        printf("pthread_attr_setinheritsched failed\n");
        return status;
}
        以下代码创建实时等时应用程序线程:
status = pthread_create(&thread_fifo, &attr_fifo, realtime_isochronous_task, NULL);
if (status) {
        printf("pthread_create failed\n");
        return status;
}
        等待实时任务完成:
        pthread_join(thread_fifo, NULL);
    return status;
}
        {{% --- title="点击展开完整示例" closed="true" %}}
/*Header Files*/
#include <pthread.h>
#include <stdio.h>
#include <string.h>
/*Data format to be passed between tasks*/
struct time_period_info {
     struct timespec next_period;
     long period_ns;
};
/*Initialize the periodic task with 1ms time period*/
static void initialize_periodic_task(struct time_period_info *tinfo){
     /*Keep time period for 1ms*/
     tinfo->period_ns = 1000000;
     clock_gettime(CLOCK_MONOTONIC, &(tinfo->next_period));
}
/*Increment the timer to till time period elapsed*/
static void inc_period(struct time_period_info *tinfo){
     tinfo->next_period.tv_nsec += tinfo->period_ns;
     while(tinfo->next_period.tv_nsec >= 1000000000){
             tinfo->next_period.tv_sec++;
             tinfo->next_period.tv_nsec -=1000000000;
     }
}
/*Real time task requires less time to complete task as compared to period length, so wait till period completes*/
static void wait_for_period_complete(struct time_period_info *tinfo){
     inc_period(tinfo);
     clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tinfo->next_period, NULL);
}
/*Real Time Task*/
static void* real_time_task(){
     printf("Real-Time Task executing\n");
     return NULL;
}
/*Main module for an isochronous application task with Real Time priority and scheduling call as SCHED_FIFO */
void *realtime_isochronous_task(void *data){
     struct time_period_info tinfo;
     initialize_periodic_task(&tinfo);
     while(1){
             real_time_task();
             wait_for_period_complete(&tinfo);
     }
     return NULL;
}
/*Non Real Time master thread that will spawn a Real Time isochronous application thread*/
int main(int argc, char* argv[]) {
     struct sched_param param_fifo;
     pthread_attr_t attr_fifo;
     pthread_t thread_fifo;
     int status = -1;
     memset(¶m_fifo, 0, sizeof(param_fifo));
     status = pthread_attr_init(&attr_fifo);
     if (status) {
             printf("pthread_attr_init failed\n");
             return status;
     }
     status = pthread_attr_setschedpolicy(&attr_fifo, SCHED_FIFO);
     if (status) {
             printf("pthread_attr_setschedpolicy failed\n");
             return status;
     }
     param_fifo.sched_priority = 92;
     status = pthread_attr_setschedparam(&attr_fifo, ¶m_fifo);
     if (status) {
             printf("pthread_attr_setschedparam failed\n");
             return status;
     }
     status = pthread_attr_setinheritsched(&attr_fifo, PTHREAD_EXPLICIT_SCHED);
     if (status) {
             printf("pthread_attr_setinheritsched failed\n");
             return status;
     }
     status = pthread_create(&thread_fifo, &attr_fifo, realtime_isochronous_task, NULL);
     if (status) {
             printf("pthread_create failed\n");
             return status;
     }
     pthread_join(thread_fifo, NULL);
     return status;
}