网络请求中,定时器一般用于超时操作的处理,由于各种定时器的实现思路都非常类似 所以由SelectConnection的代码为例子讲下定时器的实现思路。SelectConnection是pika 异步的长连接实现,适配了select, epoll, kqueue or poll等接口,由于原理都差不多,以下选择epoll的实现来讲

以下是添加定时器的部分源码

添加定时器

1
2
def add_timeout(self, deadline, callback_method):
return self._poller.add_timeout(deadline, callback_method)

poller的获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@staticmethod
def _get_poller():
"""Determine the best poller to use for this enviroment."""
poller = None
if hasattr(select, 'epoll'):
if not SELECT_TYPE or SELECT_TYPE == 'epoll':
LOGGER.debug('Using EPollPoller')
poller = EPollPoller()
if not poller and hasattr(select, 'kqueue'):
if not SELECT_TYPE or SELECT_TYPE == 'kqueue':
LOGGER.debug('Using KQueuePoller')
poller = KQueuePoller()
if (not poller and hasattr(select, 'poll') and
hasattr(select.poll(), 'modify')): # pylint: disable=E1101
if not SELECT_TYPE or SELECT_TYPE == 'poll':
LOGGER.debug('Using PollPoller')
poller = PollPoller()
if not poller:
LOGGER.debug('Using SelectPoller')
poller = SelectPoller()
return poller

可以看到poller针对不同的底层实现有不同的封装
所以接下来应该是查看EPollPoller的源码.
根据继承关系找到add_timeout是_PollerBase里面

1
2
3
4
5
6
7
8
9
10
11
def add_timeout(self, deadline, callback_method):
timeout_at = time.time() + deadline
value = {'deadline': timeout_at, 'callback': callback_method}
timeout_id = hash(frozenset(value.items()))
self.[timeout_id] = value
if not self._next_timeout or timeout_at < self._next_timeout:
self._next_timeout = timeout_at
LOGGER.debug('add_timeout: added timeout %s; deadline=%s at %s',
timeout_id, deadline, timeout_at)
return timeout_id
1
add_timeout(self, deadline, callback_method)

以上所做的就是获取当前时间并加上deadline计算出具体的过期时间并和callback作为一个value放到_timeout的散列表里面

这就是add_timeout的具体过程

add_timeout添加的callback的调用则是在process_timeouts()里面被调用的,process_timeouts()所做的是遍历散列表计算过期key并调用相应callback

具体调用process_timeouts 则是以下的代码

1
2
3
4
5
6
7
8
9
def start(self):
.....
try:
# Run event loop
while not self._stopping:
self.poll()
self.process_timeouts()
finally:
.....

这里看到在loop里面 每次调用poll() 后都会调用process_timeouts()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def poll(self):
"""Wait for events of interest on registered file descriptors until an
event of interest occurs or next timer deadline or _MAX_POLL_TIMEOUT,
whichever is sooner, and dispatch the corresponding event handlers.
"""
while True:
try:
events = self._poll.poll(self._get_next_deadline())
break
except _SELECT_ERRORS as error:
if _is_resumable(error):
continue
else:
raise
fd_event_map = defaultdict(int)
for fileno, event in events:
fd_event_map[fileno] |= event
self._dispatch_fd_events(fd_event_map)

poll()的代码 _get_next_deadline() 则是计算还有多少时间处理下一个超时的请求。

总结

这个超时机制是通过epoll wait之类的超时机制做的,但是假设一种情况, 有大量的超时在poll()后需要处理,而且callback函数执行耗时比较久,这样子后续的在一定程度上没有那么精确,所以这并不是一种精确的超时方案,不过用在心跳或者连接超时上面,问题也不大

这个思路普遍适用于服务端连接的超时处理,像nginx uwsgi等的定时器思路也是和pika类似的