网络程序需要处理的第三类事件就是定时器,比如定期检测一个客户连接的活动状态。为此,我们要将每个定时器事件分别封装成定时器,以实现对定时时间的统一管理。换句话说,定时机制是定时器得以被处理的原动力。进一步,如果是双向的,则每个定时器还需要包含指向前一个定时器的指针成员。升序定时器链表将其中的定时器按照超时时间做升序排列。

网络程序需要处理的第三类事件就是定时器,比如定期检测一个客户连接的活动状态。服务器程序通常管理着众多定时事件,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,ui对于服务器的性能有着至关重要的影响。为此,我们要将每个定时器事件分别封装成定时器,以实现对定时时间的统一管理。
不过在讨论如何组织定时器之前,我们先要介绍定时的方法。
定时是指在一段时间之内触发某段代码的机制,我们可以在这段代码中以此处理所有到期的定时器。换句话说,定时机制是定时器得以被处理的原动力。
Linux提供了三种定时方法,它们是:
- socket选项SO_RECVTIMEO和SO_SNDTIMEO
- SIGALARM信号
- I/O复用系统调用的超时参数
SO_RECVTIMEO和SO_SNDTIMEO,它们分别用来设置socket接收数据超时时间和发送数据超时时间。因此,这两个选项仅对与数据接收和发送相关的socket专用系统调用有关,如下表:
由上图可见,程序中,我们可以根据系统调用的返回值以及errno来判断超时时间是否已到,进而决定是否开始处理定时任务。
下面示例代码以connect为例,说明程序中如何使用SO_SNDTIMEO选项来定时。
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdlib.h>#include <assert.h>#include <stdio.h>#include <errno.h>#include <fcntl.h>#include <unistd.h>#include <string.h>/*超时连接函数*/int timeout_connect(const char* ip, int port, int time){int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET;inet_pton(AF_INET, ip, &address.sin_addr);address.sin_port = htons(port);int sockfd = socket(PF_INET, SOCK_STREAM, 0);assert(sockfd >= 0);/*通过选项SO_RCVTIMEO和SO_SNDTIMEO所设置的超时时间的类型是timeval,这个select系统调用的超时参数类型相同*/struct timeval timeout;timeout.tv_sec = time;timeout.tv_usec = 0;socklen_t len = sizeof(timeout);ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);if(ret == -1){/*超时对应的错误号EINPROGRESS。下面这条如果成立,我们就可以处理定时任务了*/if(errno == EINPROGRESS){printf("connecting timeout, process timeout logic\n");return -1;}printf("error occour when connecting to server\n");return -1;}return sockfd;}int main(int argc, char const *argv[]){if(argc <= 2){printf("usage: %s ip_address port_number\n", basename(argv[0]));return 1;}const char* ip = argv[1];int port = atoi(argv[2]);int sockfd = timeout_connect(ip, port, 10);if(sockfd < 0){return 1;}return 0;}
由alarm和setitimer函数设置的实时闹钟一旦超时,将触发SIGALRM信号。因此,我们可以利用该信号的信号处理来处理定时任务。但是要处理多个定时任务,我们就需要不断地触发SIGALRM信号,并在其信号处理函数中执行到期的任务。
一般来说,SIGALRM信号按照固定的频率生成,即由alarm和setitimer函数设置的定时周期T保持不变。如果某个定时任务的超时时间不是T的整数倍,那么它实际被执行的时间和预期的时间将略有偏差,因此定时周期T反映了定时的精度。
定时器通常至少要包含两个成员:一个超时时间(相对时间或者绝对时间)和一个任务回调函数。有的时候还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。如果使用链表作为容器来串联所有的定时器,则每个定时器还要包含指向下一个定时器的指针成员。进一步,如果是双向的,则每个定时器还需要包含指向前一个定时器的指针成员。
下面的示例代码,实现了一个简单的升序定时器链表。升序定时器链表将其中的定时器按照超时时间做升序排列。
#include <time.h>#include <sys/types.h>#include <netinet/in.h>#include <stdio.h>#define BUFFER_SIZE 64class util_timer;/*用户数据结构:客户端socket地址、socker文件描述符、读缓存、定时器*/struct client_data{sockaddr_in address;int sockfd;char buf[BUFFER_SIZE];util_timer* timer;};/*定时器类*/class util_timer{public:util_timer(): prev(NULL), next(NULL){};public:time_t expire; /*任务的超时时间, 这里使用的是绝对时间*/void (*cb_func)(client_data*); /*任务回调函数*//*回调函数处理的客户数据,由定时器的执行者传递给回调函数*/client_data* user_data;util_timer* prev; /*指向前一个定时器*/util_timer* next; /*指向下一个定时器*/};/*定时器链表,它是一个升序,双向链表,且带有头节点和尾节点*/class sort_timer_lst{public:sort_timer_lst():head(NULL), tail(NULL){}/*链表被销毁时,删除其中所有的定时器*/~sort_timer_lst(){util_timer* tmp = head;while(tmp){head = tmp->next;delete tmp;tmp = head;}}/*将目标定时器timer添加到链表中*/void add_timer(util_timer* timer){if(!timer){return;}if(!head){head = tail = timer;return;}/*如果目标定时器的潮湿时间小于当前链表中所有定时器的超市时间,则把该定时器插入链表头部,作为链表新的头节点,否则就需要调用重载函数add_timer(util_timer* timer, util_timer *lst_head), 把它插入链表中合适的位置,以保证链表的升序特性,*/if(timer->expire < head->expire){timer->next = head;head->prev = timer;head = timer;return;}add_timer(timer, head);}/*当某个定时器任务发生变化时,调整对应的定时器在链表中的位置*这个函数只考虑`超时时间延长的情况*/void adjust_timer(util_timer* timer){if(!timer){return;}util_timer* tmp = timer->next;if(!tmp || (timer->expire < tmp->expire)){return;}if(timer == head){head = head->next;head->prev = NULL;timer->next = NULL;add_timer(timer, head);}else{timer->prev->next = timer->next;timer->next->prev = timer->prev;add_timer(timer, timer->next);}}void del_timer(util_timer* timer){if(!timer){return;}if(timer == head && timer == tail){delete timer;head = NULL;tail = NULL;return;}if(timer == head){head = head->next;head->prev = NULL;delete timer;return;}if(timer == tail){tail = tail->prev;tail->next = NULL;delete timer;return;}timer->prev->next = timer->next;timer->next->prev = timer->prev;delete timer;}/*SIGALARM信号每次被处罚时就在其信号处理函数*(如果使用统一事件源,则是主函数)中执行一次tick函数,以处理链表上的到期的任务*/void tick(){if(!head){return;}printf("timer ticker\n");time_t cur = time(NULL); /*获取系统当前的时间*/util_timer* tmp = head;while(tmp){/*每个定时器使用的是绝对时间*/if(cur < tmp->expire){break;}tmp->cb_func(tmp->user_data);head = head->next;if(head){head->prev = NULL;}delete tmp;tmp = head;}}private:/*一个重要的辅助函数,它被公有的add_timer和adjust_timer函数调用,*该函数表示将目标定时器timer添加到lst_head之后的部分链表中*/void add_timer(util_timer* timer, util_timer* lst_head){util_timer* prev = lst_head;util_timer* tmp = prev->next;/*遍历lst_head节点之后的部分链表,*直到找到一个超时时间大于目标定时器*的超时时间的节点,并将目标定时器插入*该节点之前*/while(tmp){if(timer->expire < tmp->expire){prev->next = timer;timer->next = tmp;tmp->prev = timer;timer->prev = prev;break;}prev = tmp;tmp = prev->next;}/*如果遍历完还没找到,则插入链表尾部,*并把它设置为新的链表尾部*/if(!tmp){prev->next = timer;timer->prev = prev;timer->next = NULL;tail = timer;}}util_timer* head;util_timer* tail;};
上述示例代码的核心函数是tick相当于一个心跳函数,它每隔一段固定时间就执行一次,一检测并处理到期的任务。
现在我们考虑上述升序定时器链表的实际应用 ----- 处理非活动连接。服务器程序通常要定期处理非活动连接:给客户端发一个重连请求,或者关闭该连接,或者其他。
Linux在内核中提供了对连接是否处于活动状态的定期检查机制,我们可以通过socket选项KEEPALIVE来激活它。不过使用这种方式将使得应用程序对连接的管理变得复杂。因此,我们可以考虑在应用层实现类似KEEPLIVE的机制,以管理所有长时间处于非活动状态的连接。
下面示例代码利用alarm函数周期性地触发SIGALRM信号,该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务 ---- 关闭非活动的连接。
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <assert.h>#include <stdio.h>#include <signal.h>#include <unistd.h>#include <errno.h>#include <string.h>#include <fcntl.h>#include <stdlib.h>#include <sys/epoll.h>#include <pthread.h>#include "lst_timer.h"#define FD_LIMIT 65535#define MAX_EVENT_NUMBER 1024#define TIMESLOT 5static int pipefd[2];static sort_timer_lst timer_lst;static int epollfd = 0;int setnonblocking(int fd){int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;}void addfd(int epollfd, int fd){epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setnonblocking(fd);}void sig_handler(int sig){int save_errno = errno;int msg = sig;send(pipefd[1], (char*)msg, 1, 0);errno = save_errno;}void addsig(int sig){struct sigaction sa;memset(&sa, '\0', sizeof(sa));sa.sa_handler = sig_handler;sa.sa_flags |= SA_RESTART;sigfillset(&sa.sa_mask);assert(sigaction(sig, &sa, NULL) != -1);}void timer_handler(){/*定时处理任务,实际上就是调用tick函数*/timer_lst.tick();/*因为一次alarm调用只会引起一次SIGALRM信号,*所以我们要重新定时,以不断触发SIGALRM信号*/alarm(TIMESLOT);}/*定时器回调函数,它删除非活动连接socket上的注册事件,并关闭之*/void cb_func(client_data* user_data){assert(user_data);epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);close(user_data->sockfd);printf("close fd %d\n, user_data->sockfd");}int main(int argc, char const *argv[]){if(argc <= 2){printf("usage: %s ip_address port_number\n", basename(argv[0]));return 1;}const char* ip = argv[1];int port = atoi(argv[2]);int ret = 0;struct sockaddr_in address;bzero(&address, sizeof(address));address.sin_family = AF_INET;inet_pton(AF_INET, ip, &address.sin_addr);address.sin_port = htons(port);int listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(listenfd >= 0);ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));assert(ret != -1);ret = listen(listenfd, 5);assert(ret != -1);epoll_event events[MAX_EVENT_NUMBER];epollfd = epoll_create(5);assert(epollfd != -1);addfd(epollfd, listenfd);ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);assert(ret != -1);setnonblocking(pipefd[1]);addfd(epollfd, pipefd[0]);addsig(SIGALRM);addsig(SIGTERM);bool stop_server = false;client_data* users = new client_data[FD_LIMIT];bool timeout = false;alarm(TIMESLOT);while (!stop_server){int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if( number < 0 && errno != EINTR){printf("epoll failuer\n");break;}for(int i=0 ; i < number;i){int sockfd = events[i].data.fd;if(sockfd == listenfd){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (struct sockaddr*)&client_address,&client_addrlength);addfd(epollfd, connfd);users[connfd].address = client_address;users[connfd].sockfd = connfd;/*创建定时器,设置其回调函数与超时时间,然后绑定定时器与用户数据,最后将定时器添加到链表timer_lst中*/util_timer* timer = new util_timer;timer->user_data = &users[connfd];timer->cb_func = cb_func;time_t cur = time(NULL);timer->expire = cur3 * TIMESLOT;users[connfd].timer = timer;timer_lst.add_timer(timer);}else if(sockfd == pipefd[0] && events[i].events&EPOLLIN){int sig;char signals[1024];ret = recv(pipefd[0], signals, sizeof(signals), 0);if(ret == -1){//handle the errorcontinue;}else if(ret == 0){continue;}else{for(int i = 0; i < ret;i){switch (signals[i]){case SIGALRM:/*用timeout变量标记有定时任务需要处理,*但不立即处理定时任务,*这是因为定时任务的优先级不是很高,*我们优先处理其他更重要的任务*/timeout = true;break;case SIGTERM:stop_server = true;default:break;}}}}else if(events[i].events & EPOLLIN){memset(users[sockfd].buf, '\0', BUFFER_SIZE);ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0);printf("get %d bytes of client data %d from %d\n", ret,users[sockfd].buf, sockfd);util_timer* timer = users[sockfd].timer;if(ret < 0){/*发生错误,则关闭连接,移除定时器*/if(errno != EAGAIN){cb_func(&users[sockfd]);if(timer){timer_lst.del_timer(timer);}}}else if(ret == 0){/*对方已关闭*/cb_func(&users[sockfd]);if(timer){timer_lst.del_timer(timer);}}else{/*客户连接上有数据可读,则重新调整定时器*/if(timer){time_t cur = time(NULL);timer->expire = cur3*TIMESLOT;printf("adjust timer once\n");timer_lst.adjust_timer(timer);}}}else{//others}}/*最后处理定时事件,因为I/O事件有更高的优先级。*当然这样做将导致定时任务不能精确地按照预期的时间执行*/if(timeout){timer_handler();timeout = false;}}close(listenfd);close(pipefd[0]);close(pipefd[1]);delete[] users;return 0;}
Linux下的三组I/O复用系统都带有超时参数,因此它们不仅能统一处理信号和I/O事件,也能统一处理定时事件。但是由于I/O复用系统可能在超时时间到期之前就返回(有I/O时间发生),所以如果我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间。
示例代码如下:
#define TIMEOUT 5000int timeout = TIMEOUT;time_t start = time(NULL);time_t end = time(NULL);while(1){printf("the timeout is now %d mil-seconds\n", timeout);start = time(NULL);int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout);if(number < 0 && errno!=EINTER){printf("epoll failure\n");break;}/*如果epoll_wait成功返回0,则说明超时时间到*此处便可处理定时任务,并重置定时时间*/if(number == 0){timeout = TIMEOUT;continue;}end = time(NULL);/*如果epoll_wait的返回值大于0, 则本次epoll_wait调用的时间是(end-start)*1000 ms*我们需要将这段时间减去,以获得下次epoll_wait调用的超时参数*/timeout -= (end-start)*1000;if(timeout < 0){timeout = TIMEOUT;}//handle connections}
