网络请求中,定时器一般用于超时操作的处理,由于各种定时器的实现思路都非常类似 所以由SelectConnection的代码为例子讲下定时器的实现思路。SelectConnection是pika 异步的长连接实现,适配了select, epoll, kqueue or poll等接口,由于原理都差不多,以下选择epoll的实现来讲
以下是添加定时器的部分源码
添加定时器
1 2
| def add_timeout(self, deadline, callback_method): return self._poller.add_timeout(deadline, callback_method)
|
poller的获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @staticmethod def _get_poller(): """Determine the best poller to use for this enviroment.""" poller = None if hasattr(select, 'epoll'): if not SELECT_TYPE or SELECT_TYPE == 'epoll': LOGGER.debug('Using EPollPoller') poller = EPollPoller() if not poller and hasattr(select, 'kqueue'): if not SELECT_TYPE or SELECT_TYPE == 'kqueue': LOGGER.debug('Using KQueuePoller') poller = KQueuePoller() if (not poller and hasattr(select, 'poll') and hasattr(select.poll(), 'modify')): if not SELECT_TYPE or SELECT_TYPE == 'poll': LOGGER.debug('Using PollPoller') poller = PollPoller() if not poller: LOGGER.debug('Using SelectPoller') poller = SelectPoller() return poller
|
可以看到poller针对不同的底层实现有不同的封装
所以接下来应该是查看EPollPoller的源码.
根据继承关系找到add_timeout是_PollerBase里面
1 2 3 4 5 6 7 8 9 10 11
| def add_timeout(self, deadline, callback_method): timeout_at = time.time() + deadline value = {'deadline': timeout_at, 'callback': callback_method} timeout_id = hash(frozenset(value.items())) self.[timeout_id] = value if not self._next_timeout or timeout_at < self._next_timeout: self._next_timeout = timeout_at LOGGER.debug('add_timeout: added timeout %s; deadline=%s at %s', timeout_id, deadline, timeout_at) return timeout_id
|
1
| add_timeout(self, deadline, callback_method)
|
以上所做的就是获取当前时间并加上deadline计算出具体的过期时间并和callback作为一个value放到_timeout的散列表里面
这就是add_timeout的具体过程
add_timeout添加的callback的调用则是在process_timeouts()里面被调用的,process_timeouts()所做的是遍历散列表计算过期key并调用相应callback
具体调用process_timeouts 则是以下的代码
1 2 3 4 5 6 7 8 9
| def start(self): ..... try: while not self._stopping: self.poll() self.process_timeouts() finally: .....
|
这里看到在loop里面 每次调用poll() 后都会调用process_timeouts()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def poll(self): """Wait for events of interest on registered file descriptors until an event of interest occurs or next timer deadline or _MAX_POLL_TIMEOUT, whichever is sooner, and dispatch the corresponding event handlers. """ while True: try: events = self._poll.poll(self._get_next_deadline()) break except _SELECT_ERRORS as error: if _is_resumable(error): continue else: raise fd_event_map = defaultdict(int) for fileno, event in events: fd_event_map[fileno] |= event self._dispatch_fd_events(fd_event_map)
|
poll()的代码 _get_next_deadline() 则是计算还有多少时间处理下一个超时的请求。
总结
这个超时机制是通过epoll wait之类的超时机制做的,但是假设一种情况, 有大量的超时在poll()后需要处理,而且callback函数执行耗时比较久,这样子后续的在一定程度上没有那么精确,所以这并不是一种精确的超时方案,不过用在心跳或者连接超时上面,问题也不大
这个思路普遍适用于服务端连接的超时处理,像nginx uwsgi等的定时器思路也是和pika类似的