Original patch set:

Multi-fd 发送的时候的数据流是这样的:有一个 iovec 数组,在 index 为 0 处填充 meta 信息,也就是整个 MultiFDPacket_t 结构体。其他的 vec 填充通过 nocomp_send_prepare_private/nocomp_send_prepare_shared 进行。

  • 对于 shared page,也就是 nocomp_send_prepare_shared(),他是一个 page 用一个 iovec。
  • 对于 private page,也就是 nocomp_send_prepare_private(),他也是一个 page 用一个 iovec,但是在这之前的几个 iovec 它会存一些 TDX specific 的元信息,比如 mbmd,gpa_list,mac_list 等等,各用一个 iovec。

ram_save_target_page 这个函数里才真正开始区分仅用 "live_migration" thread 的普通 pre-copy 方式和 multi-fd 方式:

// The master thread
qemu_thread_create(&s->thread, "live_migration", migration_thread, s, QEMU_THREAD_JOINABLE);
    migration_thread
        migration_iteration_run
            qemu_savevm_state_iterate
                ram_save_iterate // se->ops->save_live_iterate(f, se->opaque);
                    ram_find_and_save_block
                        ram_save_host_page
                            migration_ops-> ram_save_target_page()
                                ram_save_target_page_legacy
                                    if (migrate_multifd() && !migration_in_postcopy()) {
                                        ram_save_multifd_page(pss->pss_channel, block, offset);
                                    else
                                        ram_save_page(rs, pss);

Multi-fd 和普通的还有一个最大的区别,就是 multi-fd 是以一个 RAMBlock 为一个 task 来发送的,page 会攒着直到切换了 RAMBlock 或者满了;但是普通的是一个 page 一个 page 发的,当然,buffer 满了(8 个 page)或者 iovec 数量超了 64 才会发送。但是对于 multi-fd,一次可以发送 128 个 page。

Is multi-thread still used in completion phase?

还是会使用,因为调用到了 ram_find_and_save_block() 函数。

Is the fd used for mutli-fd packet sending a block fd or not?

调用的函数是 qio_channel_writev_full_all()

有一个函数叫 qio_channel_set_blocking()。If @enabled is true, then the channel is put into blocking mode, otherwise it will be non-blocking. In non-blocking mode, read/write operations may return QIO_CHANNEL_ERR_BLOCK if they would otherwise block on I/O。

可以看到即使是 non-blocking 的,这个函数的调用者还是会被强行 wait 或者 yield,所以其实和 blocking 的也没有区别,不重要。

注意 blocking fd 的意思是 block 到发送缓冲区有 buffer,不是保证在 destination 那里 receive 了才解除 block。

qio_channel_writev_full_all
    while (nlocal_iov > 0) {
        qio_channel_writev_full
            if (len == QIO_CHANNEL_ERR_BLOCK) {
                if (qemu_in_coroutine()) {
                    qio_channel_yield(ioc, G_IO_OUT);
                } else {
                    qio_channel_wait(ioc, G_IO_OUT);
                }
                continue;
            }

也可以看 qemu_file_set_blocking()

// Set the fd for master thread to blocking
socket_outgoing_migration
    migration_channel_connect
        migrate_fd_connect
            qemu_file_set_blocking(s->to_dst_file, true);

How does multi-fd live migration send the zero pages?

Zero pages is still sent by the master thread in the traditional way:

static int ram_save_target_page_legacy(RAMState *rs, PageSearchStatus *pss)
{
    RAMBlock *block = pss->block;
    ram_addr_t offset = ((ram_addr_t)pss->page) << TARGET_PAGE_BITS;
    int res;

    //...
    // If the page is zero, then save it directly rather than
    // saving it to multifd.
    if (save_zero_page(rs, pss, offset)) {
        return 1;
    }

    /*
     * Do not use multifd in postcopy as one whole host page should be
     * placed.  Meanwhile postcopy requires atomic update of pages, so even
     * if host page size == guest page size the dest guest during run may
     * still see partially copied pages which is data corruption.
     */
    if (migrate_multifd() && !migration_in_postcopy()) {
        return ram_save_multifd_page(pss->pss_channel, block, offset);
    }

    return ram_save_page(rs, pss);
}

multifd_send_fill_packet() QEMU

这个函数主要是为了将 p 里的一些变量赋值到 p->packet (MultiFDPakcet_t),这个结构体本身也会作为那些 pages 的元信息发过去,比如每一个 page 在此 RAMBlock 里的 offset,所以叫 fill packet。

static void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(pages->allocated);
    packet->normal_pages = cpu_to_be32(p->normal_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
    packet->packet_num = cpu_to_be64(p->packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        packet->offset[i] = cpu_to_be64(pages->offset[i]);
    }
}

multifd_send_pages() QEMU

This function will only be called by multifd_queue_page() and multifd_send_sync_main().

这个函数并不是真发送,只不过是做一个发送前的 setup。

这个函数只做一件事,就是看一下新到的这个任务应该分给哪一个倒霉的 channel。

// pages 作为指针并不是指向一个 MultiFDPages_t 的数组,pages 本身就表示很多的 page,所以
// 它只指向这个结构体的一个 instance。
// 我们要做的,是把 pages 赋给我们选择的一个空闲的 channel
static int multifd_send_pages(QEMUFile *f, MultiFDPages_t *pages)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    uint64_t transferred;

    // Number is 0, so nothing needs to be sent
    if (!pages->num)
        return 0;

    // We are exiting, so return
    if (qatomic_read(&multifd_send_state->exiting))
        return -1;

    // 这是一个信号量,表示当前有几个 channel 是 ready 的
    // 消费者,channels_ready 减一,表示我们要占用一个 channel
    // 所以至少要有一个 channel 是 ready 的。
    qemu_sem_wait(&multifd_send_state->channels_ready);
    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];
        //...
        // 这个 channel 正合我意!当前没有工作要做,给它安排一个。
        // next_channel 的作用,我认为是让各个 channel 的地位变得
        // 平等,不受 channel 在 paras 数组中 index 的影响,不能每次都重新
        // 开始搜索,然后每次都让前几个 channel 干活。
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
        //...
    }
    // 我们选中的这个空闲的 channel,num 和 block 应当是 0
    assert(!p->pages->num);
    assert(!p->pages->block);

    if (multifd_pages_is_private(pages)) {
        // 别忘了每一个 packet 都有 id,我们用这种方式赋予这个 packet id。
        p->packet_num = multifd_send_state->private_packet_num++;
        // 因为这个函数的任务是派发 pages 到 migration channels,当我们选择了
        // p 所代表的 channel 时,说明它时空的,那么 p->pages 也就是空的,所以这
        // 个操作相当于把 global 的 pages 置空了,表示任务已经发出去了。
        multifd_send_state->private_pages = p->pages;
    } else {
        // 这里两行同上
        p->packet_num = multifd_send_state->shared_packet_num++;
        multifd_send_state->shared_pages = p->pages;
    }

    // 给此 channel 分配任务,任务目标是发送这些 pages
    p->pages = pages;

    // packet_len 表示的是 metadata 的长度,加上真正数据的大小,也就是页面的大小,就是真正需要传输的。
    transferred = ((uint64_t) pages->num) * qemu_target_page_size() + p->packet_len;
    // some rate limiting...
    ram_counters.multifd_bytes += transferred;
    ram_counters.transferred += transferred;
    //...
}

// legacy version (not TD)
static int multifd_send_pages(QEMUFile *f)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;

    //...
    // Ensure that at least there is 1 thread is ready, otherwise
    // we cannot dispatch the task.
    qemu_sem_wait(&multifd_send_state->channels_ready);

    next_channel %= migrate_multifd_channels();
    // Find the next channel that is waiting for tasks
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        p = &multifd_send_state->params[i];

        // Check that this thread is not quited
        if (p->quit) {
            error_report("%s: channel %d has already quit!", __func__, i);
            qemu_mutex_unlock(&p->mutex);
            return -1;
        }
        // if this channel is idle, let it do the work
        if (!p->pending_job) {
            p->pending_job++;
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }
    // Give the pages struct to the channel we have chosen
    multifd_send_state->pages = p->pages;
    p->pages = pages;

    // Wake up that channel so it can do the task
    qemu_sem_post(&p->sem);
    return 1;
}

ram_save_multifd_page() / multifd_queue_page() QEMU

这个函数会在某些情况(要换 block,或者当前 buffer 已满)调用 multifd_send_pages,这个函数会派发 task 给下面的 channel 让它们发送。

static int ram_save_multifd_page(QEMUFile *file, RAMBlock *block, ram_addr_t offset)
{
    multifd_queue_page(file, block, offset) < 0);
    //...
    stat64_add(&mig_stats.normal_pages, 1);
    //...
}

// TDX LM 版本
// block: 表示这个要 queue 的 page 所属的 block 
// private_gpa:当这个为 CGS_PRIVATE_GPA_INVALID 时,表示我们迁的是一个 shared page
int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset, hwaddr private_gpa)
{
    MultiFDPages_t *pages;

    // multifd_send_state 是一个全局的变量,里面的 pages
    // 表示的是我们现在正在构建的 pages,但是还没有构建完成
    // 当构建完成后,其会被派发给一个空闲的 channel 来发送
    // 这些 pages。比如当我们要换 block 来 queue page 时
    // 就会清空已经 queue 的 pages。
    if (private_gpa == CGS_PRIVATE_GPA_INVALID) {
        pages = multifd_send_state->shared_pages;
    } else {
        // 这种情况下,传进来的 offset 和 private_gpa 其实值是一样的
        pages = multifd_send_state->private_pages;
    }

    // 如果当前 pages 没有 block,就用想要 queue 的 page 所在的 block
    // 这应该表示这个 page 是 pages 里的第一个 page,恭喜
    if (!pages->block) {
        pages->block = block;
    }

    // 表示要 queue 的 page 所在的 block,和 pages 里的 block 是相等的
    // 这应该是大多数情况
    // 这个 check 在此函数里有两次,Zhenzhong 有 fix:ddbe628c97c3a2d211c6d96383cb4063ac3ad0f9
    if (pages->block == block) {
        pages->offset[pages->num] = offset;
    	pages->private_gpa[pages->num] = private_gpa;
        pages->num++;

        // 没有占满 buffer,可以返回了,已经结束了,等到占满了我们再真的发任务给 channel
        if (pages->num < pages->allocated) {
            // return 1 是 successful 的
            return 1;
        }
    }

    // 要么 buffer 满了,要么换了 block
    // buffer 满了,发任务,合情合理
    // block 不一样,说明我们要换 block 啦,那之前的 block 攒的
    // 那些 page 不得发出去?所以发任务也合情合理
    multifd_send_pages(f, pages)

    // 之前的 block 的 page 已经都发出去啦,
    if (pages->block != block) {
        // multifd_send_pages() 会把 global block 置空,所以我们
        // 再次调用此函数时,会发现 pages->block 和我们传进去的 block
        // 相等了,因为如上所述,空的会被赋值为 block,这表示我们切换了当前任务
        // 要换一个 block 来发送了。
        return  multifd_queue_page(f, block, offset, private_gpa);
    }

    return 1;
}

// Legacy 版本
int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages = multifd_send_state->pages;
    bool changed = false;

    // The first page in the pages array, the block is not determined so we need to initialize
    // the value of pages->block to the block this page belongs to
    if (!pages->block) {
        pages->block = block;
    }

    // If the RAMBlock of the page we are saving equals to the block we are currently processing
    // then stack this page into the pages struct
    if (pages->block == block) {
        pages->offset[pages->num] = offset;
        pages->num++;

        // If the pages is not full, i.e., we don't have 128 pages stacked to save for now.
        // then we can just return and wait for more pages come in.
        if (pages->num < pages->allocated) {
            return 1;
        }
    } else {
        changed = true;
    }

    // If the block is changed, or the pages is full, we need to save the pages stacked in the pages struct
    multifd_send_pages(f);

    // Don't forget that after we saving all the pages stacked prevously,
    // we still have a new page need to be stacked.
    if (changed)
        return multifd_queue_page(f, block, offset);

    return 1;
}

概括一下,整个流程是这样的:global 手里拿着一个 pages,一直堆积堆积堆积:

  • 当满的时候,它会找一个 channel 把这个 pages 派发出去,然后把自己置空。
  • 当堆积到一半发现下一个要堆积的 page block 变了之后,这就意味着没法在一个 packet 里发出去了,那么干脆先把当前已经堆积的 pages 发出去,然后重新换了一个 block 进行堆积。

可以看到,如果我们频繁地换 block,那么显而易见 packet 数量会变多,可能会一定程度影响性能。

multifd_send_sync_main() / MULTIFD_FLAG_SYNC QEMU

这个函数只是告诉了 destination 的 channel thread 来 sync,在执行完此函数之后,还需要发送 RAM_SAVE_FLAG_MULTIFD_FLUSH 到 destination 来让其 master thread 来调用 multifd_recv_sync_main(),drive 和 channel threads 之间的 sync 过程。

Motivation: 一个可能的原因是在跨 complete_round 的时候同步一下,因为之后可能发送的 page 的 GPA 就和之前相同了(dirty page),这种应该先让 destination 那里先充分接收了上一个 round 的 page,避免一个 dirty 的 page 先于之前 export 的 page 到达从而造成了乱序。(在没有跨 round 的时候,因为每一个 page 只被发送了一次,所以我们可以让多个 thread 并行地进行 export 和 import,它们涉及到的 pages 并不重合,所以这样是安全的):简而言之,就是

  • 不想让 sync 之后发送的 packet 在 sync 之前被接收;
  • 不想让 sync 之前发送的 packet 在 sync 之后被接收。
find_dirty_block
    if (!offset_in_ramblock) // 这个 rb 没有,切下一个
        if (!pss->block) { // 到最后一个 block 了,说明我们走了一个 round 了
            multifd_send_sync_main // 那么此时应该 sync 一下

这个函数的主要作用就是把:

  • 每一个 thread 还没有完成的任务(在对应的 params 里存着)
  • 当前要发的任务(在 global 里存着, multifd_send_state
  • 每一个 thread 单独派发的只是为了让 dst 端 sync 的任务 p->flags |= MULTIFD_FLAG_SYNC,这个 packet 不包含真正的数据;

都清空掉,确保 main thread 和 channels 都同步在同一个位置。对于全局的任务,是通过派发的形式派发给下面的 thread,对于 thread pending 的任务,等它们发完,这样它们才会 post sem 来通知 master thread,所以当 master thread 收到了对应的 sem,那就说明 thread pending 的任务也已经处理完了,每一个 thread 都在等待的状态。

为什么要同步呢?

这个函数在 ram_save_complete, ram_save_iterate, ram_save_setup 以及 find_dirty_block 里都有调,调的地方还是挺杂的。我觉得原因还是 main thread 要进行下一步的动作必须保证所有的都发完了,也就是有 dependency,所以用这种方式来同步。

int multifd_send_sync_main(QEMUFile *f)
{
    int i;
    bool flush_zero_copy;
    MultiFDPages_t *private_pages;
    MultiFDPages_t *shared_pages;

    //...
    private_pages = multifd_send_state->private_pages; // TD private pages
    shared_pages = multifd_send_state->shared_pages; // shared pages by TD or all pages of the VM

    // ... 清空 VM scope 的任务 ...
    // 1. 先分配 shared pages 的任务
    // 目的是把现在还缓存着的 pages 发送掉(对于缓存机制可以参考 multifd_queue_page())
    // 也就是把任务清空
    if (shared_pages->num)
        multifd_send_pages(f, shared_pages)

    // 2. 再分配 private pages 的任务
    // 目的是把现在还缓存着的 pages 发送掉(对于缓存机制可以参考 multifd_queue_page())
    // 也就是把任务清空
    if (private_pages->num)
        multifd_send_pages(f, private_pages)

    // zero-copy specific things...
    // 给每一个 channel 分配一个虚假的任务
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        //...
        // 这些操作都是在 lock 里的,也就是 worker thread 不会对此进行更新
        p->packet_num = multifd_send_state->shared_packet_num++;
        // 这个 flag 就是为了同步用的,当 channel 收到这个 flag 时,会往这个
        // sem_sync 里 post,这样就可以通知到 main thread。
        p->flags |= MULTIFD_FLAG_SYNC;
        // 因为我们在 mutex 里,这说明 channel 必不可能在 
        p->pending_job++;
        // rate limit and stats...
        //...
        // zero-copy things...
    }

    // 停下来,和所有的 channels 同步。因为 master thread 停下来了,不会再发任务了, threads
    // 们也就不会有新的任务了,所以当都到达 barrier 这里时,所有的 thread 任务都处理完了,都不会有任务。
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        //...
        // 这个信号量就是给 main thread 和 channels 之前同步用的。
        // 这是一个在 main thread 的 barrier,需要每一个 channel 都到达
        // barrier 这里才会通过。
        qemu_sem_wait(&p->sem_sync);
    }
    //...

这个 flag 就是同步用的。

在每次调用 multifd_send_sync_main() 的时候会被置上。

MultiFDMethods

typedef struct {
    /* Setup for sending side */
    int (*send_setup)(MultiFDSendParams *p, Error **errp);
    /* Cleanup for sending side */
    void (*send_cleanup)(MultiFDSendParams *p, Error **errp);
    /* Prepare the send packet */
    int (*send_prepare)(MultiFDSendParams *p, Error **errp);
    /* Setup for receiving side */
    int (*recv_setup)(MultiFDRecvParams *p, Error **errp);
    /* Cleanup for receiving side */
    void (*recv_cleanup)(MultiFDRecvParams *p);
    /* Read all pages */
    int (*recv_pages)(MultiFDRecvParams *p, Error **errp);
} MultiFDMethods;

static MultiFDMethods multifd_nocomp_ops = { // no compression
    //...
};

static MultiFDMethods multifd_zlib_ops = { // compress using zlib
    //...
};

static MultiFDMethods multifd_zstd_ops = { // compress using zstd
    //...
};

// When will send_* methods in this struct be called?
@socket_outgoing_migration
    migration_channel_connect
        migrate_fd_connect
            multifd_save_setup
                for (i = 0; i < thread_count; i++)
                    ret = multifd_send_state->ops->send_setup(p, &local_err); // send_setup
                    multifd_send_thread
                        ret = multifd_send_state->ops->send_prepare(p, &local_err); // send_prepare
            migration_thread
                migration_iteration_finish
                    migrate_fd_cleanup_schedule
                        qemu_bh_schedule(s->cleanup_bh); // send_cleanup

// When will recv_* methods in this struct be called?
@migration_ioc_process_incoming
    migration_incoming_setup
        multifd_load_setup
            for (i = 0; i < thread_count; i++) {
                ret = multifd_recv_state->ops->recv_setup(p, &local_err); // recv_setup
            }
    multifd_recv_new_channel
        multifd_recv_thread
            ret = multifd_recv_state->ops->recv_pages(p, &local_err); // recv_pages
    migration_incoming_process
        process_incoming_migration_co
            multifd_load_cleanup
                multifd_recv_state->ops->recv_cleanup(p); // recv_cleanup

MultiFDPacket_t QEMU

大约表示包的 header,包除了头部,剩下的是 page_count 个 page 的内容。

一个 packet 包含 page 的数量由 page_count 变量决定(经计算是 128 个):

一个 MultiFDPacket_t 和一个 MultiFDPages_t 对应,在函数 multifd_send_fill_packet 中,前者会基于后者构建。

int multifd_save_setup(Error **errp) {
    //...
    // 一般来说:MULTIFD_PACKET_SIZE 是一个宏定义,被定义为了 512K(2^19)
    // qemu_target_page_size() 返回 4K(2^12)
    // 所以一个 packet 应该会包含 128 个 page。
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    //...
}
typedef struct {
    // To let destination check that this is a multifd packet
    uint32_t magic;
    // To ensure compatibility
    uint32_t version;
    // Could be used to:
    //  - differenciate different compression methods.
    //  - let destination sync between its master thread and channel thread
    uint32_t flags;
    /* maximum number of allocated pages */
    // 一般都初始化为 pages->allocated
    // 而这个也一般会被初始化为 128
    // 所以这个也基本上固定是 128
    // A multifd packet can contain multiple pages' content
    // Currently this number is fixed as 128, which means 1 packet
    // can contain 128 pages.
    uint32_t pages_alloc;
    // 非零页的数量
    // The number of the pages that are not zero
    uint32_t normal_pages;
    /* size of the next packet that contains pages */
    // nocomp_send_prepare()
    //     p->next_packet_size = p->normal_num * p->page_size;
    // Since page_size is a constant, from above code we know this variable is
    // mainly determined by the number of the pages that are not zero
    uint32_t next_packet_size;
    // 在源端生成这个包时,已经有多少个包生成了。
    // 我觉得这个也可以唯一地 identify 一个包,可以当作一个
    // 包的 id,因此每一个 channel 发送的包的 id 未必是连续的。
    // 总的来说,这个东西没什么用,code 里没有看到有逻辑判断是基于此的。
    // This can identify the current packet
    uint64_t packet_num;
    uint64_t unused[4];    /* Reserved for future use */
    // 表示这些 pages 所在 ramblock 的 id
    // These pages belong to the same RAMBlock, this
    // represents the idstr of that block
    char ramblock[256];
    // 每一个 page 在 RB 里的 offset
    // offset of each page in the RB
    uint64_t offset[];
} __attribute__((packed)) MultiFDPacket_t;

MultiFDPages_t QEMU

这个结构体主要的作用是方便 master thread 和 channel thread 互相交换 pages 相关的数据。只需要交换下指针就能把数据给到一个 thread。如果没有这个结构体,那么就需要每次把每一个 page 的 offset copy 给 channel thread,降低运行速度。

This struct is only used in the source side, not in the destination side. It is used during queuing the pages.

The global variable multifd_send_state and the per-thread MultiFDSendParams both have a this member.

结构体内的数据流向:

// In master thread multifd_queue_page(), queue each page's offset to multifd_send_state->pages
if (!pages->block)
    pages->block = block;
if (pages->block == block)
    pages->offset[pages->num] = offset;
    pages->num++;
    if (pages->num < pages->allocated)
        return 1;

// In master thread mutlifd_send_pages(), before adding the pending_job
// we will first **switch** the MultiFDSendParams->pages <-> multifd_send_state->pages
MultiFDPages_t *pages = multifd_send_state->pages;
multifd_send_state->pages = p->pages; // in multifd_send_pages()
p->pages = pages;

// In send thread
// we need to use MultiFDSendParams->pages offset to fill the normal
multifd_send_thread
    if (p->pending_job) {
            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }
            p->pages->num = 0;
            p->pages->block = NULL;

这个结构体本身就代表很多个 page,这些个 page 必须同属于一个 RAMBlock。

一个 MultiFDPages_t 和一个 MultiFDPacket_t 对应,前者用来在发送时构建后者。

typedef struct {
    /* number of used pages */
    // 也表示了 offset 和 private_gpa 这两个数组的有效长度(注意不是分配长度)
    // 也就是有数据的长度
    uint32_t num;
    // 这些页所在的 packet 的 id
    // 页本身用 gfn 就已经可以作为 id 了,所以这不是页本身的 id。
    uint64_t packet_num;
    // 表示 buffer 可以容纳的大小,一般都初始化为了 128(一个 packet 可以容纳的 page 的大小)
    // 这进一步证实了一个此结构体和一个 MultiFDPacket_t 对应。
    uint32_t allocated;
    // 每一个页在 block 中的 offset
    // 等于这个页的 HVA 减去 block 的起始 HVA
    // Each pages' offset in this RAMBlock
    ram_addr_t *offset;
    // 上面每一个 offset 页所对应的 GPA,放在这个数组里面,和上面的 offset 相对应。
    // 事实上在 code 里,offset 和 private_gpa 都是在一起处理的
    // multifd_queue_page() 这个函数的参数就是一个 offset 一个 gpa
    hwaddr *private_gpa;
    // 这些 page 属于的 block
    RAMBlock *block;
} MultiFDPages_t;

MultiFDRecvParams QEMU

很重要的一个结构体。这个结构体是和一个 QIOChannel 对应的。

typedef struct {
    /* Fields are only written at creating/deletion time */
    /* No lock required for them, they are read only */

    /* channel number */
    uint8_t id;
    /* channel thread name */
    char *name;
    /* channel thread id. When to run? */
    QemuThread thread;
    /* communication channel */
    QIOChannel *c;
    /* packet allocated len */
    uint32_t packet_len;

    /* syncs main thread and channels */
    QemuSemaphore sem_sync;

    /* this mutex protects the following parameters */
    QemuMutex mutex;
    /* is this channel thread running */
    bool running;
    /* should this thread finish */
    bool quit;
    /* multifd flags for **current packet** such as MULTIFD_FLAG_SYNC to sync on this packet */
    uint32_t flags;
    /* global number of generated multifd packets */
    uint64_t packet_num;

    /* thread local variables. No locking required */

    /* pointer to the packet */
    MultiFDPacket_t *packet;
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    /* packets sent through this channel */
    uint64_t num_packets;
    /* ramblock host address */
    uint8_t *host;
    /* non zero pages recv through this channel */
    uint64_t total_normal_pages;
    /* buffers to recv */
    struct iovec *iov;
    /* Pages that are not zero */
    ram_addr_t *normal;
    /* num of non zero pages */
    uint32_t normal_num;
    /* used for de-compression methods */
    void *data;
} MultiFDRecvParams;

MultiFDSendParams QEMU

这个结构体非常重要,这个结构体是用来描述一个 channel/thread 的主要的结构体。

这个在 MigrationState 里是一个 array,一个 channel 对应一个此结构:

multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
typedef struct {
    // ########## 以下是只读的,只在创建和销毁的时候被写入,所以不需要锁 ##########
    // channel number
    uint8_t id;
    // channel thread name
    char *name;
    QemuThread thread;
    /* communication channel */
    QIOChannel *c;
    //...
    // 这个是只读的,只会被赋值一次
    // sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count;
    // page_count 表示一个 packet 包含多少个 page,一般为 128
    // 所以综合下来,这个表示一个包的 metadata 的大小,包含了
    // 全局的 metadata 以及 128 个 page 的 fn 所占的大小。
    uint32_t packet_len;
    // multifd flags for sending ram
    // 目前只看到了 QIO_CHANNEL_WRITE_FLAG_ZERO_COPY 这一个 flag
    int write_flags;

    // 两个信号量,有什么区别呢?
    // 这个信号量是 main thread 用来给这个 channel 发任务用的,
    // 当有任务时,post 一下,这样 channel 那边就可以 unblock。
    // 注意 post 是单向的,也就是 channel 不可能 post 给 main thread。
    // 和下面的那个 mutex 的区别是,mutex 更倾向于保护变量不被竞争使用,
    // 而 sem 偏向于不让 channel 在没任务的时候空转,相当于一个信号的机制。
    // This is used by the master thread to notify channel thread that we have
    // new task need to do.
    QemuSemaphore sem;
    // main thread 和 channels 同步用的
    // 上面说的是什么意思,请移步 multifd_send_sync_main
    // This is used to sync between master thread and the channel thread
    QemuSemaphore sem_sync;

    // ########## 以下内容被此互斥锁保护 ##########
    QemuMutex mutex;
    // is this channel thread running (if false, it means this thread has quit)
    bool running;
    // should this thread finish
    bool quit;
    // multifd flags for each packet
    uint32_t flags;
    // 源端已经生成的 packet 的数量,所有 channel 共享的,所以不要和 num_packets 搞混
    // 可以理解为所有 packets(不只是 RAM page 信息的 packet) 都有一个 id,从 0 到 n,
    // 每一个 thread 发的是不连续 id 的 packet,比如 0, 3, 4, 8。
    // 这个值表示这个 thread 要发的下一个 id 是多少。
    uint64_t packet_num;
    // 这个 channel 已经发送了多少个 packets? 不要和 packet_num 搞混
    uint64_t num_packets;
    // 线程有工作要做了
    int pending_job;
    // 表示需要发送的页,当 pending_job 为 0 时,可以被 master 线程使用,
    // 当不为 0 时,发送线程可以使用。
    MultiFDPages_t *pages;

    // ########## 线程自有的,不需要锁 ##########
    /* pointer to the packet */
    MultiFDPacket_t *packet;
    /* size of the next packet that contains pages */
    uint32_t next_packet_size;
    /* non zero pages sent through this channel */
    uint64_t total_normal_pages;
    /* buffers to send */
    struct iovec *iov;
    /* number of iovs used */
    uint32_t iovs_num;
    // 指向一个数组,每一个元素表示一个 page 的 offset
    // p->normal[p->normal_num] = pages->offset[i];
    ram_addr_t *normal;
    // 表示上面的数组有多少个元素
    // 基于上等于 pages->num
    uint32_t normal_num;
    // used for compression methods
    void *data;
}  MultiFDSendParams;




typedef struct {
    // channel number
    uint8_t id;
    // channel thread name
    char *name;
    //...
    // This is used by the master thread to notify channel thread that we have
    // new task need to do.
    QemuSemaphore sem;
    // This is used to sync between master thread and the channel thread
    QemuSemaphore sem_sync;
    // is this channel thread running (if false, it means this thread has quit)
    bool running;
    // should this thread finish
    bool quit;
    /* multifd flags for each packet */
    uint32_t flags;
    // This thread has new packet should be sent
    int pending_job;
    // holds each page's information
    MultiFDPages_t *pages;
    // holds the packet information
    MultiFDPacket_t *packet;
    // each pages offset
    ram_addr_t *normal;
    //...
}  MultiFDSendParams;

MultiFDInit_t

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id; // id of the channel?
    //...
} __attribute__((packed)) MultiFDInit_t;

multifd_send_thread()

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    Error *local_err = NULL;
    int ret = 0;

    //...
    // send initial packet
    multifd_send_initial_packet(p, &local_err)
    // number of sent packets plus 1
    // to denote we have sent the initial packet
    p->num_packets = 1;

    while (true) {
        // Tell master thread we are ready for this round
        // +1,表示这个 channel 的这个任务干完了,可能有空了。
        qemu_sem_post(&multifd_send_state->channels_ready);
        // Wait for master thread's command to continue. master wake up us for 2 reasons
        //  1. We have new pending job
        //  2. we are asked to quit
        //  3. sometimes there are spurious wakeups
        qemu_sem_wait(&p->sem);

        //...
        // first type wake up
        if (p->pending_job) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags = p->flags;
            p->normal_num = 0;
    	    MultiFDPages_t *pages = p->pages;

            if (use_zero_copy_send) {
                p->iovs_num = 0;
            } else {
                p->iovs_num = 1;
            }

            // 把每一个 non zero page 的 offset 放到 p->normal 里
            for (int i = 0; i < pages->num; i++) {
                p->normal[p->normal_num] = pages->offset[i];
                p->normal_num++;
            }

            // 如果存在非零的 page 要发送,才会 prepare
            // normal_num 是可能是 0 的,你可能会问,不是每一次攒够了一定数量
            // 或者换 ramblock 的时候才会发吗,这时候总是有 page 的,为什么 normal_num
            // 可能是 0 呢?这是因为 sync_main 函数可能会发一个空的 packet 过去,重点在于
            // 发的那个 multifd sync 的 flag。
            // Some compression methods, e.g., ZLIB, need to prepare before
            // sending the data
            if (p->normal_num)
                multifd_send_state->ops->send_prepare(p, &local_err);

            // 把 p->packets 里的元信息填上,比如每一个 page 的 offset
            multifd_send_fill_packet(p);
            // 重置 flags, num 和 block
            p->flags = 0;
            pages->num = 0;
            pages->block = NULL;
            // 我们又要发一个 packet,所以要 +1,表示已经发送的数量
            p->num_packets++;
            // 我们又要发送很多个 page,记录下来
            p->total_normal_pages += p->normal_num;
            //...
            // zero copy case, by default this capability is disabled
            //...
            // packet 本身作为 header 会发送过去
            p->iov[0].iov_len = p->packet_len;
            p->iov[0].iov_base = p->packet;

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            //...
            p->pending_job--;

            // 如果这是一个 sync 的 task,告诉 main thread 已经完成了。
            if (flags & MULTIFD_FLAG_SYNC)
                qemu_sem_post(&p->sem_sync);
        } else if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }
    // some finishing work....
}


// legacy code:
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    //...
    // Each thread needs to send an initial packet to the destination
    // when it starts to ensure the data can be received properly.
    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }
    //...

    // This thread runs in an infinite loop
    while (true) {
        // Tell the master thread we are idle, pls give us more work to do.
        qemu_sem_post(&multifd_send_state->channels_ready);
        // Wait for the master thread to deliver a task
        qemu_sem_wait(&p->sem);

        //...
        // We have a new pending job
        if (p->pending_job) {
            uint64_t packet_num = p->packet_num;
            uint32_t flags;
            p->normal_num = 0;
            p->iovs_num = 0;

            // copy each pages' offset from p->pages to p->normal
            for (int i = 0; i < p->pages->num; i++) {
                p->normal[p->normal_num] = p->pages->offset[i];
                p->normal_num++;
            }

            // multifd methods specific preparation work
            multifd_send_state->ops->send_prepare(p, &local_err);

            // Construct the packet
            multifd_send_fill_packet(p);
            flags = p->flags;
            p->flags = 0;
            p->num_packets++;
            p->total_normal_pages += p->normal_num;
            p->pages->num = 0;
            p->pages->block = NULL;

            //...
            // The first iovec holds the packet struct it self
            p->iov[0].iov_len = p->packet_len;
            p->iov[0].iov_base = p->packet;

            // send the data to the wire.
            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            //..
            // We have finished this work, so just minus 1
            p->pending_job--;

            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&p->sem_sync);
            }
        } else {
            qemu_mutex_unlock(&p->mutex);
            /* sometimes there are spurious wakeups */
        }
    }

    // When this thread is about to terminate, we set the running to false
    p->running = false;
    //...
}

How to split RAM pages migrate using multi-fd?

这个函数就是 worker thread 的主函数,master thread 的主函数是 migration_thread()

multifd_queue_page 里也介绍了是怎么分配任务的,也可以看看。

Each thread has a MultiFDSendParams type struct. corresponding to an item in the params attribute in multifd_send_state:

multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);

In multifd_send_thread(), MultiFDSendParams->pending_job is checked if there is any job for this thread to do.

From comments:

How we use multifd_send_state->pages (this struct is global) and channel->pages (this struct is per-channel)?

We create a pages for each channel, and the master thread. Each time that we need to send a batch of pages we interchange the ones between multifd_send_state and the channel that is sending it. There are two reasons for that:

  • to not have to do so many mallocs during migration.
  • to make easier to know what to free at the end of migration

This way we always know who is the owner of each pages struct, and we don't need any locking. It belongs to the migration thread or to the channel thread. Switching is safe because the migration thread is using the channel mutex when changing it, and the channel have to had finish with its own, otherwise pending_job can't be false.

How does the master thread end each channel thread?

Send side:

// master thread
migration_thread
    migration_iteration_finish
        migrate_fd_cleanup_schedule
            qemu_bh_schedule(s->cleanup_bh);
                migrate_fd_cleanup_bh
                    migrate_fd_cleanup
                        multifd_save_cleanup
                            multifd_send_state->ops->send_cleanup(p, &local_err);
                                multifd_send_terminate_threads
                                    p->quit = true;

// channel thread
} else if (p->quit) {
    break;

Recv side:

process_incoming_migration_co
    mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
    qemu_bh_schedule(mis->bh);
        process_incoming_migration_bh
            multifd_load_cleanup
                multifd_recv_terminate_threads
                    p->quit = true;

multifd_recv_state

This is a global variable in destination side:

struct {
    /* number of created threads (channels), increment in multifd_recv_new_channel() */
    int count;
    /* a params list, index by channel id */
    MultiFDRecvParams *params; 
    /* syncs main thread and channel threads, see multifd_recv_thread() and multifd_recv_sync_main() for more */
    // Channel thread post, master thread wait,也是单向的。
    QemuSemaphore sem_sync;
    // global number of generated multifd packets
    // 在每次 sync 的时候会被置为最大的 thread 的 packet num,这个很简单,是因为 thread 的 packet_num
    // 表示的是这个 job 在生成时最大的 packet num, 所以取最大值即可
    uint64_t packet_num;
} *multifd_recv_state;

multifd_recv_thread() QEMU

不像 multifd_send_thread,这个函数并没有 pending job 的概念。因为 qio_channel_read_all_eof() 本身是可能 block 的(要么 block 要么 yield 出去,其实没区别)。我们只需要不断地在 while 循环里 read 直到 read 到了 EOF 就可以了,EOF 表示 peer has disconnected。

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    //...
    while (true) {
        uint32_t flags;

        //...
        // 因为在 src 端的一个 channel,它是以 job 为单位来发送的,在发送每一个 job 的时候
        // 它会先发送一个 MultiFDPacket_t 过来表示一些元信息,这些元信息就是在这个函数里进行接收的。
        // page content 是在后面 recv_pages 进行接收的。
        ret = qio_channel_read_all_eof(p->c, (void *)p->packet, p->packet_len, &local_err);
        // ret == 0 的情况,表示读到 EOF 了,也就是这个 stream 要关掉了。(EOF means the peer has disconnected.)
        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        // 把 MultiFDPacket_t 的信息解到 MultiFDRecvParams 当中去
        ret = multifd_recv_unfill_packet(p, &local_err);
        //...
        flags = p->flags;
        // recv methods don't know how to handle the SYNC flag
        p->flags &= ~MULTIFD_FLAG_SYNC;
        //...
        p->num_packets++;
        p->total_normal_pages += p->normal_num;
        qemu_mutex_unlock(&p->mutex);

        if (p->normal_num) {
            // 我们已经接收了 MultiFDPacket_t,现在是时候接收每一个 page 了,
            // 这个函数是真正从网络 stream 接收每一个 page 的 content 的地方。
            ret = multifd_recv_state->ops->recv_pages(p, &local_err);
            //...
        }

        // 表示 src 端对应的 channel thread 要求在这个 packet 来 sync
        // 那么我们就卡在这里等待 master thread 来 sync 我们好了
        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    // 已经结束咧!
    //...
    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    //...
    return NULL;
}

How does send master thread and send worker threads synchronize?

有两个 QemuSemaphore

  • 一个是全局的 multifd_send_state->channels_ready
  • 一个是每一个 worker thread 独有的 MultiFDSendParams->sem

How does receive master thread and channel thread synchronize?

有两个 QemuSemaphore

  • 一个是全局的 multifd_recv_state->sem_sync
  • 一个是每一个 worker thread 独有的 MultiFDRecvParams->sem_sync

全局的 sem_sync 是 master thread 等在上面,channels thread 往上面发信息,当所有 channel 发完信息 master thread 才会执行下一步;

单独的 sem_sync 是 channel thread 等在上面,master thread 往上面发信息。

// 只有接收到了 RAM_SAVE_FLAG_EOS 才会执行此函数
void multifd_recv_sync_main(void)
{
    multifd_recv_barrier();
    multifd_recv_unbarrier();
}

multifd_recv_sync_main() / multifd_recv_barrier() QEMU

执行完这个函数,我们可以保证 channel thread 都已经完成了接收的任务并在 block 等待 dest 端 master thread 的下一步命令。

void multifd_recv_barrier(void)
{
    //...
    // 这里在这个全局的 sem_sync 上 wait 了很多次,需要每一个 channel 都来唤醒一下
    // channel 来唤醒,表示它们已经接收完所有的信息了。
    // 因为会 wait,所以被成为 barrier。
    for (i = 0; i < migrate_multifd_channels(); i++) {
        //...
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }
}

multifd_recv_unbarrier() QEMU

用来恢复所有 channel thread 的执行。

void multifd_recv_unbarrier(void)
{
    //...
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        // 这里的作用是把 packet_num 设置成所有 channel 里最大的
        // 因为 job 分给各个 channel 是有先后顺序的,最后分的肯定就是最大的
        // 因为 multifd_recv_barrier 已经执行完了,所以所有的 thread 的信息都已经接收到了
        // 所以 packet_num 的信息也已经有了,可以更新全局的 packet_num 了。
        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        // 依次给每一个 thread specific 的 sem_sync post,让其恢复执行
        qemu_sem_post(&p->sem_sync);
    }
}

multifd_send_state QEMU

这是一个整个 VM 全局的变量,并不是 specific 给某一个 channel 的。

struct {
    MultiFDSendParams *params;
    // 分配的空间是一个 multifd packat 所能 hold 的 pages 的数量大小
    // 我认为这是一个 buffer,用来记录当前在 send 的 private page?
    // 依据可以看函数 multifd_send_pages()
    MultiFDPages_t *shared_pages;
    // 类似上面
    MultiFDPages_t *private_pages;
    /* global number of generated multifd packets */
    uint64_t private_packet_num;
    uint64_t shared_packet_num;
    /* send channels ready */
    // 用来判断能不能下放任务给各个 thread
    // 至少为 1,也就是有一个 thread 是空的时候才能下放。
    // 注意方向是单向的,只能由 channel thread 来 post,master thread 来 wait。
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads.  There is a race when it
     * happens that we got one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

How does master thread sync with worker threads?

static void *multifd_recv_thread(void *opaque)
{
    while (true) {
        //...
        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
        //...
    }
}

static void *multifd_send_thread(void *opaque)
{
    while (true) {
        // after each send, first wait
        qemu_sem_wait(&p->sem);
        //...
        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&p->sem_sync);
        }
        qemu_sem_post(&multifd_send_state->channels_ready);
        //...
    }

out:
    if (local_err) {
        trace_multifd_send_error(p->id);
        multifd_send_terminate_threads(local_err);
        error_free(local_err);
    }

    /*
     * Error happen, I will exit, but I can't just leave, tell
     * who pay attention to me.
     */
    if (ret != 0) {
        qemu_sem_post(&p->sem_sync);
        qemu_sem_post(&multifd_send_state->channels_ready);
    }

    qemu_mutex_lock(&p->mutex);
    p->running = false;
    qemu_mutex_unlock(&p->mutex);

    rcu_unregister_thread();
    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);

    return NULL;
}

How to preserve the order of the master thread's data and channel threads data?

如何保证 master thread 在发送下一个 section header 的时候,channel 里的数据已经成功发送并被 dst 端接收了呢?如果 channel 数据只是发送了还没有接收,那 dst 端就没法分清是哪一个 section 的了?

理论上每一个 section 结束的时候:

  • src 都会执行 multifd_send_sync_main,此函数会给每一个 thread 一个 job,并且每一个 thread 的 flag 上都置上了 MULTIFD_FLAG_SYNC 发送给对面的 receive thread,并且 src 的 master thread 会发送 RAM_SAVE_FLAG_EOSqemu_fflush() 过去;此时 src 并不会停止,而是进行下一个 section 的发送。
  • dst 的 master thread(仅仅在接收到 RAM_SAVE_FLAG_EOS)时会执行 multifd_recv_sync_main,此时保证所有的 channel 都已经接收完毕,dst 的 master thread 才会开始下一步,继续接收新的信息。需要说明的是,当 dst channel thread 接收到 MULTIFD_FLAG_SYNC 时会停止接收,等待 master thread 的唤醒,所以 dst 端并不会超额接收信息,它仅仅接收 packet 的元信息(MultiFDPacket_t)里指定数量的数据。

所以不管是 MULTIFD_FLAG_SYNCRAM_SAVE_FLAG_EOS 谁先到达,我们假设:

  • 前者先到达:这样就能保证 dst channel thread 在收到 MULTIFD_FLAG_SYNC flag 时卡在那里,不会继续接收后面的信息,而是安安静静等待 src 端 master thread 发来的 RAM_SAVE_FLAG_EOS 到达的时候才会继续下一步接收。t
  • 后者先到达:dst 端 master thread 卡在 sync 的 barrier 那里,在所有 channel thread 都收到了 MULTIFD_FLAG_SYNC 之后才会去接收下一个 section 的 header 数据。

multifd_send_sync_main() 只承担了向对方 channel thread 发送 MULTIFD_FLAG_SYNC 的任务,而让对方 master thread 执行 multifd_recv_sync_main() 仍然需要 source 端的 master thread 想办法(比如发送 RAM_SAVE_FLAG_EOS)。