Coroutine & AioContext in QEMU
qemu_coroutine_create()
Creates a coroutine.
qemu_coroutine_enter()
Starts (or resumes) the coroutine.
qemu_coroutine_yield()
Switches out of the coroutine, returning control to the caller.
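A minimal sketch of how the three calls fit together; my_co_fn and start_one_coroutine are hypothetical names, while the three qemu_coroutine_*() functions are the real API:

#include "qemu/osdep.h"
#include "qemu/coroutine.h"

static void coroutine_fn my_co_fn(void *opaque)
{
    /* Runs until the first yield. */
    qemu_coroutine_yield();     /* switch back to whoever entered us */
    /* Resumed later by another qemu_coroutine_enter(). */
}

void start_one_coroutine(void)
{
    Coroutine *co = qemu_coroutine_create(my_co_fn, NULL);
    qemu_coroutine_enter(co);   /* returns once my_co_fn yields or finishes */
}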
AioContext
It is also based on GSource.
Track the coroutine schedule in QEMU
struct AioContext
QEMU
An AioContext should correspond one-to-one with a GMainContext; it represents one event loop.
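A minimal sketch of that correspondence, mirroring what qemu_init_main_loop() does (aio_context_new(), aio_get_g_source(), and g_source_attach() are real APIs; the wrapper function is hypothetical):

#include "qemu/osdep.h"
#include "block/aio.h"

static void attach_aio_context(GMainContext *gctx, Error **errp)
{
    AioContext *ctx = aio_context_new(errp);    /* one event loop's state */
    GSource *source;

    if (!ctx) {
        return;
    }
    source = aio_get_g_source(ctx);             /* the GSource embedded in it */
    g_source_attach(source, gctx);              /* ctx is now driven by gctx */
    g_source_unref(source);
}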
struct AioContext {
// The GSource holds the fds to poll
GSource source;
//...
/*
* Keep track of readers and writers of the block layer graph.
* This is essential to avoid performing additions and removal
* of nodes and edges from block graph while some
* other thread is traversing it.
*/
BdrvGraphRWlock *bdrv_graph;
/* The list of registered AIO handlers. Protected by ctx->list_lock. */
AioHandlerList aio_handlers;
/* The list of AIO handlers to be deleted. Protected by ctx->list_lock. */
AioHandlerList deleted_aio_handlers;
/* Used to avoid unnecessary event_notifier_set calls in aio_notify;
* only written from the AioContext home thread, or under the BQL in
* the case of the main AioContext. However, it is read from any
* thread so it is still accessed with atomic primitives.
*
* If this field is 0, everything (file descriptors, bottom halves,
* timers) will be re-evaluated before the next blocking poll() or
* io_uring wait; therefore, the event_notifier_set call can be
* skipped. If it is non-zero, you may need to wake up a concurrent
* aio_poll or the glib main event loop, making event_notifier_set
* necessary.
*
* Bit 0 is reserved for GSource usage of the AioContext, and is 1
* between a call to aio_ctx_prepare and the next call to aio_ctx_check.
* Bits 1-31 simply count the number of active calls to aio_poll
* that are in the prepare or poll phase.
*
* The GSource and aio_poll must use a different mechanism because
* there is no certainty that a call to GSource's prepare callback
* (via g_main_context_prepare) is indeed followed by check and
* dispatch. It's not clear whether this would be a bug, but let's
* play safe and allow it---it will just cause extra calls to
* event_notifier_set until the next call to dispatch.
*
* Instead, the aio_poll calls include both the prepare and the
* dispatch phase, hence a simple counter is enough for them.
*/
uint32_t notify_me;
/* A lock to protect between QEMUBH and AioHandler adders and deleter,
* and to ensure that no callbacks are removed while we're walking and
* dispatching them.
*/
QemuLockCnt list_lock;
/* Bottom Halves pending aio_bh_poll() processing */
BHList bh_list;
/* Chained BH list slices for each nested aio_bh_poll() call */
QSIMPLEQ_HEAD(, BHListSlice) bh_slice_list;
/* Used by aio_notify.
*
* "notified" is used to avoid expensive event_notifier_test_and_clear
* calls. When it is clear, the EventNotifier is clear, or one thread
* is going to clear "notified" before processing more events. False
* positives are possible, i.e. "notified" could be set even though the
* EventNotifier is clear.
*
* Note that event_notifier_set *cannot* be optimized the same way. For
* more information on the problem that would result, see "#ifdef BUG2"
* in the docs/aio_notify_accept.promela formal model.
*/
bool notified;
EventNotifier notifier;
// Coroutines that have been scheduled (via aio_co_schedule()) but not yet entered
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
// ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
// So this BH simply wraps co_schedule_bh_cb(); running it enters each
// coroutine on the scheduled list in turn.
QEMUBH *co_schedule_bh;
int thread_pool_min;
int thread_pool_max;
/* Thread pool for performing work and receiving completion callbacks.
* Has its own locking.
*/
struct ThreadPool *thread_pool;
#ifdef CONFIG_LINUX_AIO
struct LinuxAioState *linux_aio;
#endif
#ifdef CONFIG_LINUX_IO_URING
struct LuringState *linux_io_uring;
/* State for file descriptor monitoring using Linux io_uring */
struct io_uring fdmon_io_uring;
AioHandlerSList submit_list;
#endif
/* TimerLists for calling timers - one per clock type. Has its own
* locking.
*/
QEMUTimerListGroup tlg;
/* Number of AioHandlers without .io_poll() */
int poll_disable_cnt;
/* Polling mode parameters */
int64_t poll_ns; /* current polling time in nanoseconds */
int64_t poll_max_ns; /* maximum polling time in nanoseconds */
int64_t poll_grow; /* polling time growth factor */
int64_t poll_shrink; /* polling time shrink factor */
/* AIO engine parameters */
int64_t aio_max_batch; /* maximum number of requests in a batch */
/*
* List of handlers participating in userspace polling. Protected by
* ctx->list_lock. Iterated and modified mostly by the event loop thread
* from aio_poll() with ctx->list_lock incremented. aio_set_fd_handler()
* only touches the list to delete nodes if ctx->list_lock's count is zero.
*/
AioHandlerList poll_aio_handlers;
/* Are we in polling mode or monitoring file descriptors? */
bool poll_started;
/* epoll(7) state used when built with CONFIG_EPOLL */
int epollfd;
const FDMonOps *fdmon_ops;
};
QEMU coroutine priority
There is none. Within one AioContext, scheduled coroutines are simply entered in the order they were scheduled (see co_schedule_bh_cb() below).
ctx->co_schedule_bh / co_schedule_bh_cb()
QEMU
ctx->co_schedule_bh = aio_bh_new(ctx, co_schedule_bh_cb, ctx);
This BH callback is where scheduled coroutines are re-entered. Note that scheduled_coroutines keeps the most recently added coroutine at its head, so co_schedule_bh_cb() first reverses the list; the earliest-scheduled coroutine is then entered first. Dispatching happens on the calling (event loop) thread, not on a separate thread.
static void co_schedule_bh_cb(void *opaque)
{
AioContext *ctx = opaque;
QSLIST_HEAD(, Coroutine) straight, reversed;
// scheduled_coroutines keeps the most recently added coroutine at its
// head, so the list as moved here is in reverse scheduling order.
// Atomically move (not copy) all coroutines from scheduled_coroutines
// into reversed.
QSLIST_MOVE_ATOMIC(&reversed, &ctx->scheduled_coroutines);
QSLIST_INIT(&straight);
// Reverse the list into straight, restoring scheduling (FIFO) order.
while (!QSLIST_EMPTY(&reversed)) {
Coroutine *co = QSLIST_FIRST(&reversed);
QSLIST_REMOVE_HEAD(&reversed, co_scheduled_next);
QSLIST_INSERT_HEAD(&straight, co, co_scheduled_next);
}
// So the earliest-scheduled coroutine is entered first.
while (!QSLIST_EMPTY(&straight)) {
Coroutine *co = QSLIST_FIRST(&straight);
QSLIST_REMOVE_HEAD(&straight, co_scheduled_next);
// Dispatch it...
aio_context_acquire(ctx);
/* Protected by write barrier in qemu_aio_coroutine_enter */
qatomic_set(&co->scheduled, NULL);
// Enter this coroutine
qemu_aio_coroutine_enter(ctx, co);
aio_context_release(ctx);
}
}
aio_co_schedule()
QEMU
void aio_co_schedule(AioContext *ctx, Coroutine *co)
{
trace_aio_co_schedule(ctx, co);
const char *scheduled = qatomic_cmpxchg(&co->scheduled, NULL, __func__);
// Check whether this coroutine has already been scheduled; scheduling it twice is a bug.
if (scheduled) {
fprintf(stderr, "%s: Co-routine was already scheduled in '%s'\n", __func__, scheduled);
abort();
}
/* The coroutine might run and release the last ctx reference before we
* invoke qemu_bh_schedule(). Take a reference to keep ctx alive until
* we're done.
*/
aio_context_ref(ctx);
// ctx->scheduled_coroutines holds all scheduled coroutines;
// inserting co marks it as scheduled.
QSLIST_INSERT_HEAD_ATOMIC(&ctx->scheduled_coroutines, co, co_scheduled_next);
// Kick the BH so co_schedule_bh_cb() runs on the next event loop iteration.
qemu_bh_schedule(ctx->co_schedule_bh);
aio_context_unref(ctx);
}
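A typical use of this API is a coroutine rescheduling itself onto another AioContext; QEMU wraps this pattern as aio_co_reschedule_self(), and the sketch below is an illustrative version of it:

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "block/aio.h"

static void coroutine_fn move_to_ctx(AioContext *new_ctx)
{
    /* Queue ourselves on new_ctx's scheduled_coroutines and yield;
     * co_schedule_bh_cb() re-enters us from new_ctx's event loop. */
    aio_co_schedule(new_ctx, qemu_coroutine_self());
    qemu_coroutine_yield();
    /* From here on we are running on new_ctx's thread. */
}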
qemu_bh_schedule()
QEMU
This marks the bottom half as pending in its AioContext's event loop, so the callback fires on the next loop iteration.
Runs bottom-halves (BHs), which are like timers that expire immediately. BHs are used to avoid reentrancy and overflowing the call stack. BHs can be added using qemu_bh_schedule().
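A minimal sketch of that pattern (my_bh_cb and defer_work are hypothetical; aio_bh_new() and qemu_bh_schedule() are the real APIs):

#include "qemu/osdep.h"
#include "block/aio.h"

static void my_bh_cb(void *opaque)
{
    /* Runs from the event loop on a fresh call stack, which is
     * what makes BHs useful for avoiding reentrancy. */
}

void defer_work(AioContext *ctx)
{
    QEMUBH *bh = aio_bh_new(ctx, my_bh_cb, NULL);
    qemu_bh_schedule(bh);   /* fires on the next event loop iteration */
}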