【时间子系统】五、高精度定时器

  通过对低精度定时器的分析,我们知道这类定时器的精度是毫秒级的,也就是说存在毫秒级的误差范围。对于像IO超时错误处理这类定时任务,毫秒级的误差完全不算什么问题。然而,对于工业上的许多实时任务,毫秒级的误差是完全不可接受的。因此,基于更高精度的时间硬件(例如TSC和LAPIC Timer),内核工程师们开发了一套全新的高精度定时器功能(传统基于时间轮的低精度定时器已经很稳定了,与其对它修修补补,还不如新建一套全新的机制)。

1. 高精度定时器的初始化

  高精度定时器的初始化和低精度定时器的初始化有些类似,需要指定到期后的回调函数。然而在内部数据结构的设计上,不同于低精度定时器的时间轮,高精度定时器采用了红黑树(可以高效地实现排序、增删改等操作,内核中有比较成熟稳定的代码实现)。另外,低精度定时器的计时参照是jiffies,而高精度定时器可以采用timekeeper中的多种计时参照,如REAL TIME、MONOTONIC TIME等等。代码实现如下:

linux/kernel/hrtimer.c:

/**
 * hrtimer_init - initialize a timer to the given clock
 * @timer:	the timer to be initialized
 * @clock_id:	the clock to be used
 * @mode:	timer mode abs/rel
 */
void hrtimer_init(struct hrtimer *timer, clockid_t clock_id,
    enum hrtimer_mode mode)
{
    debug_init(timer, clock_id, mode);
    __hrtimer_init(timer, clock_id, mode);
}
linux/include/linux/hrtimer.h:

/**
 * struct hrtimer - the basic hrtimer structure
 * @node:   timerqueue node, which also manages node.expires,
 *          the absolute expiry time in the hrtimers internal
 *          representation. The time is related to the clock on
 *          which the timer is based. Is setup by adding
 *          slack to the _softexpires value. For non range timers
 *          identical to _softexpires.
 * @_softexpires: the absolute earliest expiry time of the hrtimer.
 *          The time which was given as expiry time when the timer
 *          was armed.
 * @function:   timer expiry callback function
 * @base:   pointer to the timer base (per cpu and per clock)
 * @state:  state information (See bit values above)
 *
 * The hrtimer structure must be initialized by hrtimer_init()
 */
struct hrtimer {
    struct timerqueue_node      node;
    ktime_t                     _softexpires;
    enum hrtimer_restart        (*function)(struct hrtimer *);
    struct hrtimer_clock_base   *base;
    unsigned long               state;
    ...
};

enum hrtimer_mode {
    HRTIMER_MODE_ABS = 0x0,		/* Time value is absolute */
    HRTIMER_MODE_REL = 0x1,		/* Time value is relative to now */
    HRTIMER_MODE_PINNED = 0x02,	/* Timer is bound to CPU */
    HRTIMER_MODE_ABS_PINNED = 0x02,
    HRTIMER_MODE_REL_PINNED = 0x03,
};

  通过上面的代码和注释,我们可以看到,高精度定时器初始化时可以指定计时参照对象(clock_id)和计时模式(采用绝对计时或相对计时)。高精度定时器内部结构中的node即是在红黑树中的挂接对象,base指向每个CPU针对不同计时参照对象的全局数据结构,其内部包含一棵红黑树。__hrtimer_init的具体实现比较简单:

linux/kernel/hrtimer.c:

static void __hrtimer_init(struct hrtimer *timer, clockid_t clock_id,
    enum hrtimer_mode mode)
{
    struct hrtimer_cpu_base *cpu_base;
    int base;

    memset(timer, 0, sizeof(struct hrtimer));

    cpu_base = &__raw_get_cpu_var(hrtimer_bases); /*获取当前CPU的hrtimer_cpu_base对象*/

    if (clock_id == CLOCK_REALTIME && mode != HRTIMER_MODE_ABS) /*REALTIME只支持绝对模式*/
        clock_id = CLOCK_MONOTONIC;

    base = hrtimer_clockid_to_base(clock_id); /*索引计时参照*/
    timer->base = &cpu_base->clock_base[base];
    timerqueue_init(&timer->node); /*初始化红黑树节点*/

    ...
}

2. 高精度定时器的启动

  初始化完成并指定回调处理函数后,我们通过hrtimer_start函数可以启动一个定时器:

linux/kernel/hrtimer.c:

/**
 * hrtimer_start - (re)start an hrtimer on the current CPU
 * @timer:  the timer to be added
 * @tim:    expiry time
 * @mode:   expiry mode: absolute (HRTIMER_MODE_ABS) or
 *          relative (HRTIMER_MODE_REL)
 *
 * Returns:
 *  0 on success
 *  1 when the timer was active
 */
int hrtimer_start(struct hrtimer *timer, ktime_t tim, const enum hrtimer_mode mode)
{
    return __hrtimer_start_range_ns(timer, tim, 0, mode, 1);
}

  高精度定时器允许有一个纳秒级别的误差,由__hrtimer_start_range_ns的delta_ns参数指明:

linux/kernel/hrtimer.c:

int __hrtimer_start_range_ns(struct hrtimer *timer, ktime_t tim,
    unsigned long delta_ns, const enum hrtimer_mode mode, int wakeup)
{
    struct hrtimer_clock_base *base, *new_base;
    unsigned long flags;
    int ret, leftmost;

    base = lock_hrtimer_base(timer, &flags); /*锁定该timer对应的hrtimer_clock_base对象*/

    /* Remove an active timer from the queue: */
    ret = remove_hrtimer(timer, base);

    if (mode & HRTIMER_MODE_REL) {
        tim = ktime_add_safe(tim, base->get_time());
        ...
    }

    hrtimer_set_expires_range_ns(timer, tim, delta_ns); /*设置定时器内部的超时时间*/

    /* Switch the timer base, if necessary: */
    new_base = switch_hrtimer_base(timer, base, mode & HRTIMER_MODE_PINNED);

    leftmost = enqueue_hrtimer(timer, new_base); /*将定时器加入到对应hrtimer_clock_base的红黑树中*/

    if (leftmost && new_base->cpu_base == &__get_cpu_var(hrtimer_bases)
        && hrtimer_enqueue_reprogram(timer, new_base)) { /*如果当前定时器是红黑树中最早到期的定时器,则重新设置clock event device的oneshot计数。注,高精度定时器正常工作时,会将clock event device的工作模式切换到oneshot*/
        ...
    }

    unlock_hrtimer_base(timer, &flags);

    return ret;
}

3. 切换到高精度模式

  内核正常启动后首先工作在低精度模式,然而在时钟中断的处理中,内核会检测是否具备切换到高精度的条件,如果各条件均满足,则切换到高精度模式工作。时钟中断中在处理低精度时钟时,通过hrtimer_run_pending()完成切换动作:

linux/kernel/timer.c:

static void run_timer_softirq(struct softirq_action *h)
{
    struct tvec_base *base = __this_cpu_read(tvec_bases);

    hrtimer_run_pending();

    if (time_after_eq(jiffies, base->timer_jiffies))
        __run_timers(base);
}

void hrtimer_run_pending(void)
{
    if (hrtimer_hres_active()) /*如果已经切换到高精度模式则返回*/
        return;

    if (tick_check_oneshot_change(!hrtimer_is_hres_enabled())) /*判断是否具备切换到高精度的条件,如时钟源精度是否满足、是否支持oneshot模式*/
        hrtimer_switch_to_hres();
}

  如果切换条件均满足,则通过hrtimer_switch_to_hres切换到高精度模式:

linux/kernel/hrtimer.c:

static int hrtimer_switch_to_hres(void)
{
    int i, cpu = smp_processor_id();
    struct hrtimer_cpu_base *base = &per_cpu(hrtimer_bases, cpu);
    unsigned long flags;

    if (base->hres_active)
        return 1;

    local_irq_save(flags);

    if (tick_init_highres()) { /*将tick模式切换到oneshot模式并重新指定中断处理函数*/
        local_irq_restore(flags);
        printk(KERN_WARNING "Could not switch to high resolution "
        "mode on CPU %d\n", cpu);
        return 0;
    }
    base->hres_active = 1;
    for (i = 0; i < HRTIMER_MAX_CLOCK_BASES; i++)
    base->clock_base[i].resolution = KTIME_HIGH_RES;

    tick_setup_sched_timer(); /*设置一个专门的调度定时器,用来处理调度任务*/
    /* "Retrigger" the interrupt to get things going */
    retrigger_next_event(NULL);
    local_irq_restore(flags);
    return 1;
}
linux/kernel/time/tick-oneshot.c:

/**
 * tick_init_highres - switch to high resolution mode
 *
 * Called with interrupts disabled.
 */
int tick_init_highres(void)
{
    return tick_switch_to_oneshot(hrtimer_interrupt); /*高精度模式下时钟中断处理函数为hrtimer_interrupt*/
}

/**
 * tick_switch_to_oneshot - switch to oneshot mode
 */
int tick_switch_to_oneshot(void (*handler)(struct clock_event_device *))
{
    struct tick_device *td = &__get_cpu_var(tick_cpu_device);
    struct clock_event_device *dev = td->evtdev;

    if (!dev || !(dev->features & CLOCK_EVT_FEAT_ONESHOT) ||
        !tick_device_is_functional(dev)) {
        ...
    }

    td->mode = TICKDEV_MODE_ONESHOT;
    dev->event_handler = handler;
    clockevents_set_mode(dev, CLOCK_EVT_MODE_ONESHOT);
    tick_broadcast_switch_to_oneshot();
    return 0;
}

4. 高精度定时器的到期处理

  如前所述,高精度模式下,时钟中断的处理函数已经从tick_handle_periodic切换成hrtimer_interrupt了:

linux/kernel/hrtimer.c:

/*
 * High resolution timer interrupt
 * Called with interrupts disabled
 */
void hrtimer_interrupt(struct clock_event_device *dev)
{
    struct hrtimer_cpu_base *cpu_base = &__get_cpu_var(hrtimer_bases);
    ktime_t expires_next, now, entry_time, delta;
    int i, retries = 0;

    BUG_ON(!cpu_base->hres_active);
    cpu_base->nr_events++;
    dev->next_event.tv64 = KTIME_MAX;

    raw_spin_lock(&cpu_base->lock);
    entry_time = now = hrtimer_update_base(cpu_base); /*通过时钟源更新当前系统时间*/
retry:
    expires_next.tv64 = KTIME_MAX;
    /*
     * We set expires_next to KTIME_MAX here with cpu_base->lock
     * held to prevent that a timer is enqueued in our queue via
     * the migration code. This does not affect enqueueing of
     * timers which run their callback and need to be requeued on
     * this CPU.
     */
    cpu_base->expires_next.tv64 = KTIME_MAX;

    for (i = 0; i < HRTIMER_MAX_CLOCK_BASES; i++) { /*针对不同的计时参照对象依次处理*/
        struct hrtimer_clock_base *base;
        struct timerqueue_node *node;
        ktime_t basenow;

        if (!(cpu_base->active_bases & (1 << i)))
            continue;

        base = cpu_base->clock_base + i;
        basenow = ktime_add(now, base->offset);

        while ((node = timerqueue_getnext(&base->active))) { /*根据到期时间依次处理红黑树中的定时器*/
            struct hrtimer *timer;

            timer = container_of(node, struct hrtimer, node);

            /*
             * The immediate goal for using the softexpires is
             * minimizing wakeups, not running timers at the
             * earliest interrupt after their soft expiration.
             * This allows us to avoid using a Priority Search
             * Tree, which can answer a stabbing querry for
             * overlapping intervals and instead use the simple
             * BST we already have.
             * We don't add extra wakeups by delaying timers that
             * are right-of a not yet expired timer, because that
             * timer will have to trigger a wakeup anyway.
             */

            if (basenow.tv64 < hrtimer_get_softexpires_tv64(timer)) {
                /*未到期则退出while循环*/

                ktime_t expires;

                expires = ktime_sub(hrtimer_get_expires(timer), base->offset);
                if (expires.tv64 < 0)
                    expires.tv64 = KTIME_MAX;
                if (expires.tv64 < expires_next.tv64)
                    expires_next = expires;
                break;
            }

            __run_hrtimer(timer, &basenow); /*调用到期回调函数*/
        } /*end of while*/
    } /*end of for*/

    /*
     * Store the new expiry value so the migration code can verify
     * against it.
     */
    cpu_base->expires_next = expires_next;
    raw_spin_unlock(&cpu_base->lock);

    /*下面重新设置clock event device的中断触发时间,如果成功则返回*/
    /* Reprogramming necessary ? */
    if (expires_next.tv64 == KTIME_MAX ||
            !tick_program_event(expires_next, 0)) {
        cpu_base->hang_detected = 0;
        return;
    }

    /*执行到此,后续的逻辑是处理一种特殊的场景,即定时器到期回调函数执行时间过长导致下一个定时器又到期了*/

    /*
     * The next timer was already expired due to:
     * - tracing
     * - long lasting callbacks
     * - being scheduled away when running in a VM
     *
     * We need to prevent that we loop forever in the hrtimer
     * interrupt routine. We give it 3 attempts to avoid
     * overreacting on some spurious event.
     *
     * Acquire base lock for updating the offsets and retrieving
     * the current time.
     */
    raw_spin_lock(&cpu_base->lock);
    now = hrtimer_update_base(cpu_base);
    cpu_base->nr_retries++;
    if (++retries < 3)
        goto retry;
    /*
     * Give the system a chance to do something else than looping
     * here. We stored the entry time, so we know exactly how long
     * we spent here. We schedule the next event this amount of
     * time away.
     */
    cpu_base->nr_hangs++;
    cpu_base->hang_detected = 1;
    raw_spin_unlock(&cpu_base->lock);
    delta = ktime_sub(now, entry_time);
    if (delta.tv64 > cpu_base->max_hang_time.tv64)
        cpu_base->max_hang_time = delta;
    /*
     * Limit it to a sensible value as we enforce a longer
     * delay. Give the CPU at least 100ms to catch up.
     */
    if (delta.tv64 > 100 * NSEC_PER_MSEC)
        expires_next = ktime_add_ns(now, 100 * NSEC_PER_MSEC);
    else
        expires_next = ktime_add(now, delta);
    tick_program_event(expires_next, 1);
    printk_once(KERN_WARNING "hrtimer: interrupt took %llu ns\n", ktime_to_ns(delta));
}

  通过分析高精度模式下的时钟中断处理函数,我们可以发现它只负责处理定时器的到期处理。那么低精调模式下的进程调度的处理逻辑去哪里了?不需要了吗?其实,在前文代码中我们看到,高精度模式下内核会给每个CPU生成一个调度定时器:

linux/kernel/time/tick-sched.c:

/**
 * tick_setup_sched_timer - setup the tick emulation timer
 */
void tick_setup_sched_timer(void)
{
    struct tick_sched *ts = &__get_cpu_var(tick_cpu_sched);
    ktime_t now = ktime_get();

    /*
     * Emulate tick processing via per-CPU hrtimers:
     */
    hrtimer_init(&ts->sched_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
    ts->sched_timer.function = tick_sched_timer; /*调度定时器的回调函数*/

    /* Get the next period (per cpu) */
    hrtimer_set_expires(&ts->sched_timer, tick_init_jiffy_update());

    /* Offset the tick to avert jiffies_lock contention. */
    if (sched_skew_tick) {
        u64 offset = ktime_to_ns(tick_period) >> 1;
        do_div(offset, num_possible_cpus());
        offset *= smp_processor_id();
        hrtimer_add_expires_ns(&ts->sched_timer, offset);
    }

    for (;;) {
        hrtimer_forward(&ts->sched_timer, now, tick_period);
        hrtimer_start_expires(&ts->sched_timer, HRTIMER_MODE_ABS_PINNED);
        /* Check, if the timer was already in the past */
        if (hrtimer_active(&ts->sched_timer))
            break;
        now = ktime_get();
    }
    ...
}

/*
 * We rearm the timer until we get disabled by the idle code.
 * Called with interrupts disabled.
 */
static enum hrtimer_restart tick_sched_timer(struct hrtimer *timer)
{
    struct tick_sched *ts =
        container_of(timer, struct tick_sched, sched_timer);
    struct pt_regs *regs = get_irq_regs();
    ktime_t now = ktime_get();

    tick_sched_do_timer(now);

    /*
     * Do not call, when we are not in irq context and have
     * no valid regs pointer
     */
    if (regs)
    tick_sched_handle(ts, regs);

    hrtimer_forward(timer, now, tick_period);

    return HRTIMER_RESTART;
}

static void tick_sched_do_timer(ktime_t now)
{
    int cpu = smp_processor_id();

    ...
    /* Check, if the jiffies need an update */
    if (tick_do_timer_cpu == cpu)
        tick_do_update_jiffies64(now);
}

static void tick_sched_handle(struct tick_sched *ts, struct pt_regs *regs)
{
    ...
    update_process_times(user_mode(regs));
    profile_tick(CPU_PROFILING);
}

  由此可见,调度定时器按tick_period周期性触发(暂不考虑动态时钟nohz特性),每次到期后和处理逻辑和低精度模式下的逻辑类似。


转载请注明:吴斌的博客 » 【时间子系统】五、高精度定时器