V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
rapospectre
V2EX  ›  Python

深入理解 tornado 之 底层 ioloop 实现(三)

  •  
  •   rapospectre ·
    bluedazzle · 2016-06-07 15:57:14 +08:00 · 3658 次点击
    这是一个创建于 3153 天前的主题,其中的信息可能已经有所发展或是发生改变。

    承接之前的文章:深入理解 tornado 之 底层 ioloop 实现(二)

    start

    ioloop 最核心的部分:

    def start(self):
            if self._running:       # 判断是否已经运行
                raise RuntimeError("IOLoop is already running")
            self._setup_logging()
            if self._stopped:
                self._stopped = False  # 设置停止为假
                return
            old_current = getattr(IOLoop._current, "instance", None)
            IOLoop._current.instance = self
            self._thread_ident = thread.get_ident()  # 获得当前线程标识符
            self._running = True # 设置运行
    
            old_wakeup_fd = None
            if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
                try:
                    old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                    if old_wakeup_fd != -1:
                        signal.set_wakeup_fd(old_wakeup_fd)
                        old_wakeup_fd = None
                except ValueError:
                    old_wakeup_fd = None
    
            try:
                while True:  # 服务器进程正式开始,类似于其他服务器的 serve_forever
                    with self._callback_lock: # 加锁,_callbacks 做为临界区不加锁进行读写会产生脏数据
                        callbacks = self._callbacks # 读取 _callbacks
                        self._callbacks = []. # 清空 _callbacks
                    due_timeouts = [] # 用于存放这个周期内已过期( 已超时 )的任务
                    if self._timeouts: # 判断 _timeouts 里是否有数据
                        now = self.time() # 获取当前时间,用来判断 _timeouts 里的任务有没有超时
                        while self._timeouts: # _timeouts 有数据时一直循环, _timeouts 是个最小堆,第一个数据永远是最小的, 这里第一个数据永远是最接近超时或已超时的
                            if self._timeouts[0].callback is None: # 超时任务无回调
                                heapq.heappop(self._timeouts) # 直接弹出
                                self._cancellations -= 1 # 超时计数器 - 1
                            elif self._timeouts[0].deadline <= now: # 判断最小的数据是否超时
                                due_timeouts.append(heapq.heappop(self._timeouts)) # 超时就加到已超时列表里。
                            else:
                                break # 因为最小堆,如果没超时就直接退出循环( 后面的数据必定未超时 )
                        if (self._cancellations > 512
                                and self._cancellations > (len(self._timeouts) >> 1)):  # 当超时计数器大于 512 并且 大于 _timeouts 长度一半( >> 为右移运算, 相当于十进制数据被除 2 )时,清零计数器,并剔除 _timeouts 中无 callbacks 的任务
                            self._cancellations = 0
                            self._timeouts = [x for x in self._timeouts
                                              if x.callback is not None]
                            heapq.heapify(self._timeouts) # 进行 _timeouts 最小堆化
    
                    for callback in callbacks:
                        self._run_callback(callback) # 运行 callbacks 里所有的 calllback
                    for timeout in due_timeouts:
                        if timeout.callback is not None:
                            self._run_callback(timeout.callback) # 运行所有已过期任务的 callback
                    callbacks = callback = due_timeouts = timeout = None # 释放内存
    
                    if self._callbacks: # _callbacks 里有数据时
                        poll_timeout = 0.0 # 设置 epoll_wait 时间为 0 ( 立即返回 )
                    elif self._timeouts: # _timeouts 里有数据时
                        poll_timeout = self._timeouts[0].deadline - self.time() 
    					# 取最小过期时间当 epoll_wait 等待时间,这样当第一个任务过期时立即返回
                        poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
    					# 如果最小过期时间大于默认等待时间 _POLL_TIMEOUT = 3600 ,则用 3600 ,如果最小过期时间小于 0 就设置为 0 立即返回。
                    else:
                        poll_timeout = _POLL_TIMEOUT # 默认 3600 s 等待时间
    
                    if not self._running: # 检查是否有系统信号中断运行,有则中断,无则继续
                        break
    
                    if self._blocking_signal_threshold is not None:
                        signal.setitimer(signal.ITIMER_REAL, 0, 0) # 开始 epoll_wait 之前确保 signal alarm 都被清空( 这样在 epoll_wait 过程中不会被 signal alarm 打断 )
    
                    try:
                        event_pairs = self._impl.poll(poll_timeout) # 获取返回的活跃事件队
                    except Exception as e:
                        if errno_from_exception(e) == errno.EINTR:
                            continue
                        else:
                            raise
    
                    if self._blocking_signal_threshold is not None:
                        signal.setitimer(signal.ITIMER_REAL,
                                         self._blocking_signal_threshold, 0) #  epoll_wait 结束, 再设置 signal alarm
                    self._events.update(event_pairs) # 将活跃事件加入 _events
                    while self._events:
                        fd, events = self._events.popitem() # 循环弹出事件
                        try:
                            fd_obj, handler_func = self._handlers[fd] # 处理事件
                            handler_func(fd_obj, events)
                        except (OSError, IOError) as e:
                            if errno_from_exception(e) == errno.EPIPE:
                                pass
                            else:
    						    self.handle_callback_exception(self._handlers.get(fd))
                        except Exception:
                            self.handle_callback_exception(self._handlers.get(fd))
                    fd_obj = handler_func = None
    
            finally:
                self._stopped = False # 确保发生异常也继续运行
                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm
                IOLoop._current.instance = old_current 
                if old_wakeup_fd is not None:
                    signal.set_wakeup_fd(old_wakeup_fd)   # 和 start 开头部分对应,但是不是很清楚作用,求老司机带带路
    

    最后来看 stop:

    stop

    def stop(self):
    	self._running = False
    	self._stopped = True
    	self._waker.wake()
    

    这个很简单,设置判断条件,然后调用 self._waker.wake() 向 pipe 写入随意字符释放 pipe 。 over !

    总结

    噗,写了这么长,终于写完了。 经过分析,我们可以看到, ioloop 实际上是对 epoll 的封装,并加入了一些对上层事件的处理和 server 相关的底层处理。

    最后,感谢大家不辞辛苦看到这,文中理解有误的地方还请多多指教!:pray:

    原文地址

    作者:rapospectre

    2 条回复    2016-06-08 09:27:03 +08:00
    micyng
        1
    micyng  
       2016-06-07 20:49:34 +08:00
    最后一点写错了,不是释放 pipe ,而是利用 pipe 的 fd 唤醒 ioloop 事件循环
    rapospectre
        2
    rapospectre  
    OP
       2016-06-08 09:27:03 +08:00
    @micyng 谢谢指正!已经修改
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2200 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 05:31 · PVG 13:31 · LAX 21:31 · JFK 00:31
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.