qemu_coroutine_create() 创建一个协程。

qemu_coroutine_enter() 让协程跑起来。

qemu_coroutine_yield() 让当前协程让出执行权,把控制权交还给调用 qemu_coroutine_enter() 进入它的一方。

创建协程。

AioContext 也是基于 GSource 的。

Track the coroutine schedule in QEMU

struct AioContext QEMU

一个 AioContext 应该可以和一个 GMainContext 对应,表示一个事件循环。

/*
 * Excerpt of QEMU's AioContext: the per-event-loop state.  It embeds a
 * GSource and additionally tracks AIO handlers, bottom halves, the list of
 * scheduled coroutines, timers, a thread pool and polling parameters.
 */
struct AioContext {
    // Embedded GSource; this is what carries the many fds that get polled
    GSource source;

    // ... (fields elided from this excerpt)
    /*
     * Keep track of readers and writers of the block layer graph.
     * This is essential to avoid performing additions and removal
     * of nodes and edges from block graph while some
     * other thread is traversing it.
     */
    BdrvGraphRWlock *bdrv_graph;

    /* The list of registered AIO handlers.  Protected by ctx->list_lock. */
    AioHandlerList aio_handlers;

    /* The list of AIO handlers to be deleted.  Protected by ctx->list_lock. */
    AioHandlerList deleted_aio_handlers;

    /* Used to avoid unnecessary event_notifier_set calls in aio_notify;
     * only written from the AioContext home thread, or under the BQL in
     * the case of the main AioContext.  However, it is read from any
     * thread so it is still accessed with atomic primitives.
     *
     * If this field is 0, everything (file descriptors, bottom halves,
     * timers) will be re-evaluated before the next blocking poll() or
     * io_uring wait; therefore, the event_notifier_set call can be
     * skipped.  If it is non-zero, you may need to wake up a concurrent
     * aio_poll or the glib main event loop, making event_notifier_set
     * necessary.
     *
     * Bit 0 is reserved for GSource usage of the AioContext, and is 1
     * between a call to aio_ctx_prepare and the next call to aio_ctx_check.
     * Bits 1-31 simply count the number of active calls to aio_poll
     * that are in the prepare or poll phase.
     *
     * The GSource and aio_poll must use a different mechanism because
     * there is no certainty that a call to GSource's prepare callback
     * (via g_main_context_prepare) is indeed followed by check and
     * dispatch.  It's not clear whether this would be a bug, but let's
     * play safe and allow it---it will just cause extra calls to
     * event_notifier_set until the next call to dispatch.
     *
     * Instead, the aio_poll calls include both the prepare and the
     * dispatch phase, hence a simple counter is enough for them.
     */
    uint32_t notify_me;

    /* A lock to protect between QEMUBH and AioHandler adders and deleter,
     * and to ensure that no callbacks are removed while we're walking and
     * dispatching them.
     */
    QemuLockCnt list_lock;

    /* Bottom Halves pending aio_bh_poll() processing */
    BHList bh_list;

    /* Chained BH list slices for each nested aio_bh_poll() call */
    QSIMPLEQ_HEAD(, BHListSlice) bh_slice_list;

    /* Used by aio_notify.
     *
     * "notified" is used to avoid expensive event_notifier_test_and_clear
     * calls.  When it is clear, the EventNotifier is clear, or one thread
     * is going to clear "notified" before processing more events.  False
     * positives are possible, i.e. "notified" could be set even though the
     * EventNotifier is clear.
     *
     * Note that event_notifier_set *cannot* be optimized the same way.  For
     * more information on the problem that would result, see "#ifdef BUG2"
     * in the docs/aio_notify_accept.promela formal model.
     */
    bool notified;
    EventNotifier notifier;

    // Coroutines that have been scheduled with aio_co_schedule() and are
    // waiting to be entered; aio_co_schedule() pushes onto the head of this list.
    QSLIST_HEAD(, Coroutine) scheduled_coroutines;
    // Created as: ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
    // i.e. this bottom half's callback is co_schedule_bh_cb(); running it
    // drains scheduled_coroutines and enters each coroutine in turn.
    QEMUBH *co_schedule_bh;

    int thread_pool_min;
    int thread_pool_max;
    /* Thread pool for performing work and receiving completion callbacks.
     * Has its own locking.
     */
    struct ThreadPool *thread_pool;

#ifdef CONFIG_LINUX_AIO
    struct LinuxAioState *linux_aio;
#endif
#ifdef CONFIG_LINUX_IO_URING
    struct LuringState *linux_io_uring;

    /* State for file descriptor monitoring using Linux io_uring */
    struct io_uring fdmon_io_uring;
    AioHandlerSList submit_list;
#endif

    /* TimerLists for calling timers - one per clock type.  Has its own
     * locking.
     */
    QEMUTimerListGroup tlg;

    /* Number of AioHandlers without .io_poll() */
    int poll_disable_cnt;

    /* Polling mode parameters */
    int64_t poll_ns;        /* current polling time in nanoseconds */
    int64_t poll_max_ns;    /* maximum polling time in nanoseconds */
    int64_t poll_grow;      /* polling time growth factor */
    int64_t poll_shrink;    /* polling time shrink factor */

    /* AIO engine parameters */
    int64_t aio_max_batch;  /* maximum number of requests in a batch */

    /*
     * List of handlers participating in userspace polling.  Protected by
     * ctx->list_lock.  Iterated and modified mostly by the event loop thread
     * from aio_poll() with ctx->list_lock incremented.  aio_set_fd_handler()
     * only touches the list to delete nodes if ctx->list_lock's count is zero.
     */
    AioHandlerList poll_aio_handlers;

    /* Are we in polling mode or monitoring file descriptors? */
    bool poll_started;

    /* epoll(7) state used when built with CONFIG_EPOLL */
    int epollfd;

    const FDMonOps *fdmon_ops;
};

QEMU coroutine priority

没有优先级。

ctx->co_schedule_bh / co_schedule_bh_cb() QEMU

ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);

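最早加入的 coroutine 最先被调度(FIFO):aio_co_schedule() 用头插法把 coroutine 放进 ctx->scheduled_coroutines,所以链表里最近加入的排在最前面;co_schedule_bh_cb() 先把整个链表取出来并反转,再从头依次进入。这个函数就是重新进入 coroutine 的地方。coroutine 是在执行这个 BH 回调的线程里被进入的,并不会另起一个 thread。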

/*
 * Bottom-half callback that enters every coroutine queued on
 * ctx->scheduled_coroutines.
 *
 * @opaque: the AioContext whose scheduled coroutines should be run.
 *
 * aio_co_schedule() pushes coroutines onto the head of the list, so the list
 * holds them newest-first.  This callback reverses the list before walking
 * it, so coroutines are entered in the order they were scheduled (oldest
 * first).
 */
static void co_schedule_bh_cb(void *opaque)
{
    AioContext *ctx = opaque;
    // Two local lists: "reversed" receives the pending list as stored
    // (newest first); "straight" receives it in scheduling order (oldest first).
    QSLIST_HEAD(, Coroutine) straight, reversed;

    // Take the whole pending list out of ctx in one atomic step.  This is a
    // move, not a copy; coroutines scheduled from now on land on the fresh
    // list in ctx rather than on the local one.
    QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
    QSLIST_INIT(&straight);

    // Pop each element off "reversed" and push it onto the head of
    // "straight": this flips the order, so the oldest entry ends up first.
    while (!QSLIST_EMPTY(&reversed)) {
        Coroutine *co = QSLIST_FIRST(&reversed);
        QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
        QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
    }

    // Walk "straight" from the head: the coroutine scheduled earliest is
    // entered first.
    while (!QSLIST_EMPTY(&straight)) {
        Coroutine *co = QSLIST_FIRST(&straight);
        QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);

        // Enter the coroutine while holding the AioContext.
        aio_context_acquire(ctx);
        /* Protected by write barrier in qemu_aio_coroutine_enter */
        // Clear the "already scheduled" marker set by aio_co_schedule().
        qatomic_set(&co->scheduled, NULL);
        // Run (re-enter) the coroutine directly from this callback.
        qemu_aio_coroutine_enter(ctx, co);
        aio_context_release(ctx);
    }
}

aio_co_schedule() QEMU

/*
 * Queue a coroutine to be entered later from ctx's co_schedule_bh bottom half.
 *
 * @ctx: the AioContext whose scheduled_coroutines list receives the coroutine.
 * @co:  the coroutine to schedule; it must not already be scheduled.
 *
 * Aborts the process if @co is already marked as scheduled.
 */
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
    trace_aio_co_schedule(ctx, co);
    // Atomically mark the coroutine as scheduled: co->scheduled changes from
    // NULL to this function's name, and the previous value is returned.
    const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL, __func__);

    // A non-NULL previous value means the coroutine was already scheduled;
    // it names the function that scheduled it.  Treat this as a fatal error.
    if (scheduled) {
        fprintf(stderr, "%s: Co-routine was already scheduled in '%s'\n", __func__, scheduled);
        abort();
    }

    /* The coroutine might run and release the last ctx reference before we
     * invoke qemu_bh_schedule().  Take a reference to keep ctx alive until
     * we're done.
     */
    aio_context_ref(ctx);

    // ctx->scheduled_coroutines holds every coroutine that is scheduled but
    // not yet entered.  Push co onto the head of that list atomically.
    QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines, co, co_scheduled_next);
    // Schedule the co_schedule_bh bottom half (callback: co_schedule_bh_cb()).
    // This does not enter the coroutine here; that happens when the bottom
    // half's callback runs.
    qemu_bh_schedule(ctx->co_schedule_bh);

    aio_context_unref(ctx);
}

qemu_bh_schedule() QEMU

这个是把一个 BH(bottom half,即要执行的回调函数)标记为已调度,挂到它所属的 AioContext 的事件循环上(不一定是 QEMU main event loop),并唤醒该事件循环;回调不会在调用 qemu_bh_schedule() 的地方立刻执行,而是在事件循环下一次 iteration 的时候被执行。

Runs bottom-halves (BHs), which are like timers that expire immediately. BHs are used to avoid reentrancy and overflowing the call stack. BHs can be added using qemu_bh_schedule().