I/O Multiplexing: select, poll, epoll in Linux
为什么我们不直接使用多进程/多线程技术(比如说每次 accept 一个 fd 后就创建一个新线程来全权处理这个 fd 相关的数据),而是要使用 IO 多路复用呢?这是因为这样系统不必创建和维护进程/线程,从而节约了开销。
An example using select
, poll
and epoll
and explanation
先看一个分别使用 select
, poll
和 epoll
实现服务端的例子:
先是 select
的例子:
// Server program
// 此代码省略了一些无关的细节,仅供理解使用,源代码请见:
// https://www.geeksforgeeks.org/tcp-and-udp-server-using-select/
int main()
{
int listenfd, connfd, udpfd, nready, maxfdp1;
char buffer[MAXLINE];
pid_t childpid;
fd_set rset;
ssize_t n;
socklen_t len;
const int on = 1;
struct sockaddr_in cliaddr, servaddr;
char* message = "Hello Client";
void sig_chld(int);
/* create listening TCP socket */
listenfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(PORT);
// binding server addr structure to listenfd
bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
listen(listenfd, 10);
/* create UDP socket */
// UDP 不需要 listen
udpfd = socket(AF_INET, SOCK_DGRAM, 0);
bind(udpfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
// clear the descriptor set
FD_ZERO(&rset);
// get maxfd
maxfdp1 = max(listenfd, udpfd) + 1;
for (;;) {
// set listenfd and udpfd in readset
FD_SET(listenfd, &rset);
FD_SET(udpfd, &rset);
// select the ready descriptor
nready = select(maxfdp1, &rset, NULL, NULL, NULL);
// 从下面关于 TCP 和 UDP 的 socket fd 我们可以得知:
// 1. select 后仍然需要遍历数组来查看到底是哪一个 fd 变 ready 了
// 2. select 后仍然需要 read 来读取内容
// if tcp socket is readable then handle
// it by accepting the connection
if (FD_ISSET(listenfd, &rset)) {
len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr*)&cliaddr, &len);
if ((childpid = fork()) == 0) {
close(listenfd);
bzero(buffer, sizeof(buffer));
printf("Message From TCP client: ");
read(connfd, buffer, sizeof(buffer));
puts(buffer);
write(connfd, (const char*)message, sizeof(buffer));
close(connfd);
exit(0);
}
close(connfd);
}
// if udp socket is readable receive the message.
if (FD_ISSET(udpfd, &rset)) {
len = sizeof(cliaddr);
bzero(buffer, sizeof(buffer));
printf("\nMessage from UDP client: ");
n = recvfrom(udpfd, buffer, sizeof(buffer), 0, (struct sockaddr*)&cliaddr, &len);
puts(buffer);
sendto(udpfd, (const char*)message, sizeof(buffer), 0, (struct sockaddr*)&cliaddr, sizeof(cliaddr));
}
}
}
再是 poll
的例子。从下面的例子我们不难看出来:
-
poll
返回的直接就是有事件到达的 fd 在数组中的下标,这就省去了select
的轮询的过程; - 有一个改进是隐藏在以下代码之下的:但是它没有最大连接数的限制,原因是它是基于链表来存储的。
// Server program
// 此代码省略了一些无关的细节,仅供理解使用,源代码请见:
// https://github.com/MaximAbashev/simple_tcp-udp_poll_server/blob/master/src/server.c
int main ()
{
short port;
const int on = 1;
int tcp_sock, udp_sock, from_len, ready, contact, len, i;
int timeout_msecs = 500;
char buf[30];
struct sockaddr_in s_addr, clnt_addr, new_s_addr;
struct pollfd pds[2];
// Fabricate socket and set socket options.
s_addr.sin_family = AF_INET;
s_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
s_addr.sin_port = htons(SRV_PORT);
// TCP-socket part.
pds[0].fd = socket(AF_INET, SOCK_STREAM, 0);
pds[0].events = POLLIN;
setsockopt(pds[0].fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on));
bind(pds[0].fd, (struct sockaddr *)&s_addr, sizeof (s_addr))
listen(pds[0].fd, 1);
// UDP-socket part.
// 同样,UDP 不需要 listen
pds[1].fd = socket(AF_INET, SOCK_DGRAM, 0);
pds[1].events = POLLIN;
bind(pds[1].fd, (struct sockaddr *)&s_addr, sizeof (s_addr))
while (1) {
ready = poll(pds, 2, timeout_msecs);
if (ready < 0) {
printf ("Poll error!\n");
exit (1);
}
if (ready > 0) {
if (pds[ready].revents & POLLOUT | POLLIN) {
switch (ready) {
case 1:
len = sizeof (s_addr);
contact = accept(pds[0].fd, (struct sockaddr *)&s_addr, &len);
if(contact == (-1)) {
perror ("Connect error!\n");
exit (1);
}
from_len = recv(contact, buf, 21, 0);
write(1, buf, from_len);
send(contact, "It's for TCP client!\n", 22, 0);
close(contact);
case 2:
len = sizeof (s_addr);
while (2) {
from_len = recvfrom(pds[1].fd, buf, 21, 0, (struct sockaddr *)&s_addr, &len);
if(from_len > 0) {
write(1, buf, from_len);
break;
}
}
sendto(pds[1].fd, "It's for UDP client!\n", 22, 0,
(struct sockaddr *)&s_addr, sizeof(s_addr));
}
}
}
}
}
至于 epoll()
,我们可以看到 select()
用了一个结构体作为 fd 的管理器 FD_SET
,poll()
用来一个结构体 pollfd
作为 fd 的管理器。而 epoll 直接通过一个 fd 来管理(因为我们设计了一个新的文件系统叫做 eventpollfs),这个 fd 通过 epoll_create()
来创建。select, poll 和 epoll 三者使用的形式看起来是一样的。因此,其同异步,阻塞非阻塞都是一样的。
再看一个关于 epoll()
的例子:
- 一些常用的接口函数:
epoll_create()
,epoll_ctl()
,epoll_wait()
// Server program
// 此代码省略了一些无关的细节,仅供理解使用,源代码请见:
// https://github.com/onestraw/epoll-example/blob/master/epoll.c
int main(int argc, char *argv[])
{
int i;
int n;
int epfd;
int nfds;
int listen_sock;
int conn_sock;
int socklen;
char buf[BUF_SIZE];
struct sockaddr_in srv_addr;
struct sockaddr_in cli_addr;
struct epoll_event events[MAX_EVENTS];
listen_sock = socket(AF_INET, SOCK_STREAM, 0);
set_sockaddr(&srv_addr);
bind(listen_sock, (struct sockaddr *)&srv_addr, sizeof(srv_addr));
setnonblocking(listen_sock);
listen(listen_sock, MAX_CONN);
// 创建一个 epoll fd
epfd = epoll_create(1);
// 通过 epoll_ctl 来管理我们要监听的 fd
epoll_ctl_add(epfd, listen_sock, EPOLLIN | EPOLLOUT | EPOLLET);
socklen = sizeof(cli_addr);
for (;;) {
// epoll_wait 可以指定我们想要监听哪些事件
nfds = epoll_wait(epfd, events, MAX_EVENTS, -1);
for (i = 0; i < nfds; i++) {
if (events[i].data.fd == listen_sock) {
// handle new connection
conn_sock = accept(listen_sock, (struct sockaddr *)&cli_addr, &socklen);
inet_ntop(AF_INET, (char *)&(cli_addr.sin_addr), buf, sizeof(cli_addr));
printf("[+] connected with %s:%d\n", buf, ntohs(cli_addr.sin_port));
setnonblocking(conn_sock);
epoll_ctl_add(epfd, conn_sock, EPOLLIN | EPOLLET | EPOLLRDHUP | EPOLLHUP);
} else if (events[i].events & EPOLLIN) {
/* handle EPOLLIN event */
for (;;) {
bzero(buf, sizeof(buf));
n = read(events[i].data.fd, buf,
sizeof(buf));
if (n <= 0 /* || errno == EAGAIN */ ) {
break;
} else {
printf("[+] data: %s\n", buf);
write(events[i].data.fd, buf,
strlen(buf));
}
}
} else {
printf("[+] unexpected\n");
}
/* check if the connection is closing */
if (events[i].events & (EPOLLRDHUP | EPOLLHUP)) {
printf("[+] connection closed\n");
// 因为要 close 这个 fd,所以把它从 epoll 监听的列表里面去掉
epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, NULL);
close(events[i].data.fd);
continue;
}
}
}
}
/*
* register events of fd to epfd
*/
static void epoll_ctl_add(int epfd, int fd, uint32_t events)
{
struct epoll_event ev;
ev.events = events;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev)
}
static void set_sockaddr(struct sockaddr_in *addr)
{
bzero((char *)addr, sizeof(struct sockaddr_in));
addr->sin_family = AF_INET;
addr->sin_addr.s_addr = INADDR_ANY;
addr->sin_port = htons(DEFAULT_PORT);
}
static int setnonblocking(int sockfd)
{
fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFL, 0) | O_NONBLOCK)
}
关于 select
, poll
以及 epoll
是否阻塞的讨论
-
select
:可阻塞可非阻塞(使用数组存储 fd),有一个参数用来指定阻塞时间,如过是 NULL 就会一直阻塞,如果是 0,则会马上返回,无论是否有 fd 数据已经准备好,所以可以是阻塞也可以是不阻塞的; -
poll
:可阻塞可非阻塞(使用链表存储 fd,突破 1024 的限制),可阻塞和可非阻塞的原因可以参考select
-
epoll
:可阻塞可非阻塞(回调函数的方式,不需要像 select 一样轮询)
关于 select
, poll
以及 epoll
是否同步的讨论
三者都是同步的,因为异步只需要 I/O 操作完成的通知,并不主动读写数据,由操作系统内核完成数据的读写。 而即使是 epoll ,也需要自己手动去写。一个异步 I/O 的例子是:
Select 实现原理
因为多路复用的本质其实就是一个进程想要同时监视多个 socket,所以:
select:把进程同时加入到多个 socket 的等待队列当中,任何一个 socket 收到数据,进程都会被唤醒;
- 缺点一:每次调用 select 都要加入到等待队列(所以要传入所有的 fd 给内核);每次唤醒都要从每个队列中移除。遍历开销大,所以才规定了 1024;
- 唤醒后,不知道哪个 socket 接收到了数据,所以还要遍历一次。
- 如果这篇文章说不清epoll的本质,那就过来掐死我吧! (1) - 知乎
- 如果这篇文章说不清epoll的本质,那就过来掐死我吧! (2) - 知乎
- 如果这篇文章说不清epoll的本质,那就过来掐死我吧! (3) - 知乎
Epoll 实现原理
epoll:
- 分离“维护等待队列”和“阻塞进程”,避免每次都要传入我们需要监听的 socket(select 需要传入
FD_SET
, poll 也需要传入poll_fd
结构体,而 epoll 是通过 fd 直接管理的,在每次调用epoll_wait()
的时候不需要再传入了); - 在
epoll_wait()
返回的时候,通过添加就绪列表,能够避免遍历。
socket 等待队列 --- eventpoll
eventpoll 就绪队列 --- socket
eventpoll 等待队列 --- 进程
当包到来时:socket 通过等待队列唤醒 eventpoll,eventpoll 将 socket 加入到就绪队列,同时唤醒进程,此时进程就可以通过 eventpoll 的就绪队列获取 socket。
这篇文章对于 select, poll 以及 epoll 讲的非常详细,而且非常生动,可以参考:
异步 I/O
select
, poll
, epoll
三者都是同步的 I/O,因为因为他们都需要在读写事件就绪后程序自己负责进行读写,而不是通过一个比如回调函数之类的来读写。异步 I/O 是通过一个回调函数来处理当读写事件就绪后的情况的。
AIO
, io_uring
等是 Linux 里面的异步 I/O 的框架。下面举一个使用了异步 I/O 的例子。
异步 I/O 相比于同步 I/O(主要指的是非阻塞的同步 I/O),主要优点是更加易于编程,编程的难度较低。非阻塞的同步 I/O 由于是内核通知用户代码可以读/写的字节数,与应用逻辑可能是不匹配的,需要用户代码自己来维护缓冲区以及状态机。
Epoll 有哪些函数?
int epoll_create(int size);
在内核中创建 epoll 事件表,返回对应时间表的文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
控制某一个 fd 上的事件,可以注册、修改和删除事件。
epfd:epoll_create 创建的文件描述符;
op:操作,添加、删除还是修改;
fd:要进行关联的文件描述符;
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
轮询 IO 事件的发生。
events 用于回传待处理事件的数组。
epoll 当中为什么要使用红黑树?
当我现在要在内核中长久的维护一个数据结构来存放文件描述符,并且时常会有插入,查找和删除的操作发生,这对内核的效率会产生不小的影响,因此需要一种插入,查找和删除效率都不错的数据结构来存放这些文件描述符,那么红黑树当然是不二的人选。
epoll 的两种模式
水平触发(Level Trigger, LT):如果可读可写就一直触发,水平出发是默认的模式。
边缘触发(Edge Trigger, ET):仅从可读变为可写或者从可写变为可读才会触发,边缘触发是高速的模式。
select 为什么只能监听 1024 个?
代码里的宏定义就是 1024 个。
五大 IO 模型
IO 模型主要处理的是从内核缓冲区到用户缓冲区拷贝的这一过程。
阻塞 IO(完成前阻塞):同步
非阻塞 IO(错误码、轮询):同步
IO 复用模型(select, poll, epoll):同步
信号驱动 IO :同步
异步 IO:异步
信号驱动 IO 是同步的原因是其并不是返回已经写好,而是返回可以写了。异步 I/O 与信号驱动 I/O 的区别在于,异步 I/O 的信号是通知应用进程 I/O 完成,而信号驱动 I/O 的信号是通知应用进程可以开始 I/O。
epoll 采用了什么数据结构?
如果这篇文章说不清epoll的本质,那就过来掐死我吧! (1) - 知乎
Terminology
Event
For epoll:
EPOLLIN, EPOLLOUT, etc.
For poll:
POLLIN, POLLOUT, etc.
For select:
select doesn't have a concept of event
, it seems it just wait for the fd to be "ready".
poll offers somewhat more flavors of events to wait for, and to receive, although for most common networked cases they don't add a lot of value.
Epoll
The central concept of the epoll API is the epoll instance.
- epoll is a kernel module
- epoll 本身也是一个文件系统,这也是 epoll 在
fs/eventpoll.c
下面实现的原因。
用户空间调用 epoll_create(0)
或 epoll_create1(int)
,其实质就是在名为 "eventpollfs" 的文件系统里创建了一个新文件,同时为该文件申请一个 fd,绑定一个 inode,最后返回该文件句柄。
Edge-trigger and level-trigger
default is level-triggered.
Edge-triggered mode delivers events only when changes occur on the monitored file descriptor.
The edge-triggered misunderstanding [LWN.net]
This edge-triggered mode is what makes epoll
an O(1) I/O multiplexer: the epoll_wait
call will suspend immediately, and since a list is maintained for each file descriptor ahead of time, when new data comes in the kernel immediately knows what processes must be woken up in O(1) time.
Here's a more worked out example of the difference between edge-triggered and level-triggered modes. Suppose your read buffer is 100 bytes, and 200 bytes of data comes in for that file descriptor. Suppose then you call read
exactly one time and then call epoll_wait
again. There's still 100 bytes of data already ready to read. In level-triggered mode the kernel would notice this and notify the process that it should call read
again. By contrast, in edge-triggered mode the kernel would immediately go to sleep. If the other side is expecting a response (e.g. the data it sent is some kind of RPC) then the two sides will "deadlock", as the server will be waiting for the client to send more data, but the client will be waiting for the server to send a response.
To use edge-triggered polling you must put the file descriptors into nonblocking mode. Then you must call read
or write
until they return EWOULDBLOCK
every time. If you fail to meet these conditions you will miss notifications from the kernel. But there's a big upside of doing this: each call to epoll_wait
will be more efficient, which can be very important on programs with extremely high levels of concurrency.
Blocking I/O, Nonblocking I/O, And Epoll
*Why edge-triggered in *O(1)?*
Epoll
这些值得好好看看:
Implementation of Epoll ❚ fd3kyt's blog
epoll - Datong's Random Thoughts
linux 源码角度看 epoll - JackTang's Blog
This RBT (Red Black Tree) is mainly used by epoll_ctl
and other management functions. Getting the ready events and epoll_wait
use other data structures instead of this RBT.
Struct eppoll_entry
Struct eventpoll
Represents the main data structure for the eventpoll interface.
eventpoll
表示 epoll instance 本身。
queues/lists In eventpoll
struct eventpoll {
//...
/*
* Wait queue used by sys_epoll_wait()
* entry is process
*/
wait_queue_head_t wq;
/*
* Wait queue used by file->poll()
* entry is process
*/
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
//...
};
Struct epitem
Each file descriptor added to the eventpoll interface will have an entry of this type linked to the "rbr" RB tree.
epitem
是红黑树的节点。多个 epitem
从属于一个 eventpoll
。
In current implementation, an epitem
may be add into several containers, including:
-
eventpoll.rbr
, a red-black tree of all registeredepitem
-
eventpoll.rdllist
, a list ofepitem
representing ready file descriptors -
file.f_ep_links
, a list of all theepitem
referencing this file
Struct ep_pqueue
Just a wrapper for convenience.
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
Struct eppoll_entry
__pollwait()
The __pollwait()
function in the Linux kernel code is used to add a wait queue entry for a given file descriptor that has requested to be polled for events. When called, it inserts the current process into the wait queue associated with the file descriptor and blocks until an event occurs or a timeout expires. Once the event or timeout happens, the process is woken up and removed from the wait queue. This function is used by various system calls such as poll(), epoll_wait() etc. to efficiently wait for multiple file descriptors at once.
__ep_eventpoll_poll()
static __poll_t __ep_eventpoll_poll(struct file *file, poll_table *wait, int depth)
{
struct eventpoll *ep = file->private_data;
LIST_HEAD(txlist);
struct epitem *epi, *tmp;
poll_table pt;
__poll_t res = 0;
init_poll_funcptr(&pt, NULL);
/*
* Insert inside our poll wait queue, 当 epoll_fd 是被 select, poll 来执行时,
* wait 就是 ->poll() 时传进来的 poll_table,是有函数的;当是 epoll 递归过来的时候,函数是空的。
*/
poll_wait(file, &ep->poll_wait, wait);
/*
* Proceed to find out if wanted events are really available inside
* the ready list.
*/
mutex_lock_nested(&ep->mtx, depth);
ep_start_scan(ep, &txlist);
list_for_each_entry_safe(epi, tmp, &txlist, rdllink) {
if (ep_item_poll(epi, &pt, depth + 1)) {
res = EPOLLIN | EPOLLRDNORM;
break;
} else {
/*
* Item has been dropped into the ready list by the poll
* callback, but it's not actually ready, as far as
* caller requested events goes. We can remove it here.
*/
__pm_relax(ep_wakeup_source(epi));
list_del_init(&epi->rdllink);
}
}
ep_done_scan(ep, &txlist);
mutex_unlock(&ep->mtx);
return res;
}
这个函数很有意思,有两个地方可以调用到这个函数:
ep_eventpoll_poll()
- `ep_item_poll
第一个地方,也就是:
static const struct file_operations eventpoll_fops = {
//...
.poll = ep_eventpoll_poll,
//...
};
static __poll_t ep_eventpoll_poll(struct file *file, poll_table *wait)
{
return __ep_eventpoll_poll(file, wait, 0);
}
是为了 当我们 select 或者 poll 一个 epollfd 时起作用的。select 和 poll 会调用每一个 fd 自己的 ->poll
函数,对于传入 ->poll
的 poll_table 里的 qproc 函数,有以下可能:
- 当是 select 时,函数是
__pollwait
; - 当是 poll 时,函数也是
__pollwait
。
可见 select 和 poll 在底层的实现上有一些共享的地方。这个函数会在 __ep_eventpoll_poll()
的 poll_wait
里面调用。
第二个地方,也就是 ep_item_poll()
:
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
int depth)
{
struct file *file = epi->ffd.file;
__poll_t res;
pt->_key = epi->event.events;
if (!is_file_epoll(file))
res = vfs_poll(file, pt);
else
res = __ep_eventpoll_poll(file, pt, depth);
return res & epi->event.events;
}
ep_item_poll
起先是在 epoll_wait 那里调用的,它表示我们使用的 epoll_instance 有 epitem 是 epollfd,也就是我们遇到了 nested epoll 的情况。
架构
等待队列
每一个 fd/socket 有一个等待队列。队列元素是 进程。
select: 把当前进程加入每一个监听 fd 的等待队列,如果有某一个 fd 有数据带来,会唤醒。
因为 epoll_fd 本身也是一个 fd,所以他也有等待队列。
如果一个 fd 加入到了 epoll_fd 里面,那么这个进程同时存在于两个等待队列里面吗?
就绪列表(epoll 特有)
就绪列表(rdllink):epoll 特有的概念,当唤醒后可以知道是哪些 fd 就绪了,这样就不需要再遍历一遍了,其中的元素是 epitem
。通过 internal link (list_head) 来实现:
不要把这个就绪列表和 CPU 的 Process 就绪队列^搞混。
如果这篇文章说不清 epoll 的本质,那就过来掐死我吧! - 知乎
等待队列 | 就绪列表 | |
---|---|---|
Owner | fd | epoll instance |
Entry type | Process | fd (epitem) |
So you can roughly think that:
- 1 epoll instance corresponds to multiple fds;
- 1 fd corresponding s to multiples processes.
实现
epoll_ctl(EPOLL_CTL_ADD)
// do_epoll_ctl
// ep_insert
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
ep_insert
do most of the job:
- Create and add the
epitem
of the fd into the RBT:ep_rbtree_insert(ep, epi);
- Prepare the callback function
ep_ptable_queue_proc
for modifying "readiness"-related queues inepitem
: `init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); - Make some modifications on the "readiness"-related queues in
epitem
:ep_item_poll
, which will pass theep_ptable_queue_proc
function as a callback argument when calling.poll
VFS file operation.
What will ep_ptable_queue_proc
do?
- Add a
wait_queue_entry
to the wait queue ("readiness"-related queue), with the callback function set asep_poll_callback
. This function will be called when the queue is "activated(wake up)".
What will ep_poll_callback
do?
- Add this epitem to the epoll instance's ready list (
list_add_tail_lockless(&epi->rdllink, &ep->rdllist)
). So whenepoll_wait
return, it will know which file descriptors have new events. - "activate(
wake_up()
)" the wait queue for theepitem
's corresponding epoll instance. i.e.,wake_up(&ep->wq);
, each item in this queue is a process, the callback function isep_autoremove_wake_function()
.
Because
- 1 file struct (fd) can link to multiple epitems (let's say we created 2 epoll instances and run 2 epoll_wait() both in a single thread), and,
- 1 epitem corresponds to 1 epoll instance (not vice versa!), which can have multiple processes blocking on it (The reason is linux - When will an epoll instance shared by multiple processes? - Stack Overflow).
So we
- first use
ep_ptable_queue_proc
(actually, this one is called by the corresponding file'spoll()
file operation) to append an epitem to the file struct's wait queue and add the callbackep_poll_callback
for it. - When there is new data, the file's wait queue is activate, which means the function
ep_poll_callback
on each epitem will be executed. - When
ep_poll_callback
on an epitem is executed, the epitem's wait queue is also activated, which means the functionspace
on each process will be executed.
When will the queue (the queue hold by an struct file
) be activated?
unix_stream_sendmsg
other->sk_data_ready() (sock_def_readable)
wake_up_interruptible_sync_poll
__wake_up_sync_key
__wake_up_common_lock
__wake_up_common
Note:
- The wait queue is hold by each file.
- Each entry in wait queue can be transformed to an
epitem
, so when the callbackep_poll_callback
is invoked, it can retrieve the correspondingepitem
.
epoll_wait()
When will the process(the process call epoll_wait
) be added into the epoll instance's wait queue ep->wq
?
// epoll_wait
// do_epoll_wait
// ep_poll
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, struct timespec64 *timeout)
{
//...
while (1) {
//...
init_wait(&wait); // will set the private to current, i.e., the current process
wait.func = ep_autoremove_wake_function;
if (!eavail)
__add_wait_queue_exclusive(&ep->wq, &wait);
//...
}
//...
}
Linux AIO / io_uring
这两者都是异步 I/O。Linux AIO is now subsumed by the io_uring
API. 关于异步 I/O 和同步 I/O 的区别可以参考异步 I/O^。
用户进程发起 read()
操作之后,立刻就可以开始去做其它的事,从 kernel 的角度,当它受到一个 asynchronous read 之后,首先它会立刻返回,所以不会对用户进程产生任何 block。然后,kernel 会等待数据准备完成,然后将数据从内核空间拷贝到用户空间,当这一切都完成之后,kernel 会给用户进程发送一个 signal,告诉它 read 操作完成了。
对于 Linux AIO,需要这么使用:
- 用户通过
io_submit()
提交 I/O 请求, - 过一会再调用
io_getevents()
来检查哪些 events 已经 ready 了。 - 使程序员能编写完全异步的代码。
AIO is a horrible ad-hoc design, with the main excuse being “other, less gifted people, made that design, and we are implementing it for compatibility because database people — who seldom have any shred of taste — actually use it”.
— Linus Torvalds (on lwn.net)
io_uring
与 linux-aio
有着本质的不同:
- 在设计上是真正异步的(truly asynchronous)。只要设置了合适的 flag,它在系统调用上下文中就只是将请求放入队列, 不会做其他任何额外的事情,保证了应用永远不会阻塞。
- 支持任何类型的 I/O:cached files、direct-access files 甚至 blocking sockets。由于设计上就是异步的(async-by-design nature),因此无需 poll+read/write 来处理 sockets。 只需提交一个阻塞式读(blocking read),请求完成之后,就会出现在 completion ring。
- 灵活、可扩展:基于
io_uring
甚至能重写(reimplement)Linux 的每个系统调用。
[译] Linux 异步 I/O 框架 io_uring:基本原理、程序示例与性能压测(2020)
每个 io_uring
实例都有两个环形队列(ring),在内核和应用程序之间共享:
- 提交队列:submission queue (SQ),从名字可以看出来,是由应用程序创建,内核来消费的。
- 完成队列:completion queue (CQ),从名字可以看出来,是由内核创建,由应用程序来消费的。
这两个队列:
- 都是单生产者、单消费者,size 是 2 的幂次;
- 提供无锁接口(lock-less access interface),内部使用 内存屏障做同步(coordinated with memory barriers)。
使用方式:
- 请求:应用创建 SQ entries (SQE),更新 SQ tail;内核消费 SQE,更新 SQ head。
- 完成
- 内核为完成的一个或多个请求创建 CQ entries (CQE),更新 CQ tail;
- 应用消费 CQE,更新 CQ head。
- 完成事件(completion events)可能以任意顺序到达,但总是与特定的 SQE 相关联的。
- 消费 CQE 过程无需切换到内核态。
How to use io_uring
系统调用 API:
io_uring_setup()
io_uring_register()
io_uring_enter()
一个使用的 example:
io_uring_setup()
Syscall
执行异步 I/O 需要先设置上下文:
int io_uring_setup(u32 entries, struct io_uring_params *p);
这个系统调用:
- 创建一个 SQ 和一个 CQ,
- queue size 至少 entries 个元素,
- 返回一个文件描述符,随后用于在这个
io_uring
实例上执行操作。
SQ 和 CQ 在应用和内核之间共享,避免了在初始化和完成 I/O 时(initiating and completing I/O)拷贝数据。
io_uring_setup()
成功时返回一个 fd。应用随后可以将这个 fd 传给 mmap
系统调用,来
- map the submission and completion queues 或者
- 传给 to the
io_uring_register()
orio_uring_enter()
system calls.
io_uring_register()
Syscall
为什么需要注册一个文件或者缓冲区呢?
Registering files or user buffers allows the kernel to take long term references to internal data structures or create long term mappings of application memory, greatly reducing per-I/O overhead.
也就是这个缓冲区是用户申请的,但是需要传给 kernel,这样不需要每次传参进去造成多余的拷贝?这片缓冲区的内容 Userspace 可以改嘛,如果可以会不会对 kernel 的安全造成不好的影响?
注册用于异步 I/O 的文件或用户缓冲区:
int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);
使内核能长时间持有对该文件在内核内部的数据结构引用(internal kernel data structures associated with the files), 或创建应用内存的长期映射(long term mappings of application memory associated with the buffers), 这个操作只会在注册时执行一次,而不是每个 I/O 请求都会处理,因此减少了 per-I/O overhead。
注册的缓冲区(buffer)性质:
- Registered buffers 将会被锁定在内存中(be locked in memory),并计入用户的
RLIMIT_MEMLOCK
资源限制。 - 每个 buffer 有 1GB 的大小限制。
- 当前,buffers 必须是匿名、非文件后端的内存(anonymous, non-file-backed memory),例如 malloc(3) or mmap(2) with the MAP_ANONYMOUS flag set 返回的内存。
- Huge pages 也是支持的。整个 huge page 都会被 pin 到内核,即使只用到了其中一部分。
- 已经注册的 buffer 无法调整大小,想调整只能先 unregister,再重新 register 一个新的。
通过 eventfd()
订阅 completion 事件。
一个例子:
一个使用了 io_uring
的例子: