Skip to content

Asyncio源码解读01-Runner解析

约 2632 字大约 9 分钟

PythonAsyncio

2025-09-13

提到 Python 协程,我们一般会想到 Asyncio ,它是 Python 标准库实现的协程框架,代表了 Python 协程事实上的标准实现。

本系列文章旨在由浅至深地分析 Asyncio 的所有模块,剖析其实现原理。

本系列分析的cpython源码位于main分支,提交时间为 Fri Sep 12 22:25:18 2025 +0100,提交ID为805e3368d6d07e58430654d1365283924fdf4143

为了简化代码分析,笔者会删除一部分debug代码。

协程和协程框架

在开始正式的代码解析之前,我们要先确立一个共识:协程并不能等同于协程框架。协程是一种高级函数,它表示一个能随时从某个断点跳出,并且能随时恢复到该断点继续执行的函数;而协程框架(或者说异步框架)是利用协程实现的异步开发框架,其包含了对上层开发需求的封装(例如IO、延时等)。

根据协程实现的方式不同,我们可以将其分类为有栈无栈两种,有栈协程从操作系统的线程和进程切换获得灵感,通过保存和切换上下文实现断点的跳出和恢复;无栈协程则不同,它通过一个状态机维护当前函数的执行点,并用外部结构保存函数的所有状态(将其生命周期提升到函数生命周期之外),通过状态机选择每次进入函数的执行位置,以达成断点的跳出和恢复。

根据上面的描述,我们不难看出有栈协程和无栈协程的特点:

  • 有栈协程:便于实现,但在断点必须确定下一个进入的协程,因此协程调度器需要以侵入式的方式传递给协程,并且每个协程都需要维护其栈空间,可能造成内存浪费。
  • 无栈协程:实现困难(通常需要语言本身对其进行支持),但只需要维护状态机和栈空间,协程本身不与上层调度混杂,只需要固定的内存空间。

实际上,也有方法优化有栈协程的内存浪费,我们称为共享栈,但实际上无非是时间效率和空间效率的二选一。

编程语言一般更喜欢对无栈协程进行支持,再由第三方或者语言自身的标准库提供运行时支持,这里比较有名的有 Python 的 Asyncio 、Rust 的 Tokio 和 C++20 协程等。

Asyncio

Asyncio 将协程封装为Task,使用事件循环机制调度任务直到所有的任务完成,而Runner是 Asyncio 暴露给外部的任务执行方法,其管理事件循环的生命周期。

Runner类存储于runners.py中,是该库中较为简单的部分。

Asyncio Runner

当我们调用asyncio.run(coro)的时候,思考一下,Python 会在这个时候做什么?答案便位于runners.py中:

def run(main, *, debug=None, loop_factory=None):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    with Runner(debug=debug, loop_factory=loop_factory) as runner:
        return runner.run(main)

该函数分为两个部分:检查事件循环和启动Runner

asyncio.run的目的是在一个没有任务的上下文中初始化一个任务,并等待它运行完成,因此,在进入Runner之前,我们必须保证事件循环没有在运行(即将要运行的任务是唯一可运行的任务)。

Runner支持了with语句,保证了Runner在构造和析构时执行初始化和清理操作(但实际上更重要的是清理行为,详见后文)。

Runner 初始化

Runnerwith表达式会调用_lazy_init_lazy_init是一种初始化保证,它保证Runner在调用该函数后,一定处于就绪状态:

    def __enter__(self):
        self._lazy_init()
        return self

    def _lazy_init(self):
        if self._state is _State.CLOSED:
            raise RuntimeError("Runner is closed")
        if self._state is _State.INITIALIZED:
            return
        if self._loop_factory is None:
            self._loop = events.new_event_loop()
            if not self._set_event_loop:
                events.set_event_loop(self._loop)
                self._set_event_loop = True
        else:
            self._loop = self._loop_factory()
        self._context = contextvars.copy_context()
        self._state = _State.INITIALIZED

在解读之前,我们必须说明的是,running_loopevent_loop并不等同,他们是两套体系,一套存储于EventLoopPolicy中,一套存储于thread_local_running_loop中。

_lazy_init中,首先检查状态机:

        if self._state is _State.CLOSED:        # CLOSE 不应当再次初始化
            raise RuntimeError("Runner is closed")
        if self._state is _State.INITIALIZED:   # 已经初始化,直接退出
            return

接着,函数会检查Runner初始化时有没有传入事件循环工厂_loop_factory,如果有,直接使用工厂获取事件循环,否则使用默认实现:

            self._loop = events.new_event_loop()
            if not self._set_event_loop:            # 只设置一次事件循环
                events.set_event_loop(self._loop)
                self._set_event_loop = True

只有第一次初始化时,我们才通过set_event_loop设置全局事件循环,这是为了防止在子观察器上多次调用attach_loop,具体细节我们会在分析事件循环时进行解释。

最后,Runner拷贝了当前环境的上下文,作为新协程的默认上下文:

        self._context = contextvars.copy_context()

contextvars是 Python 提供的上下文库,它提供了比thread_local更封闭的task_local隔离。

其实这里,笔者觉得有些不大合理,如果我们构建如下案例:

import asyncio
import contextvars

somevalue = contextvars.ContextVar("somevalue")
somevalue.set("ORIGINAL")

async def foo():
    print(somevalue.get())

with asyncio.Runner() as runner:
    somevalue.set("EDITED")
    runner.run(foo())

该案例最终的运行结果是ORIGINAL,也就是说,在我们调用run方法时,协程内的初始上下文并不严格等于调用者的上下文。但考虑到我们几乎不会自己调用Runner.run,因此也无伤大雅。

Runner 运行

实际上,run方法内部已经包含了对running_loop的检查和_lazy_init,这就是为什么笔者说with的作用更多是确保清理行为,移除重复部分后,run方法如下:

    def run(self, coro, *, context=None):
        if not coroutines.iscoroutine(coro):
            if inspect.isawaitable(coro):
                async def _wrap_awaitable(awaitable):
                    return await awaitable

                coro = _wrap_awaitable(coro)
            else:
                raise TypeError('An asyncio.Future, a coroutine or an '
                                'awaitable is required')

        if context is None:
            context = self._context

        task = self._loop.create_task(coro, context=context)

        if (threading.current_thread() is threading.main_thread()
            and signal.getsignal(signal.SIGINT) is signal.default_int_handler
        ):
            sigint_handler = functools.partial(self._on_sigint, main_task=task)
            try:
                signal.signal(signal.SIGINT, sigint_handler)
            except ValueError:
                sigint_handler = None
        else:
            sigint_handler = None

        self._interrupt_count = 0
        try:
            return self._loop.run_until_complete(task)
        except exceptions.CancelledError:
            if self._interrupt_count > 0:
                uncancel = getattr(task, "uncancel", None)
                if uncancel is not None and uncancel() == 0:
                    raise KeyboardInterrupt()
            raise
        finally:
            if (sigint_handler is not None
                and signal.getsignal(signal.SIGINT) is sigint_handler
            ):
                signal.signal(signal.SIGINT, signal.default_int_handler)

看起来略显复杂,我们可以一步步拆解分析,首先,该函数检查入参是否为协程:

        if not coroutines.iscoroutine(coro):
            if inspect.isawaitable(coro):   # 不是协程,但可以等待,进行一次封装
                async def _wrap_awaitable(awaitable):
                    return await awaitable

                coro = _wrap_awaitable(coro)
            else:
                raise TypeError('An asyncio.Future, a coroutine or an '
                                'awaitable is required')

如代码所示,如果入参不是协程而是可等待类型,就用一层包装将其转换为协程,否则抛出异常。

iscoroutine表示标记为协程的函数的返回值,isawaitable比较宽松,它只要求类型实现了__await__

类型检查后,我们根据入参获取协程的上下文,如果没有指定,则使用我们刚刚复制好的上下文,然后,让事件循环创建任务:

        task = self._loop.create_task(coro, context=context)

为了正确处理SIGINT,需要拦截SIGINT,但SIGINT回调只注册一次,并且必须在主线程中进行(保证信号处理的唯一性):

        # 注册 SIGINT 回调
        if (threading.current_thread() is threading.main_thread()
            and signal.getsignal(signal.SIGINT) is signal.default_int_handler
        ):
            sigint_handler = functools.partial(self._on_sigint, main_task=task)
            try:
                signal.signal(signal.SIGINT, sigint_handler)
            except ValueError:
                sigint_handler = None
        else:
            sigint_handler = None

        # 初始化中断次数
        self._interrupt_count = 0

注册完成后,初始化_interrupt_count,其代表了程序被中断的次数,在_on_sigint中被使用:

    def _on_sigint(self, signum, frame, main_task):
        self._interrupt_count += 1
        if self._interrupt_count == 1 and not main_task.done(): # 第一次中断,取消
            main_task.cancel()
            self._loop.call_soon_threadsafe(lambda: None)
            return
        raise KeyboardInterrupt()                               # 后续中断,直接结束程序

如果第一次被中断,则取消main_task,并向事件循环推入一个空回调函数防止长时间停滞在select等调用上,这段操作实际上并不保证任务一定被取消。

如果第二次被中断,则直接触发KeyboardInterrupt,直接结束程序。

做好完全准备后,执行任务,直到任务完成:

        try:
            return self._loop.run_until_complete(task)  # 等待任务完成
        except exceptions.CancelledError:               # 处理取消
            if self._interrupt_count > 0:
                uncancel = getattr(task, "uncancel", None)
                if uncancel is not None and uncancel() == 0:
                    raise KeyboardInterrupt()
            raise
        finally:
            # 移除 SIGINT 回调
            if (sigint_handler is not None
                and signal.getsignal(signal.SIGINT) is sigint_handler
            ):
                signal.signal(signal.SIGINT, signal.default_int_handler)

如果发生了取消异常,分为两种情况:

  • 该操作是人为的(_interrupt_count > 0),并且任务只被取消了一次(uncancelcancel成对出现,表示减少取消计数和增加取消计数),是正常的取消操作,将其转换为KeyboardInterrupt
  • 不符合以上条件,是非正常的取消,直接抛出取消异常。

最后,我们移除SIGINT的回调(本次运行结束,不需要继续处理中断)。

Runner 清理

Runner自己的生命周期末期,需要调用close来清理事件循环:

    def close(self):
        if self._state is not _State.INITIALIZED:
            return
        try:
            loop = self._loop

            # 取消所有任务
            _cancel_all_tasks(loop)

            # 关闭所有异步生成器
            loop.run_until_complete(loop.shutdown_asyncgens())

            # 关闭默认 Executor
            loop.run_until_complete(
                loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
        finally:
            if self._set_event_loop:
                events.set_event_loop(None)
            loop.close()
            self._loop = None
            self._state = _State.CLOSED

取消操作总共分为三步:

  • 取消所有任务
  • 关闭所有异步生成器
  • 关闭默认executor

在将事件循环内部清理干净之后,我们移除全局时间循环,并将其关闭。

_cancel_all_tasks的逻辑为:

def _cancel_all_tasks(loop):
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })

该函数对事件循环中所有的任务调用取消,并等待他们退出(不抛出异常):

    for task in to_cancel:  # 循环标记取消所有任务
        task.cancel()

    # 等待所有任务都退出
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

然后,一个个遍历取消的任务,如果有非取消异常,我们需要特殊处理(交给事件循环):

    for task in to_cancel:
        if task.cancelled():                # 取消成功,下一个
            continue
        if task.exception() is not None:    # 有异常,交给事件循环
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })

总结

Runner是距离我们用户最近的 Asyncio 接口,它的目的是管理事件循环的生命周期并帮助我们执行主任务,其中的设计无疑是值得称赞的——作为暴露给外部的接口,无论用户如何使用Runner,它都尽最大努力保证操作正常完成。

之后的章节会从Runner延伸,解析 Asyncio 的核心模块。

更新日志

2025/9/14 14:44
查看所有更新日志
  • b7ff2-move to Python
  • b9caa-asyncio runners.py