파이썬- asyncio의 ContextVar 전달
asyncio는 현재 스레드의 ContextVar와 연동해 대상 함수를 실행하지만,
PEP 567 – Context Variables
; https://peps.python.org/pep-0567/
event loop를 통해 실행하는 경우에는 ContextVar를 처리해 주지 않습니다. 이에 대한 코드를 다음과 같이 테스트할 수 있는데요,
import asyncio
import contextvars
import os
print(f'[{os.getpid()}] in main.py')
ctx_var = contextvars.ContextVar('myctx', default='(null)')
async def total1(numbers):
print(f'[{os.getpid()}] total1 in create_task:', ctx_var.get(), sum(numbers))
def total2(numbers):
print(f'[{os.getpid()}] total2 in run_in_executor:', ctx_var.get(), sum(numbers))
async def main():
ctx_var.set('root-main')
task1 = asyncio.create_task(total1([1, 2]))
await asyncio.wait({task1})
default_loop = asyncio.get_event_loop()
await default_loop.run_in_executor(None, total2, [3, 4])
asyncio.run(main())
/* 실행 결과:
[48428] in main.py
[48428] total1 in create_task: root-main 3
[48428] total2 in run_in_executor: (null) 7
*/
보는 바와 같이
asyncio.create_task는 ContextVar를 전달하지만,
loop.run_in_executor인 경우에는 전달되지 않고 있습니다.
이와 관련해 아래의 이슈가 있는데요,
loop.run_in_executor should propagate current contextvars #78195
; https://github.com/python/cpython/issues/78195
create_task는 언제나 스레드를 통해 대상을 실행하지만, run_in_executor는 전달하는 Executor에 따라 별도의 [스레드 또는 프로세스]로 실행하기 때문에 ContextVar를 전달하지 못한다고 합니다.
닷넷의 경험과는 좀 다른데요,
직렬화에 따라 ContextVar를 전달하는 것을 나눠도 되기 때문입니다.
게다가, 이 설명이 좀 맞지 않는 것이, run_in_executor에 오히려
ProcessPoolExecutor를 전달하면,
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor()
await default_loop.run_in_executor(executor, total2, [3, 4])
/* 실행 결과:
[1051213] in main.py
[1051213] total1 in create_task: root-main 3
[1051214] total2 in run_in_executor: root-main 7
*/
ContextVar가 잘 전달됩니다. 즉,
ThreadPoolExecutor 내부에서 ContextVar에 대한 전파를 하지 않는 것입니다.
executor = ThreadPoolExecutor()
await default_loop.run_in_executor(executor, total2, [3, 4])
// 실행 결과: total2 in run_in_executor: (null) 7
이외에도 이상한 점이 하나 있다면, 저렇게 executor를 통해 ProcessPoolExecutor를 전달하는 것은 가능하지만, 정작
set_default_executor로 전달하는 것은 또 불가능하다는 점입니다.
executor = ProcessPoolExecutor()
default_loop = asyncio.get_event_loop()
default_loop.set_default_executor(executor)
/* 예외 발생: Python 3.11부터 예외가 발생하도록 변경
Traceback (most recent call last):
File "/home/testusr/testconsole/main.py", line 52, in
asyncio.run(main())
File "/usr/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/home/testusr/testconsole/main.py", line 49, in main
default_loop.set_default_executor(executor)
File "/usr/lib/python3.11/asyncio/base_events.py", line 834, in set_default_executor
raise TypeError('executor must be ThreadPoolExecutor instance')
TypeError: executor must be ThreadPoolExecutor instance
*/
암튼, ThreadPoolExecutor 자체가 문제이기 때문에 이를 보완하면 저 문제를 해결할 수 있습니다. 즉, 사용자 정의 ThreadPoolExecutor를 만들면 되는 것인데요,
# Python: Copy context (contextvars.Context) to a separate thread
# https://stackoverflow.com/questions/71511061/python-copy-context-contextvars-context-to-a-separate-thread
class ThreadPoolExecutorWithCopyContext(ThreadPoolExecutor):
def submit(self, fn, /, *args, **kwargs): # Positional-only parameters
ctx_vars = copy_context().items()
def _fn():
for var, value in ctx_vars:
var.set(value)
return fn(*args, **kwargs)
return super().submit(_fn)
그래서 저 인스턴스를 run_in_executor에 전달하거나,
executor = ThreadPoolExecutorWithCopyContext()
await default_loop.run_in_executor(executor, total2, [3, 4])
set_default_executor로 지정하면 됩니다.
executor = ThreadPoolExecutorWithCopyContext()
default_loop.set_default_executor(executor)
[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]