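While debugging a Ray issue today, I overlooked a special behavior of Python's asyncio.run_coroutine_threadsafe. The official docs don't call it out, and you only find out by reading the CPython source code. It cost me quite a bit of debugging time, so I'm writing it down here.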

According to the official documentation, this function submits a coroutine to an event loop running in another thread and returns a Future object (a concurrent.futures.Future).

Based on the docs, we can quickly write a toy example:

import asyncio
import threading

async def f():
    print("Inside coroutine f()")
    raise ValueError

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def main():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=start_loop, args=(loop,))
    thread.start()

    future = asyncio.run_coroutine_threadsafe(f(), loop)

    try:
        future.result()
    except Exception as e:
        print("Caught exception:", repr(e))

    print("Stopping loop")
    loop.call_soon_threadsafe(loop.stop)
    thread.join()

if __name__ == "__main__":
    main()

The main function creates a thread and an event loop, submits the coroutine f to that loop with asyncio.run_coroutine_threadsafe, fetches the result with future.result() while handling exceptions, and finally stops the event loop. Looks fine, right?

The output is:

Inside coroutine f()
Caught exception: ValueError()
Stopping loop

Now here comes the question: what happens if the coroutine f raises SystemExit instead of ValueError?

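According to the official documentation, SystemExit inherits from BaseException rather than Exception, so is it enough to change Exception to BaseException on line 21 (the except clause)? Make both changes (raise SystemExit in f and catch BaseException in main), run the program again, and you will find that it prints Inside coroutine f() and then hangs forever.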
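As a quick check of the class hierarchy mentioned above, the following sketch shows which of the exceptions used in this post would be caught by a plain `except Exception`. The helper name `caught_by_except_exception` is made up for illustration.

```python
def caught_by_except_exception(exc_type):
    """Hypothetical helper: would `except Exception` catch this exception type?"""
    return issubclass(exc_type, Exception)


for exc_type in (ValueError, SystemExit, KeyboardInterrupt):
    print(f"{exc_type.__name__}: {caught_by_except_exception(exc_type)}")

# Prints:
# ValueError: True
# SystemExit: False
# KeyboardInterrupt: False
```

All three are subclasses of BaseException, which is why switching the except clause to BaseException looks like it should be enough.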

We have to read the source code of asyncio.run_coroutine_threadsafe to understand what is going on. Here it is:

def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future

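It turns out that this function special-cases SystemExit and KeyboardInterrupt: they are simply re-raised, while any other exception is passed to future.set_exception. asyncio treats these two exceptions the same way when they come out of the coroutine itself: they propagate out of the event loop instead of being delivered to the concurrent.futures.Future. Since set_exception is never called on that future, future.result() never gets a result.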

Let's tweak the original program a bit:

import asyncio
import concurrent.futures
import threading

async def f():
    print("Inside coroutine f()")
    raise SystemExit

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def main():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=start_loop, args=(loop,))
    thread.start()

    future = asyncio.run_coroutine_threadsafe(f(), loop)

    try:
        # Use a timeout so the main thread does not block forever.
        future.result(timeout=3)
    except concurrent.futures.TimeoutError:  # same class as the builtin TimeoutError on Python 3.11+
        print("Timeout")
        print("Future done?", future.done())
        print("Future cancelled?", future.cancelled())
        print("Loop alive?", loop.is_running())
        print("Thread alive?", thread.is_alive())
    except BaseException as e:
        print("Caught exception:", repr(e))

    print("Stopping loop")
    loop.call_soon_threadsafe(loop.stop)
    thread.join()

if __name__ == "__main__":
    main()

Output:

Inside coroutine f()
Timeout
Future done? False
Future cancelled? False
Loop alive? False
Thread alive? False
Stopping loop
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<f() done, defined at /home/mortalhappiness/test/test.py:5> exception=SystemExit()>
Traceback (most recent call last):
  File "/home/mortalhappiness/miniforge3/envs/test/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/home/mortalhappiness/miniforge3/envs/test/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mortalhappiness/test/test.py", line 11, in start_loop
    loop.run_forever()
  File "/home/mortalhappiness/miniforge3/envs/test/lib/python3.12/asyncio/base_events.py", line 641, in run_forever
    self._run_once()
  File "/home/mortalhappiness/miniforge3/envs/test/lib/python3.12/asyncio/base_events.py", line 1986, in _run_once
    handle._run()
  File "/home/mortalhappiness/miniforge3/envs/test/lib/python3.12/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/home/mortalhappiness/test/test.py", line 7, in f
    raise SystemExit
SystemExit

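As you can see, the behavior is rather awkward: future.done() and future.cancelled() are both False, and calling future.result() or future.exception() without a timeout hangs and never returns. Yet if you don't call them, you still get the warning shown above, "Task exception was never retrieved", and on top of that both the event loop and the thread are dead.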
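Since the loop thread is dead by the time the wait times out, one way to avoid blocking forever is to poll the future with a short timeout and give up once the thread has exited. This is only a minimal sketch: `result_or_dead_loop` and `demo` are hypothetical names, not part of asyncio, and the sketch assumes the loop thread is the only thing that can complete the future.

```python
import asyncio
import concurrent.futures
import threading


def result_or_dead_loop(future, thread, poll=0.1):
    """Hypothetical helper: wait for `future`, but stop waiting if `thread` has exited."""
    while True:
        try:
            return future.result(timeout=poll)
        except concurrent.futures.TimeoutError:
            # Sample the thread state first: once the thread is dead,
            # nothing can complete the future any more.
            alive = thread.is_alive()
            if future.done():
                # Either it finished just now, or the coroutine itself
                # raised a TimeoutError; return or re-raise the real outcome.
                return future.result()
            if not alive:
                raise RuntimeError(
                    "event loop thread exited before the future completed"
                )


async def f():
    raise SystemExit


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever)
    thread.start()
    future = asyncio.run_coroutine_threadsafe(f(), loop)
    try:
        result_or_dead_loop(future, thread)
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = "future completed"
    thread.join()
    loop.close()
    return message


if __name__ == "__main__":
    print(demo())
```

The helper does not recover the original SystemExit; it only turns the silent hang into an error the caller can handle.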

Conclusion

If you run a coroutine with asyncio.run_coroutine_threadsafe and that coroutine raises SystemExit or KeyboardInterrupt, calling future.result() or future.exception() is useless: without a timeout it hangs forever. In addition, the thread and its event loop both die, while the main thread stays alive.
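If the caller needs to see these exceptions through the future, one possible workaround is to wrap the coroutine so that SystemExit and KeyboardInterrupt are converted into an ordinary exception before they leave it. The sketch below is an assumption-laden illustration, not something asyncio provides: `CoroutineExit`, `guard`, and `run_guarded` are hypothetical names.

```python
import asyncio
import threading


class CoroutineExit(Exception):
    """Hypothetical wrapper exception; not part of asyncio."""


async def guard(coro):
    """Hypothetical wrapper: re-raise SystemExit/KeyboardInterrupt as CoroutineExit."""
    try:
        return await coro
    except (SystemExit, KeyboardInterrupt) as exc:
        # An Exception subclass is delivered to the future like any other error.
        raise CoroutineExit(repr(exc)) from exc


async def f():
    raise SystemExit


def run_guarded():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever)
    thread.start()
    future = asyncio.run_coroutine_threadsafe(guard(f()), loop)
    try:
        future.result(timeout=5)
        outcome = "no exception"
    except CoroutineExit as exc:
        outcome = f"caught {exc!r}"
    finally:
        # The loop is still alive here, so it has to be stopped explicitly.
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
    return outcome


if __name__ == "__main__":
    print(run_guarded())  # caught CoroutineExit('SystemExit()')
```

Note the trade-off: with this wrapper the exception no longer terminates the loop thread, so it changes what SystemExit means inside the coroutine. Whether that is acceptable depends on why the coroutine raised it in the first place.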