The files, sockets or fds that carry the migration stream are abstracted by the QEMUFile type, which also carries auxiliary state (an internal buffer, an iovec array, error tracking, etc.).

Each QEMUFile corresponds to one QIOChannel — but only in one direction: only the first channel (the "master"/main migration channel) is bound to a QEMUFile's QIOChannel field; the other channels (e.g. multifd channels) are NOT wrapped in a QEMUFile at all.

// The same pattern applies to qemu_file_new_output() / qemu_file_get_return_path():
// all of them funnel into qemu_file_new_impl(), differing only in the writable flag.
QEMUFile *qemu_file_new_input(QIOChannel *ioc) {
    // false == not writable: this QEMUFile reads the incoming stream.
    return qemu_file_new_impl(ioc, false); // f->ioc = ioc;
}
struct QEMUFile {
    const QEMUFileHooks *hooks;
    QIOChannel *ioc; // QIOChannelTLS, QIOChannelFile, QIOChannelSocket... the first channel
    bool is_writable;

    // Maximum amount of data in bytes to transfer during one rate limiting time window
    int64_t rate_limit_max;

    // Total amount of data in bytes queued for transfer during this rate limiting time window
    int64_t rate_limit_used;

    /* The sum of bytes transferred on the wire */
    int64_t total_transferred;

    // buf_index and buf_size are offsets into buf, not addresses.
    // buf_size - buf_index is the amount of still-unconsumed data, and
    // buf + buf_index is where the next read starts. Think of buf (start)
    // and buf_size (end) as two endpoints with buf_index as a cursor
    // floating between them.
    // That picture is not exact, though: on the read side buf_size can
    // change — qemu_fill_buffer() tries to keep it close to IO_BUF_SIZE.
    int buf_index;
    int buf_size; /* 0 when writing */
    // #define IO_BUF_SIZE 32768: buf is 32KB, i.e. 8 (4KB) pages.
    uint8_t buf[IO_BUF_SIZE];

    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);
    // Two constraints must hold at the same time:
    //   1. the total buffered data must not exceed 8 pages (IO_BUF_SIZE);
    //   2. the number of queued iovecs must not exceed MAX_IOV_SIZE (64).
    // If either is violated, the file is flushed (qemu_fflush()).
    struct iovec iov[MAX_IOV_SIZE];
    unsigned int iovcnt;

    int last_error;
    Error *last_error_obj;
    /* has the file been shut down */
    bool shutdown;
};

QIOChannelBuffer QEMU

相比于 QIOChannel, 这个结构体多了一个从一个内存区域(变量指针所指向的)进行 IO 的功能。

也就是说,QEMUFile 不是包含一个 QIOChannel 的指针吗。一般情况下 QEMUFile 是写到网络上传输过去的,也就是说这个 QEMUFile 里的 QIOChannel 是一个 socket channel 类型的。但是,如果这个 QEMUFile 里的 QIOChannel 是 buffer(也就是 QIOChannelBuffer)类型的,那么就会写到这个对象的 data 内存里面而不是网络上。

/**
 * QIOChannelBuffer:
 *
 * The QIOChannelBuffer object provides a channel implementation
 * that is able to perform I/O to/from a memory buffer.
 *
 */
struct QIOChannelBuffer {
    QIOChannel parent;  /* base object first, per QOM embedding convention */
    size_t capacity; /* Total allocated memory */
    size_t usage;    /* Current size of data */
    size_t offset;   /* Offset for future I/O ops */
    uint8_t *data;   /* backing memory the channel reads from / writes to */
};

qio_channel_buffer_writev
    //...
    // Core of the write path: append each iovec's bytes into bioc->data.
    // NOTE(review): the elided code above presumably grows bioc->data so that
    // capacity is sufficient — confirm against the full source; this fragment
    // assumes the memcpy destinations are all in bounds.
    for (i = 0; i < niov; i++) {
        memcpy(bioc->data + bioc->usage, iov[i].iov_base, iov[i].iov_len);
        bioc->usage += iov[i].iov_len;   // stored-data size grows
        bioc->offset += iov[i].iov_len;  // I/O cursor advances
        ret += iov[i].iov_len;           // byte count reported to the caller
    }

struct QEMUFile QEMU

struct QEMUFile {
    const QEMUFileHooks *hooks;
    // QIOChannel is an abstract class and cannot be instantiated directly,
    // but each concrete QIOChannel subtype implements reads/writes with its
    // own io_readv/io_writev callbacks, so behavior depends on the subtype.
    QIOChannel *ioc;
    bool is_writable;

    /* The sum of bytes transferred on the wire */
    uint64_t total_transferred;

    int buf_index;        /* read cursor into buf (offset, not address) */
    int buf_size; /* 0 when writing */
    uint8_t buf[IO_BUF_SIZE];

    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);  /* per-iov "may be freed after flush" flags */
    struct iovec iov[MAX_IOV_SIZE];
    unsigned int iovcnt;

    int last_error;
    Error *last_error_obj;
};

struct QIOChannel QEMU

This base class is abstract so cannot be instantiated. There will be subclasses for dealing with sockets QIOChannelSocket, files QIOChannelFile, and higher level protocols such as TLS QIOChannelTLS, WebSocket QIOChannelWebsock, etc.

The GIOChannel data type aims to provide a portable method for using file descriptors, pipes, and sockets, and integrating them into the main event loop.

To add a GIOChannel to the main event loop, use g_io_add_watch() or g_io_add_watch_full(). Here you specify which events you are interested in on the GIOChannel, and provide a function to be called whenever these events occur.

QIOChannel 也是类似的,这就是它的作用。主要提供的功能是为了能够在有新事件发生时在 main loop 中进行 callback 调用。这也是 QEMUFile 中包含一个此结构体指针的原因。

同时这个 channel 作为了一个 backend,对于不同类型(buffer, socket)会有不同的 callback。这也会导致后面对于 QEMUFile 的写会有不同的行为(比如对于 buffer,我们写到了 buffer 里;对于 socket,我们是写到了网络上)。

// 对于不同的 Class,其 parent class 并不是同一个。比如 QIOChannelBufferClass 和 
// QIOChannelSocketClass 的父类 QIOChannelClass 不是同一个对象,所以像下面这样我们更改一个
// 的函数指针并不会影响到另一个。
// 更改了父 Class 的虚函数相当于更改了自己的,也就是说更改了 QIOChannelClass 的相当于也改了
// QIOChannelBufferClass 的。
static void qio_channel_buffer_class_init(ObjectClass *klass, void *class_data G_GNUC_UNUSED)
{
    /* Install the in-memory-buffer implementations into this subclass's own
     * copy of the QIOChannelClass vtable; QIO_CHANNEL_CLASS() just casts
     * klass, so these assignments only affect QIOChannelBufferClass. */
    QIOChannelClass *base = QIO_CHANNEL_CLASS(klass);

    base->io_readv = qio_channel_buffer_readv;
    base->io_writev = qio_channel_buffer_writev;
    base->io_seek = qio_channel_buffer_seek;
    base->io_close = qio_channel_buffer_close;
    base->io_set_blocking = qio_channel_buffer_set_blocking;
    base->io_create_watch = qio_channel_buffer_create_watch;
}

static void qio_channel_socket_class_init(ObjectClass *klass, void *class_data G_GNUC_UNUSED)
{
    /* Wire the socket-backed implementations into the QIOChannel vtable. */
    QIOChannelClass *base = QIO_CHANNEL_CLASS(klass);

    base->io_readv = qio_channel_socket_readv;
    base->io_writev = qio_channel_socket_writev;
    base->io_set_blocking = qio_channel_socket_set_blocking;
    base->io_set_cork = qio_channel_socket_set_cork;
    base->io_set_delay = qio_channel_socket_set_delay;
    base->io_close = qio_channel_socket_close;
    base->io_shutdown = qio_channel_socket_shutdown;
    base->io_create_watch = qio_channel_socket_create_watch;
    base->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
#ifdef QEMU_MSG_ZEROCOPY
    /* Zero-copy flush only exists when MSG_ZEROCOPY support is compiled in. */
    base->io_flush = qio_channel_socket_flush;
#endif
}
//...
/**
 * QIOChannel:
 *
 * The QIOChannel defines the core API for a generic I/O channel
 * class hierarchy. It is inspired by GIOChannel, but has the
 * following differences
 *
 *  - Use QOM to properly support arbitrary subclassing
 *  - Support use of iovecs for efficient I/O with multiple blocks
 *  - None of the character set translation, binary data exclusively
 *  - Direct support for QEMU Error object reporting
 *  - File descriptor passing
 *
 * This base class is abstract so cannot be instantiated. There
 * will be subclasses for dealing with sockets, files, and higher
 * level protocols such as TLS, WebSocket, etc.
 */

struct QIOChannel {
    Object parent;         /* QOM base object, embedded first */
    unsigned int features; /* bitmask of QIOChannelFeatures */
    char *name;
    // Coroutine bookkeeping: presumably which AioContext/coroutine (if any)
    // is currently parked in qio_channel_yield() waiting for this channel to
    // become readable/writable — confirm against the full source.
    AioContext *read_ctx;
    Coroutine *read_coroutine;
    AioContext *write_ctx;
    Coroutine *write_coroutine;
    bool follow_coroutine_ctx;
#ifdef _WIN32
    HANDLE event; /* For use with GSource on Win32 */
#endif
};

qio_channel_add_watch_full() QEMU

可以看到这里面用了很多 GLib(注意是 GLib,不是 glibc)的结构体(GSource、GMainContext 等),说明它构建在 GLib 的事件循环框架之上。

/**
 * qio_channel_add_watch_full:
 * @ioc: the channel object
 * @condition: the I/O condition to monitor
 * @func: callback to invoke when the source becomes ready
 * @user_data: opaque data to pass to @func
 * @notify: callback to free @user_data
 * @context: the context to run the watch source
 *
 * Similar as qio_channel_add_watch(), but allows to specify context
 * to run the watch source.
 *
 * Returns: the source ID
 */
guint qio_channel_add_watch_full(QIOChannel *ioc,
                             GIOCondition condition,
                             QIOChannelFunc func,
                             gpointer user_data,
                             GDestroyNotify notify,
                             GMainContext *context)
{
    GSource *source;
    guint id;

    // Ask the concrete channel type to build a GSource that fires when
    // @condition holds (virtual dispatch via io_create_watch).
    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    // Attach to the requested main context; the context takes its own
    // reference to the source, so we can drop ours right away.
    id = g_source_attach(source, context);
    g_source_unref(source);

    return id;
}

qio_channel_add_watch_source() QEMU

QEMUFile, QIOChannel And QIOChannelSocket?

QIOChannelSocket 的变量名经常被命名为 sioc

QIOChannel 的变量名被命名为 ioc

QEMUFile 的变量名被命名为 f

QIOChannel 更像是 QEMUFile 的一个 backend。

  • QEMUFile 负责 hold 我们要写或者要读的 buffer,并不是每次读/写 QEMUFile 的时候我们都会马上写 QIOChannel。在适当时刻我们会通过 qemu_fflush() 来执行这个动作。
  • QIOChannel 更负责我们的数据应该从哪里读/写到哪去。当 qemu_fflush() 的时候我们应该把 QEMUFile 缓存的这些数据写到网络上?还是写到一个内存区间(buffer)里?

Relationship between master thread's buffer/iovec/fd and send thread's buffer/iovec/fd

They connect using the same address: outgoing_args.saddr.

But their fds are NOT the same.

The QIOChannelSocket are also not the same.

Does QEMUFile have a global object?

QEMUFileHooks QEMU

这个结构体目前用的不多,只是在 rdma 会用到。

// Optional hook points around RAM migration; per the notes above, currently
// only the RDMA transport installs these.
typedef struct QEMUFileHooks {
    QEMURamHookFunc *before_ram_iterate;
    QEMURamHookFunc *after_ram_iterate;
    QEMURamHookFunc *hook_ram_load;
    // called from control_ram_save_page()
    QEMURamSaveFunc *save_page;
} QEMUFileHooks;

qemu_fill_buffer()

当 buffer 里的数据不够读时,也就是 buf_size == buf_index 时,需要从 underlying QIOChannel 也就是 fd, socket 这些读一些新的数据来填充 buffer。所以,QEMUFile 里的 buf 相当于一个用户态的数据缓冲区。

static ssize_t coroutine_mixed_fn qemu_fill_buffer(QEMUFile *f)
{
    int len;
    int pending;
    Error *local_error = NULL;

    // Bytes still unconsumed in the buffer.
    pending = f->buf_size - f->buf_index;
    //...
    // The unread data lives between buf_index and buf_size, so slide it to
    // the front of buf (regions may overlap, hence memmove, not memcpy).
    memmove(f->buf, f->buf + f->buf_index, pending);
    // Reset the cursor: reading restarts at offset 0; current size is pending.
    f->buf_index = 0;
    f->buf_size = pending;

    //...
    // blocking fd related...
    // Refill: new data lands at f->buf + pending, reading up to
    // IO_BUF_SIZE - pending bytes so the buffer tends back toward IO_BUF_SIZE.
    len = qio_channel_read(f->ioc, (char *)f->buf + pending, IO_BUF_SIZE - pending, &local_error);
    // Success: account for the newly-arrived bytes.
    if (len > 0) {
        f->buf_size += len;
        f->total_transferred += len;
    // The return value will be 0 when the peer has performed an orderly shutdown.
    } else if (len == 0) {
        qemu_file_set_error_obj(f, -EIO, local_error);
    } else {
        qemu_file_set_error_obj(f, len, local_error);
    }

    return len;
}

qemu_put_buffer() / qemu_put_buffer_async() QEMU

所谓 async 体现在哪里?

两者的参数都包含一个是一个 buf:

void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size);
void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size, bool may_free);

但是 qemu_put_buffer 会首先把 buf 里的内容 memcpy 到 QEMUFile 的 buf 中,然后再调用 add_to_iovec(只有在 full 的时候才会 flush,这意味着并不是及时发送的) 发送 QEMUFile 的 buf,这就说明我们的 buf 可以直接被释放,而不需要等到真的发送。缺点是会多一次 memcpy。

qemu_put_buffer_async 会直接发送传进来的 buf,不会进行复制,因而省去了复制的开销,缺点是 buf 只有在被真正发送之后才会回收。

qemu_fflush() / QEMU

因为 buf 大小是 32KB,会有用尽的时候,可以通过这个函数来 reset buf_index 为 0。

这个函数把 QEMUFile 中的缓存的数据写到 QIOChannel 中去(可能是 buffer,也可能是网络)。

void qemu_fflush(QEMUFile *f)
{
    //...
    // Push everything queued in f->iov out through the channel; where the
    // bytes actually go (socket, memory buffer, ...) depends on the ioc type.
    if (f->iovcnt > 0) {
        qio_channel_writev_all(f->ioc, f->iov, f->iovcnt, &local_error)
        f->total_transferred += iov_size(f->iov, f->iovcnt);
    }

    // Both the staging buffer cursor and the iovec array are now empty again.
    f->buf_index = 0;
    f->iovcnt = 0;
}

qemu_peek_byte() / qemu_get_byte() QEMU

qemu_peek_byte():从指定位置查看指定大小的数据,不会更改 buf_index。但是有可能触发 underlying socket 的读。

qemu_get_byte():从指定位置读取指定大小的数据,会更改 buf_index。有可能触发 underlying socket 的读。

qemu_get_buffer()

Read 'size' bytes of data from the file into buf specified. 'size' can be larger than the internal buffer. Will change buf_index.

qio_channel_writev_full_all() QEMU

int qio_channel_writev_full_all(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds, size_t nfds,
                                int flags, Error **errp)
{
    int ret = -1;
    // Work on a local copy of the iovec array so the caller's array is not
    // mutated while already-written prefixes are discarded below.
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov, iov, niov, 0, iov_size(iov, niov));
    // Keep writing until every byte has been handed to the channel.
    while (nlocal_iov > 0) {
        ssize_t len;

        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds, flags, errp);

        // if it is blocked, then wait (yield if inside a coroutine) and retry
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        // Drop the `len` bytes just written from the front of the iovec list;
        // this may shrink nlocal_iov and advance local_iov itself.
        iov_discard_front(&local_iov, &nlocal_iov, len);
        // Ancillary fds ride along with the first successful send only —
        // clear them so a partial write never sends them twice.
        fds = NULL;
        nfds = 0;
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}

qio_channel_writev_full() QEMU

ssize_t qio_channel_writev_full(QIOChannel *ioc, const struct iovec *iov, size_t niov, int *fds, size_t nfds, int flags, Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    // a bunch of error checking...
    // Virtual dispatch: the concrete channel type installed io_writev in its
    // class_init. io_writev can be...
    //     qio_channel_socket_writev
    //     qio_channel_file_writev
    //     qio_channel_block_writev
    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
}

qio_channel_socket_writev() QEMU

static ssize_t qio_channel_socket_writev(QIOChannel *ioc, const struct iovec *iov, size_t niov,
                                         int *fds,
                                         size_t nfds,
                                         int flags,
                                         Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    ssize_t ret;
    struct msghdr msg = { NULL, };
    // Control-message buffer sized for the maximum number of passable fds.
    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
    size_t fdsize = sizeof(int) * nfds;
    struct cmsghdr *cmsg;
    int sflags = 0;
    struct sockaddr_un addr;
    size_t addr_len;

    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));

    // The payload iovecs are passed straight through to sendmsg().
    msg.msg_iov = (struct iovec *)iov;
    msg.msg_iovlen = niov;

    if (nfds) {
        if (nfds > SOCKET_MAX_FDS) {
            error_setg_errno(errp, EINVAL,
                             "Only %d FDs can be sent, got %zu",
                             SOCKET_MAX_FDS, nfds);
            return -1;
        }

        // Attach the file descriptors as SCM_RIGHTS ancillary data so the
        // kernel duplicates them into the receiving process.
        msg.msg_control = control;
        msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);

        cmsg = CMSG_FIRSTHDR(&msg);
        cmsg->cmsg_len = CMSG_LEN(fdsize);
        cmsg->cmsg_level = SOL_SOCKET;
        cmsg->cmsg_type = SCM_RIGHTS;
        memcpy(CMSG_DATA(cmsg), fds, fdsize);
    }

    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
#ifdef QEMU_MSG_ZEROCOPY
        sflags = MSG_ZEROCOPY;
#else
        /*
         * We expect QIOChannel class entry point to have
         * blocked this code path already
         */
        g_assert_not_reached();
#endif
    }

    // Unconnected unix datagram socket: fill in the destination address.
    if (sioc->unix_datagram && sioc->sendtoDgramAddr.path) {
        ret = prepare_unix_sockaddr(&sioc->sendtoDgramAddr,
                                    &addr, &addr_len, errp);
        if (ret < 0)
            return ret;

        msg.msg_name = &addr;
        msg.msg_namelen = addr_len;
    }

 retry:
    ret = sendmsg(sioc->fd, &msg, sflags);
    if (ret <= 0) {
        switch (errno) {
        case EAGAIN:
            // Send buffer full on a non-blocking socket: report the special
            // "would block" code so callers can wait/yield and retry.
            return QIO_CHANNEL_ERR_BLOCK;
        case EINTR:
            // Interrupted by a signal before anything was sent: retry.
            goto retry;
        case ENOBUFS:
            if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
                error_setg_errno(errp, errno,
                                 "Process can't lock enough memory for using MSG_ZEROCOPY");
                return -1;
            }
            break;
        case ENOENT:
        case ECONNREFUSED:
            // NOTE(review): for unix datagram sockets this returns -1 without
            // filling errp — presumably the caller tolerates a silent drop;
            // confirm against the full source. Otherwise we fall out of the
            // switch to the generic error below.
            if (sioc->unix_datagram) {
                return -1;
            }
        }

        error_setg_errno(errp, errno,
                         "Unable to write to socket");
        return -1;
    }

    // Zero-copy sends complete asynchronously; count them so a later flush
    // can wait for the kernel's completion notifications.
    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
        sioc->zero_copy_queued++;
    }

    return ret;
}

add_to_iovec() QEMU

static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size, bool may_free)
{
    // If the new buffer starts exactly where the last iovec ends (and its
    // may_free attribute matches), just extend that iovec instead of
    // consuming a new slot.
    if (f->iovcnt > 0 && buf == f->iov[f->iovcnt - 1].iov_base +
        f->iov[f->iovcnt - 1].iov_len && may_free == test_bit(f->iovcnt - 1, f->may_free))
    {
        f->iov[f->iovcnt - 1].iov_len += size;
    } else {
        //...
        // Start a new iovec; record the per-entry may_free attribute in the
        // bitmap so the flush path knows which buffers it may release.
        if (may_free) {
            set_bit(f->iovcnt, f->may_free);
        }
        f->iov[f->iovcnt].iov_base = (uint8_t *)buf;
        f->iov[f->iovcnt++].iov_len = size;
    }

    // iovec array full: flush immediately. Return 1 so the caller knows a
    // flush happened (0 means the data is merely queued).
    if (f->iovcnt >= MAX_IOV_SIZE) {
        qemu_fflush(f);
        return 1;
    }

    return 0;
}

iov_copy() QEMU

从 iov 当中拷贝到 dst_iov 当中。结构基本保持不变。但是因为 offset^ 的原因,可能会少几个 iovec,因为 offset 前面的 iovec 就不会拷贝到 dst_iov 中了。

offset 和 bytes 表示,当把 iov 所代表的这些 iovec 所表示的数据看成线性的,那么 offset 表示从这个线性的空间的哪里开始拷贝,而 bytes 表示要拷贝多少数据。

unsigned iov_copy(struct iovec *dst_iov, unsigned int dst_iov_cnt,
                 const struct iovec *iov, unsigned int iov_cnt,
                 size_t offset, size_t bytes)
{
    unsigned int src, dst = 0;

    /* View the source iovec array as one linear byte stream and copy the
     * window [offset, offset + bytes) into dst_iov, preserving element
     * boundaries except for the first element, which may be entered
     * mid-way. Returns the number of destination entries produced. */
    for (src = 0; src < iov_cnt && dst < dst_iov_cnt; src++) {
        if (offset == 0 && bytes == 0) {
            break; /* window fully copied */
        }
        /* Skip source elements that lie entirely before the window. */
        if (offset >= iov[src].iov_len) {
            offset -= iov[src].iov_len;
            continue;
        }

        /* Take as much of this element as the window allows; only the
         * first copied element has a non-zero intra-element offset. */
        size_t take = iov[src].iov_len - offset;
        if (take > bytes) {
            take = bytes;
        }
        dst_iov[dst].iov_base = (char *)iov[src].iov_base + offset;
        dst_iov[dst].iov_len = take;
        dst++;
        bytes -= take;
        offset = 0;
    }
    /* offset must have been consumed; a leftover means it exceeded the
     * total source size. */
    assert(offset == 0);
    return dst;
}