QEMUFile
The files, sockets or fd's that carry the migration stream are abstracted by the QEMUFile type, which also carries auxiliary state (buffers, iovecs, error tracking).
Each QEMUFile corresponds to exactly one QIOChannel. Note, however, that only the first channel (the main migration channel) is bound to a QEMUFile's ioc field; the additional multifd channels are never attached to any QEMUFile.
// same for qemu_file_new_output / qemu_file_get_return_path
QEMUFile *qemu_file_new_input(QIOChannel *ioc) {
    return qemu_file_new_impl(ioc, false); // f->ioc = ioc;
}
struct QEMUFile {
    const QEMUFileHooks *hooks;
    QIOChannel *ioc; // QIOChannelTLS, QIOChannelFile, QIOChannelSocket... The first channel
    bool is_writable;

    // Maximum amount of data in bytes to transfer during one rate limiting time window
    int64_t rate_limit_max;
    // Total amount of data in bytes queued for transfer during this rate limiting time window
    int64_t rate_limit_used;

    /* The sum of bytes transferred on the wire */
    int64_t total_transferred;

    // buf_index and buf_size are offsets, not addresses.
    // buf_size - buf_index is the amount of unread data left in the buffer;
    // buf + buf_index is where the next read starts.
    // Roughly, buf and buf + buf_size are fixed endpoints, with buf_index a
    // cursor floating between them. That is not entirely accurate: on the
    // read side buf_size can change, and qemu_fill_buffer() tries to keep
    // the buffer filled up to IO_BUF_SIZE.
    int buf_index;
    int buf_size; /* 0 when writing */
    // #define IO_BUF_SIZE 32768, i.e. the buffer is 32KB, or 8 (4KB) pages
    uint8_t buf[IO_BUF_SIZE];

    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);
    // Both of the following must hold:
    //   1. the total amount of queued data must not exceed 8 pages;
    //   2. the total number of iovecs must not exceed MAX_IOV_SIZE (64).
    // If either is violated, the file is flushed.
    struct iovec iov[MAX_IOV_SIZE];
    unsigned int iovcnt;

    int last_error;
    Error *last_error_obj;
    /* has the file been shut down */
    bool shutdown;
};
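To make the cursor comments concrete, here is an illustrative sketch (not from the QEMU source) of the read-side buffer layout:
/*
 *  buf           buf + buf_index          buf + buf_size        buf + IO_BUF_SIZE
 *   |-- consumed ------|---- unread (pending) ----|------ free space ------|
 *
 * qemu_fill_buffer() memmove()s the unread span to the front (buf_index = 0,
 * buf_size = pending) and then refills the free space from the QIOChannel.
 */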
QIOChannelBuffer
Compared with QIOChannel, this struct adds the ability to perform I/O against a memory region (the one its data pointer refers to).
Recall that QEMUFile embeds a QIOChannel. Normally a write to a QEMUFile goes out over the network, i.e. the QIOChannel inside it is a socket channel. But if the QIOChannel inside the QEMUFile is of this buffer type, writes land in the associated data memory instead of on the network.
/**
* QIOChannelBuffer:
*
* The QIOChannelBuffer object provides a channel implementation
* that is able to perform I/O to/from a memory buffer.
*
*/
struct QIOChannelBuffer {
    QIOChannel parent;
    size_t capacity; /* Total allocated memory */
    size_t usage;    /* Current size of data */
    size_t offset;   /* Offset for future I/O ops */
    uint8_t *data;
};
qio_channel_buffer_writev
//...
// Each iovec is simply memcpy'd into the channel's backing memory
// (bioc->data); capacity has already been grown as needed above.
for (i = 0; i < niov; i++) {
    memcpy(bioc->data + bioc->usage, iov[i].iov_base, iov[i].iov_len);
    bioc->usage += iov[i].iov_len;
    bioc->offset += iov[i].iov_len;
    ret += iov[i].iov_len;
}
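This is what lets migration code build a stream in memory instead of on a socket. A minimal sketch, assuming the standard QEMUFile/QIOChannelBuffer API (error handling omitted):
QIOChannelBuffer *bioc = qio_channel_buffer_new(4096); /* initial capacity */
QEMUFile *f = qemu_file_new_output(QIO_CHANNEL(bioc));

qemu_put_be32(f, 0xfeedcafe); /* sits in the QEMUFile's buf/iov for now */
qemu_fflush(f);               /* only now memcpy'd into bioc->data */
/* bioc->usage == 4; the bytes are in memory, not on any wire */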
struct QEMUFile
struct QEMUFile {
    const QEMUFileHooks *hooks;
    // QIOChannel is an abstract class, so it can never be instantiated
    // directly; each concrete QIOChannel subtype implements its own
    // read/write behavior. This pointer is therefore effectively a
    // polymorphic backend handle.
    QIOChannel *ioc;
    bool is_writable;

    /* The sum of bytes transferred on the wire */
    uint64_t total_transferred;

    int buf_index;
    int buf_size; /* 0 when writing */
    uint8_t buf[IO_BUF_SIZE];

    DECLARE_BITMAP(may_free, MAX_IOV_SIZE);
    struct iovec iov[MAX_IOV_SIZE];
    unsigned int iovcnt;

    int last_error;
    Error *last_error_obj;
};
struct QIOChannel
This base class is abstract so cannot be instantiated. There will be subclasses for dealing with sockets (QIOChannelSocket), files (QIOChannelFile), and higher level protocols such as TLS (QIOChannelTLS), WebSocket (QIOChannelWebsock), etc.
Compare GLib's GIOChannel: "The GIOChannel data type aims to provide a portable method for using file descriptors, pipes, and sockets, and integrating them into the main event loop. To add a GIOChannel to the main event loop, use g_io_add_watch() or g_io_add_watch_full(). Here you specify which events you are interested in on the GIOChannel, and provide a function to be called whenever these events occur."
QIOChannel plays a similar role, and that is its purpose: its main feature is getting a callback invoked from the main loop whenever a new event occurs on the channel. This is also why QEMUFile embeds one.
At the same time, the channel serves as a backend: different channel types (buffer, socket) install different callbacks, which is why later writes to a QEMUFile behave differently (for a buffer we write into memory; for a socket we write onto the network).
// Different classes do not share a single parent-class object: the
// QIOChannelClass embedded in QIOChannelBufferClass and the one in
// QIOChannelSocketClass are distinct objects, so overriding a function
// pointer in one (as below) does not affect the other.
// Overriding a virtual function of the parent class amounts to overriding
// your own: changing QIOChannelClass's pointers here effectively changes
// QIOChannelBufferClass's behavior.
static void qio_channel_buffer_class_init(ObjectClass *klass, void *class_data G_GNUC_UNUSED)
{
    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
    ioc_klass->io_writev = qio_channel_buffer_writev;
    ioc_klass->io_readv = qio_channel_buffer_readv;
    ioc_klass->io_set_blocking = qio_channel_buffer_set_blocking;
    ioc_klass->io_seek = qio_channel_buffer_seek;
    ioc_klass->io_close = qio_channel_buffer_close;
    ioc_klass->io_create_watch = qio_channel_buffer_create_watch;
}

static void qio_channel_socket_class_init(ObjectClass *klass, void *class_data G_GNUC_UNUSED)
{
    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
    ioc_klass->io_writev = qio_channel_socket_writev;
    ioc_klass->io_readv = qio_channel_socket_readv;
    ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
    ioc_klass->io_close = qio_channel_socket_close;
    ioc_klass->io_shutdown = qio_channel_socket_shutdown;
    ioc_klass->io_set_cork = qio_channel_socket_set_cork;
    ioc_klass->io_set_delay = qio_channel_socket_set_delay;
    ioc_klass->io_create_watch = qio_channel_socket_create_watch;
    ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
#ifdef QEMU_MSG_ZEROCOPY
    ioc_klass->io_flush = qio_channel_socket_flush;
#endif
}
//...
/**
* QIOChannel:
*
* The QIOChannel defines the core API for a generic I/O channel
* class hierarchy. It is inspired by GIOChannel, but has the
* following differences
*
* - Use QOM to properly support arbitrary subclassing
* - Support use of iovecs for efficient I/O with multiple blocks
* - None of the character set translation, binary data exclusively
* - Direct support for QEMU Error object reporting
* - File descriptor passing
*
* This base class is abstract so cannot be instantiated. There
* will be subclasses for dealing with sockets, files, and higher
* level protocols such as TLS, WebSocket, etc.
*/
struct QIOChannel {
    Object parent;
    unsigned int features; /* bitmask of QIOChannelFeatures */
    char *name;
    AioContext *read_ctx;
    Coroutine *read_coroutine;
    AioContext *write_ctx;
    Coroutine *write_coroutine;
    bool follow_coroutine_ctx;
#ifdef _WIN32
    HANDLE event; /* For use with GSource on Win32 */
#endif
};
qio_channel_add_watch_full()
Notice how many GLib types appear here (GIOCondition, GSource, GMainContext): the watch machinery is built directly on the GLib main-loop framework.
/**
* qio_channel_add_watch_full:
* @ioc: the channel object
* @condition: the I/O condition to monitor
* @func: callback to invoke when the source becomes ready
* @user_data: opaque data to pass to @func
* @notify: callback to free @user_data
* @context: the context to run the watch source
*
* Similar as qio_channel_add_watch(), but allows to specify context
* to run the watch source.
*
* Returns: the source ID
*/
guint qio_channel_add_watch_full(QIOChannel *ioc,
                                 GIOCondition condition,
                                 QIOChannelFunc func,
                                 gpointer user_data,
                                 GDestroyNotify notify,
                                 GMainContext *context)
{
    GSource *source;
    guint id;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    id = g_source_attach(source, context);
    g_source_unref(source);

    return id;
}
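A minimal usage sketch; the handler name and body are ours, not from the source. The QIOChannelFunc callback returns a gboolean that decides whether the watch stays installed:
static gboolean on_readable(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
{
    /* service the channel here, e.g. qio_channel_read(ioc, ...) */
    return G_SOURCE_CONTINUE; /* keep the watch alive */
}

/* Passing a NULL context attaches the source to the default GMainContext. */
guint id = qio_channel_add_watch_full(ioc, G_IO_IN, on_readable,
                                      NULL, NULL, NULL);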
qio_channel_add_watch_source()
QEMUFile, QIOChannel and QIOChannelSocket?
Naming conventions in the code:
QIOChannelSocket variables are usually named sioc.
QIOChannel variables are named ioc.
QEMUFile variables are named f.
QIOChannel is best thought of as a backend for QEMUFile (see the call-flow sketch after this list):
- QEMUFile holds the buffer we are about to write or read. Reading or writing a QEMUFile does not immediately touch the QIOChannel; the actual transfer happens at the right moment via qemu_fflush().
- QIOChannel decides where the data comes from and where it goes. When qemu_fflush() runs, should the data cached in the QEMUFile be written onto the network, or into a memory region (buffer)? The channel type answers that.
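A condensed call-flow sketch of the write path (the function names are the real ones; the chain is simplified):
qemu_put_buffer(f, ...)         /* data queued in f->buf / f->iov */
  -> qemu_fflush(f)             /* on buffer/iovec exhaustion, or explicitly */
    -> qio_channel_writev_all(f->ioc, f->iov, f->iovcnt, ...)
      -> klass->io_writev(...)  /* qio_channel_socket_writev(): sendmsg()
                                   qio_channel_buffer_writev(): memcpy() */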
Relationship between master thread's buffer/iovec/fd and send thread's buffer/iovec/fd
They connect using the same address, outgoing_args.saddr, but their fds are NOT the same, and their QIOChannelSocket objects are not the same either.
Does QEMUFile have a global object?
QEMUFileHooks
This struct currently sees little use; only the RDMA migration path relies on it.
typedef struct QEMUFileHooks {
    QEMURamHookFunc *before_ram_iterate;
    QEMURamHookFunc *after_ram_iterate;
    QEMURamHookFunc *hook_ram_load;
    // called from ram_control_save_page()
    QEMURamSaveFunc *save_page;
} QEMUFileHooks;
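For reference, the RDMA path installs its hooks roughly like this (based on migration/rdma.c; the exact field assignments may differ between QEMU versions):
static const QEMUFileHooks rdma_write_hooks = {
    .before_ram_iterate = qemu_rdma_registration_start,
    .after_ram_iterate  = qemu_rdma_registration_stop,
    .save_page          = qemu_rdma_save_page,
};

/* attached when the outgoing RDMA QEMUFile is set up */
qemu_file_set_hooks(f, &rdma_write_hooks);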
qemu_fill_buffer()
When the buffer has no more data to read, i.e. buf_size == buf_index, new data must be pulled from the underlying QIOChannel (an fd, a socket, ...) to refill it. The buf inside QEMUFile is thus effectively a userspace read cache.
static ssize_t coroutine_mixed_fn qemu_fill_buffer(QEMUFile *f)
{
    int len;
    int pending;
    Error *local_error = NULL;

    // Unread data still left in the buffer
    pending = f->buf_size - f->buf_index;
    //...
    // The unread bytes live between buf_index and buf_size, so they can be
    // moved to the front of the buffer, starting at buf.
    memmove(f->buf, f->buf + f->buf_index, pending);
    // Reset the cursor: reading restarts at offset 0, current size is pending
    f->buf_index = 0;
    f->buf_size = pending;
    //...
    // blocking fd related...
    // Where do the new bytes go? f->buf + pending.
    // How much do we read? IO_BUF_SIZE - pending, i.e. try to keep the
    // buffer filled up to IO_BUF_SIZE.
    len = qio_channel_read(f->ioc, (char *)f->buf + pending, IO_BUF_SIZE - pending, &local_error);
    // Success: account for the new bytes
    if (len > 0) {
        f->buf_size += len;
        f->total_transferred += len;
    // The return value will be 0 when the peer has performed an orderly shutdown.
    } else if (len == 0) {
        qemu_file_set_error_obj(f, -EIO, local_error);
    } else {
        qemu_file_set_error_obj(f, len, local_error);
    }

    return len;
}
qemu_put_buffer() / qemu_put_buffer_async()
Where does the "async" actually come in?
Both functions take a buf parameter:
void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size);
void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size, bool may_free);
qemu_put_buffer() first memcpy()s the contents of buf into QEMUFile's own buf, then calls add_to_iovec() on that internal buffer (which flushes only when full, so nothing is sent right away). Because of the copy, the caller's buf can be freed immediately, without waiting for the actual send; the downside is the extra memcpy.
qemu_put_buffer_async() queues the caller's buf directly, with no copy, saving that overhead; the downside is that buf can only be reclaimed once it has actually been sent.
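A sketch of the trade-off; page is a hypothetical pointer into guest RAM, TARGET_PAGE_SIZE is QEMU's page-size constant:
/* Copying variant: page may be reused or freed as soon as the call returns. */
qemu_put_buffer(f, page, TARGET_PAGE_SIZE);

/* Zero-copy variant: page must stay valid and unmodified until the next
 * qemu_fflush(); may_free=true would additionally mark the range as
 * releasable by QEMUFile once it has been flushed. */
qemu_put_buffer_async(f, page, TARGET_PAGE_SIZE, false);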
qemu_fflush()
Since buf is only 32KB it will eventually run out; this function resets buf_index to 0.
It writes the data cached in the QEMUFile out through the QIOChannel (which may be a memory buffer, or the network).
void qemu_fflush(QEMUFile *f)
{
    //...
    if (f->iovcnt > 0) {
        // error handling elided; the real code checks local_error
        qio_channel_writev_all(f->ioc, f->iov, f->iovcnt, &local_error);
        f->total_transferred += iov_size(f->iov, f->iovcnt);
    }

    f->buf_index = 0;
    f->iovcnt = 0;
}
qemu_peek_byte() / qemu_get_byte()
qemu_peek_byte(): looks at data at a given offset without changing buf_index; it may still trigger a read from the underlying socket to refill the buffer.
qemu_get_byte(): reads data at the current position and does advance buf_index; it may likewise trigger a read from the underlying socket.
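The typical pattern, sketched; the QEMU_VM_SECTION_* constant is from migration/savevm.c, the surrounding logic is ours:
uint8_t type = qemu_peek_byte(f, 0);   /* buf_index unchanged */
if (type == QEMU_VM_SECTION_START) {
    qemu_get_byte(f);                  /* consume it: buf_index advances */
    /* ... parse the section body ... */
}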
qemu_get_buffer()
Reads 'size' bytes of data from the file into the specified buf. 'size' can be larger than the internal buffer. Changes buf_index.
qio_channel_writev_full_all()
int qio_channel_writev_full_all(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds, size_t nfds,
                                int flags, Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov, iov, niov, 0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;
        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds, flags, errp);
        // if the channel would block, wait until it is writable again...
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        // drop the `len` bytes that were just written from the front of the iov
        iov_discard_front(&local_iov, &nlocal_iov, len);

        // fds are only transferred once, with the first successful write
        fds = NULL;
        nfds = 0;
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}
qio_channel_writev_full()
ssize_t qio_channel_writev_full(QIOChannel *ioc, const struct iovec *iov, size_t niov, int *fds, size_t nfds, int flags, Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    // a bunch of error checking...

    // io_writev can be:
    //   qio_channel_socket_writev
    //   qio_channel_file_writev
    //   qio_channel_block_writev
    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
}
qio_channel_socket_writev()
static ssize_t qio_channel_socket_writev(QIOChannel *ioc, const struct iovec *iov, size_t niov,
                                         int *fds,
                                         size_t nfds,
                                         int flags,
                                         Error **errp)
{
    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
    ssize_t ret;
    struct msghdr msg = { NULL, };
    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
    size_t fdsize = sizeof(int) * nfds;
    struct cmsghdr *cmsg;
    int sflags = 0;
    struct sockaddr_un addr;
    size_t addr_len;

    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));

    msg.msg_iov = (struct iovec *)iov;
    msg.msg_iovlen = niov;

    if (nfds) {
        if (nfds > SOCKET_MAX_FDS) {
            error_setg_errno(errp, EINVAL,
                             "Only %d FDs can be sent, got %zu",
                             SOCKET_MAX_FDS, nfds);
            return -1;
        }

        msg.msg_control = control;
        msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);

        cmsg = CMSG_FIRSTHDR(&msg);
        cmsg->cmsg_len = CMSG_LEN(fdsize);
        cmsg->cmsg_level = SOL_SOCKET;
        cmsg->cmsg_type = SCM_RIGHTS;
        memcpy(CMSG_DATA(cmsg), fds, fdsize);
    }

    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
#ifdef QEMU_MSG_ZEROCOPY
        sflags = MSG_ZEROCOPY;
#else
        /*
         * We expect QIOChannel class entry point to have
         * blocked this code path already
         */
        g_assert_not_reached();
#endif
    }

    if (sioc->unix_datagram && sioc->sendtoDgramAddr.path) {
        ret = prepare_unix_sockaddr(&sioc->sendtoDgramAddr,
                                    &addr, &addr_len, errp);
        if (ret < 0) {
            return ret;
        }
        msg.msg_name = &addr;
        msg.msg_namelen = addr_len;
    }

 retry:
    ret = sendmsg(sioc->fd, &msg, sflags);
    if (ret <= 0) {
        switch (errno) {
        case EAGAIN:
            return QIO_CHANNEL_ERR_BLOCK;
        case EINTR:
            goto retry;
        case ENOBUFS:
            if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
                error_setg_errno(errp, errno,
                                 "Process can't lock enough memory for using MSG_ZEROCOPY");
                return -1;
            }
            break;
        case ENOENT:
        case ECONNREFUSED:
            if (sioc->unix_datagram) {
                return -1;
            }
        }

        error_setg_errno(errp, errno,
                         "Unable to write to socket");
        return -1;
    }

    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
        sioc->zero_copy_queued++;
    }

    return ret;
}
add_to_iovec()
static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size, bool may_free)
{
    // If the start of the buffer being added is exactly the end of the last
    // iovec (and the may_free attribute matches), just extend that iovec.
    if (f->iovcnt > 0 && buf == f->iov[f->iovcnt - 1].iov_base +
                                f->iov[f->iovcnt - 1].iov_len &&
        may_free == test_bit(f->iovcnt - 1, f->may_free))
    {
        f->iov[f->iovcnt - 1].iov_len += size;
    } else {
        //...
        // Start a new iovec and record its may_free attribute in the bitmap
        if (may_free) {
            set_bit(f->iovcnt, f->may_free);
        }
        f->iov[f->iovcnt].iov_base = (uint8_t *)buf;
        f->iov[f->iovcnt++].iov_len = size;
    }

    if (f->iovcnt >= MAX_IOV_SIZE) {
        qemu_fflush(f);
        return 1;
    }

    return 0;
}
iov_copy()
Copies iovecs from iov into dst_iov, mostly preserving their structure. Because of the offset (see below), dst_iov may end up with fewer entries: iovecs lying entirely before offset are not copied at all.
Treat the data described by iov as a single linear byte range; offset then says where in that range the copy starts, and bytes says how much to copy.
unsigned iov_copy(struct iovec *dst_iov, unsigned int dst_iov_cnt,
                  const struct iovec *iov, unsigned int iov_cnt,
                  size_t offset, size_t bytes)
{
    size_t len;
    unsigned int i, j;

    for (i = 0, j = 0; i < iov_cnt && j < dst_iov_cnt && (offset || bytes); i++) {
        // First skip ahead to the iovec that contains offset
        if (offset >= iov[i].iov_len) {
            offset -= iov[i].iov_len;
            continue;
        }
        // From there on, keep the iovec layout unchanged
        len = MIN(bytes, iov[i].iov_len - offset);

        dst_iov[j].iov_base = iov[i].iov_base + offset;
        dst_iov[j].iov_len = len;
        j++;
        bytes -= len;
        offset = 0;
    }
    assert(offset == 0);
    return j;
}
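A worked example with hypothetical buffers a and b: two source iovecs of 3 and 5 bytes, copying 3 bytes starting at linear offset 4:
struct iovec src[2] = { { a, 3 }, { b, 5 } };
struct iovec dst[2];
unsigned n = iov_copy(dst, 2, src, 2, 4, 3);
/* src[0] lies entirely before offset 4, so it is skipped:
 * n == 1 and dst[0] == { b + 1, 3 } */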