在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者poll等io多路复用的方法来实现并发服务程序。在linux新的内核中,有了一种替换它的机制,就是epoll。
select()和poll() io多路复用模型
select的缺点:
1.单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在linux内核头文件中,有这样的定义:#define __fd_setsize 1024)
2.内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
3.select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
4.select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行io操作,那么之后每次select调用还是会将这些文件描述符通知进程。
相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。
假设我们的服务器需要支持100万的并发连接,则在__fd_setsize 为1024的情况下,则我们至少需要开辟1k个进程才能实现100万的并发连接。除了进程间上下文切换的时间消耗外,从内核/用户空间大量的无脑内存拷贝、数组轮询等,是系统难以承受的。因此,基于select模型的服务器程序,要达到10万级别的并发访问,是一个很难完成的任务。
epoll io多路复用模型实现机制
由于epoll的实现机制与select/poll机制完全不同,上面所说的 select的缺点在epoll上不复存在。
设想一下如下场景:有100万个客户端同时与一个服务器进程保持着tcp连接。而每一时刻,通常只有几百上千个tcp连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?
在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。
epoll的设计和实现与select完全不同。epoll通过在linux内核中申请一个简易的文件系统(文件系统一般用什么数据结构实现?b+树)。把原先的select/poll调用分成了3个部分:
1)调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
2)调用epoll_ctl向epoll对象中添加这100万个连接的套接字
3)调用epoll_wait收集发生的事件的连接
如此一来,要实现上面说是的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。
epoll实现机制
当某一进程调用epoll_create方法时,linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。eventpoll结构体如下所示:
?
struct
eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct
rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct
list_head rdlist;
....
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:
?
struct
epitem{
struct
rb_node rbn;
//红黑树节点
struct
list_head rdllink;
//双向链表节点
struct
epoll_filefd ffd;
//事件句柄信息
struct
eventpoll *ep;
//指向其所属的eventpoll对象
struct
epoll_event event;
//期待发生的事件类型
}
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。
通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。
epoll的接口
1.epoll_create
创建epoll句柄
函数声明:int epoll_create(int size)
参数:size用来告诉内核这个监听的数目一共有多大。
返回值:返回创建了的epoll句柄。
当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2.epoll_ctl
将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改。
函数申明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event);
参数:
epfd: epoll_create()的返回值
op:表示要进行的操作,其值分别为:
epoll_ctl_add: 注册新的fd到epfd中;
epoll_ctl_mod: 修改已经注册的fd的监听事件;
epoll_ctl_del: 从epfd中删除一个fd;
fd:需要操作/监听的文件句柄
event:是告诉内核需要监听什么事件,struct epoll_event如下:
?
typedef
union
epoll_data {
void
*ptr;
int
fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct
epoll_event {
__uint32_t events;
/* epoll events */
epoll_data_t data;
/* user data variable */
};
events可以是以下几个宏的集合:
epollin:触发该事件,表示对应的文件描述符上有可读数据。(包括对端socket正常关闭);
epollout:触发该事件,表示对应的文件描述符上可以写数据;
epollpri:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
epollerr:表示对应的文件描述符发生错误;
epollhup: 表示对应的文件描述符被挂断;
epollet:将epoll设为边缘触发(edgetriggered)模式,这是相对于水平触发(level triggered)来说的。
epolloneshot: 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到epoll队列里。
示例:
?
struct
epoll_event ev;
//设置与要处理的事件相关的文件描述符
ev.data.fd=listenfd;
//设置要处理的事件类型
ev.events=epollin|epollet;
//注册epoll事件
epoll_ctl(epfd,epoll_ctl_add,listenfd,&ev);
1.epoll_wait
等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。
函数原型:int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
参数:
epfd:由epoll_create 生成的epoll文件描述符
events:用于回传代处理事件的数组
maxevents:每次能处理的最大事件数
timeout:等待i/o事件发生的超时毫秒数,-1相当于阻塞,0相当于非阻塞。一般用-1即可
epoll的工作模式
et(edgetriggered):高速工作模式,只支持no_block(非阻塞模式)。在此模式下,当描述符从未就绪变为就绪时,内核通过epoll告知。然后它会假设用户知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到某些操作导致那个文件描述符不再为就绪状态了。(触发模式只在数据就绪时通知一次,若数据没有读完,下一次不会通知,直到有新的就绪数据)
lt(leveltriggered):缺省工作方式,支持blocksocket和no_blocksocket。在lt模式下内核会告知一个文件描述符是否就绪了,然后可以对这个就绪的fd进行io操作。如果不作任何操作,内核还是会继续通知!若数据没有读完,内核也会继续通知,直至设备数据为空为止!
示例说明:
1.我们已经把一个用来从管道中读取数据的文件句柄(rfd)添加到epoll描述符
2. 这个时候从管道的另一端被写入了2kb的数据
3. 调用epoll_wait(2),并且它会返回rfd,说明它已经准备好读取操作
4. 然后我们读取了1kb的数据
5. 调用epoll_wait(2)……
et工作模式:
如果我们在第1步将rfd添加到epoll描述符的时候使用了epollet标志,在第2步执行了一个写操作,第三步epoll_wait会返回同时通知的事件会销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在et模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。
只有当read(2)或者write(2)返回eagain时(认为读完)才需要挂起,等待。但这并不是说每次read()时都需要循环读,直到读到产生一个eagain才认为此次事件处理完成,当read()返回的读到的数据长度小于请求的数据长度时(即小于sizeof(buf)),就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
lt工作模式:
lt方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。
示例
?
/*
* file epolltest.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <string.h>
#define maxevents 64
//函数:
//功能:创建和绑定一个tcp socket
//参数:端口
//返回值:创建的socket
static
int
create_and_bind (
char
*port)
{
struct
addrinfo hints;
struct
addrinfo *result, *rp;
int
s, sfd;
memset
(&hints, 0,
sizeof
(
struct
addrinfo));
hints.ai_family = af_unspec;
/* return ipv4 and ipv6 choices */
hints.ai_socktype = sock_stream;
/* we want a tcp socket */
hints.ai_flags = ai_passive;
/* all interfaces */
s = getaddrinfo (null, port, &hints, &result);
if
(s != 0)
{
fprintf
(stderr,
"getaddrinfo: %s\n"
, gai_strerror (s));
return
-1;
}
for
(rp = result; rp != null; rp = rp->ai_next)
{
sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if
(sfd == -1)
continue
;
s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if
(s == 0)
{
/* we managed to bind successfully! */
break
;
}
close (sfd);
}
if
(rp == null)
{
fprintf
(stderr,
"could not bind\n"
);
return
-1;
}
freeaddrinfo (result);
return
sfd;
}
//函数
//功能:设置socket为非阻塞的
static
int
make_socket_non_blocking (
int
sfd)
{
int
flags, s;
//得到文件状态标志
flags = fcntl (sfd, f_getfl, 0);
if
(flags == -1)
{
perror
(
"fcntl"
);
return
-1;
}
//设置文件状态标志
flags |= o_nonblock;
s = fcntl (sfd, f_setfl, flags);
if
(s == -1)
{
perror
(
"fcntl"
);
return
-1;
}
return
0;
}
//端口由参数argv[1]指定
int
main (
int
argc,
char
*argv[])
{
int
sfd, s;
int
efd;
struct
epoll_event event;
struct
epoll_event *events;
if
(argc != 2)
{
fprintf
(stderr,
"usage: %s [port]\n"
, argv[0]);
exit
(exit_failure);
}
sfd = create_and_bind (argv[1]);
if
(sfd == -1)
abort
();
s = make_socket_non_blocking (sfd);
if
(s == -1)
abort
();
s = listen (sfd, somaxconn);
if
(s == -1)
{
perror
(
"listen"
);
abort
();
}
//除了参数size被忽略外,此函数和epoll_create完全相同
efd = epoll_create1 (0);
if
(efd == -1)
{
perror
(
"epoll_create"
);
abort
();
}
event.data.fd = sfd;
event.events = epollin | epollet;
//读入,边缘触发方式
s = epoll_ctl (efd, epoll_ctl_add, sfd, &event);
if
(s == -1)
{
perror
(
"epoll_ctl"
);
abort
();
}
/* buffer where events are returned */
events =
calloc
(maxevents,
sizeof
event);
/* the event loop */
while
(1)
{
int
n, i;
n = epoll_wait (efd, events, maxevents, -1);
for
(i = 0; i < n; i++)
{
if
((events[i].events & epollerr) ||
(events[i].events & epollhup) ||
(!(events[i].events & epollin)))
{
/* an error has occured on this fd, or the socket is not
ready for reading (why were we notified then?) */
fprintf
(stderr,
"epoll error\n"
);
close (events[i].data.fd);
continue
;
}
else
if
(sfd == events[i].data.fd)
{
/* we have a notification on the listening socket, which
means one or more incoming connections. */
while
(1)
{
struct
sockaddr in_addr;
socklen_t in_len;
int
infd;
char
hbuf[ni_maxhost], sbuf[ni_maxserv];
in_len =
sizeof
in_addr;
infd = accept (sfd, &in_addr, &in_len);
if
(infd == -1)
{
if
((
errno
== eagain) ||
(
errno
== ewouldblock))
{
/* we have processed all incoming
connections. */
break
;
}
else
{
perror
(
"accept"
);
break
;
}
}
//将地址转化为主机名或者服务名
s = getnameinfo (&in_addr, in_len,
hbuf,
sizeof
hbuf,
sbuf,
sizeof
sbuf,
ni_numerichost | ni_numericserv);
//flag参数:以数字名返回
//主机地址和服务地址
if
(s == 0)
{
printf
(
"accepted connection on descriptor %d "
"(host=%s, port=%s)\n"
, infd, hbuf, sbuf);
}
/* make the incoming socket non-blocking and add it to the
list of fds to monitor. */
s = make_socket_non_blocking (infd);
if
(s == -1)
abort
();
event.data.fd = infd;
event.events = epollin | epollet;
s = epoll_ctl (efd, epoll_ctl_add, infd, &event);
if
(s == -1)
{
perror
(
"epoll_ctl"
);
abort
();
}
}
continue
;
}
else
{
/* we have data on the fd waiting to be read. read and
display it. we must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int
done = 0;
while
(1)
{
ssize_t count;
char
buf[512];
count = read (events[i].data.fd, buf,
sizeof
(buf));
if
(count == -1)
{
/* if errno == eagain, that means we have read all
data. so go back to the main loop. */
if
(
errno
!= eagain)
{
perror
(
"read"
);
done = 1;
}
break
;
}
else
if
(count == 0)
{
/* end of file. the remote has closed the
connection. */
done = 1;
break
;
}
/* write the buffer to standard output */
s = write (1, buf, count);
if
(s == -1)
{
perror
(
"write"
);
abort
();
}
}
if
(done)
{
printf
(
"closed connection on descriptor %d\n"
,
events[i].data.fd);
/* closing the descriptor will make epoll remove it
from the set of descriptors which are monitored. */
close (events[i].data.fd);
}
}
}
}
free
(events);
close (sfd);
return
exit_success;
}
代码编译后,./epolltest 8888 ,在另外一个终端中执行telnet 192.168.1.161 8888 ,192.168.1.161
为执行测试程序的ip。在telnet终端敲入任何字符敲入enter后,会在测试终端显示敲入的字符。
总结
以上就是本文关于linux epoll机制详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!
原文链接:http://blog.csdn.net/u010657219/article/details/44061629
原创文章,作者:SLYND,如若转载,请注明出处:https://www.wangzhanshi.com/n/6404.html