Microsoft MVP성태의 닷넷 이야기
글쓴 사람
정성태 (techsharer at outlook.com)
홈페이지
첨부 파일
 

(시리즈 글이 3개 있습니다.)
스크립트: 58. 파이썬 - async/await 기본 사용법
; https://www.sysnet.pe.kr/2/0/13423

스크립트: 59. 파이썬 - 비동기 호출 함수(run_until_complete, run_in_executor, create_task, run_in_threadpool)
; https://www.sysnet.pe.kr/2/0/13426

스크립트: 71. 파이썬 - asyncio의 ContextVar 전달
; https://www.sysnet.pe.kr/2/0/13899




파이썬 - 비동기 호출 함수(run_until_complete, run_in_executor, create_task, run_in_threadpool)

지난 글에 이어,

파이썬 - async/await 기본 사용법
; https://www.sysnet.pe.kr/2/0/13423

이번에는 다양한 비동기 호출 함수에 대해 알아보겠습니다. 우선, 비동기를 수행하는 asyncio의 run 함수 먼저 볼까요? ^^

import asyncio


async def main_async():
    time.sleep(3)


asyncio.run(main_async())

run은 async 함수를 전달받아 그것의 실행이 종료될 때까지 대기하는 역할을 합니다. 사실 이 코드는 다음과 같이 바꿔 쓸 수 있습니다. (run의 실제 내부 구현과는 다를 수 있습니다.)

import asyncio


async def main_async():
    time.sleep(3)


loop = asyncio.get_event_loop()
loop.run_until_complete(main_async())  # 의미상, "await main_async()"

만약 await 예약어 호출을 async 함수 내에서만 허용한다는 규칙이 없었다면 아마도 asyncio.run 함수는 그냥 단순히 await 예약어로 대체되었을지도 모릅니다.




파이썬의 경우, 명확하게 await 호출의 대상 함수가 async이기를 요구하는데요, 그렇다면 기존에 제작된 동기 함수들을 비동기로 호출하고 싶다면 어떻게 해야 할까요?

이런 경우에 대해 asyncio는 run_in_executor 함수를 제공합니다.

import time
import asyncio


def do_sync(delay: int=0):
    time.sleep(delay)


async def main_async():
    print('async calling', time.strftime('%X'))

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, do_sync, 3)

    print('async called', time.strftime('%X'))


asyncio.run(main_async())

""" 출력 결과
async calling 16:02:46
async called 16:02:49
"""

run_in_executor는 2번째 인자로 받은 (동기) 함수를 내부의 이벤트 루프에 태워 비동기로 실행합니다. 따라서, run_in_executor의 호출에는 await을 해야 do_sync가 완료된 후 그다음의 코드를 이어서 호출하게 됩니다.

만약, 위의 예제에서 "await loop.run_in_executor(...)" 코드를 await 없이 호출하게 되면 실행 후 종료 시점에 아래와 같은 오류가 발생합니다.

exception calling callback for 
Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "/usr/lib/python3.8/asyncio/futures.py", line 374, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 764, in call_soon_threadsafe
    self._check_closed()
  File "/usr/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

왜냐하면, await이 없으므로 호출 대기(설명은 '대기'라고 했지만 스레드 블록킹은 아닙니다.)를 하지 않을 것이고, 이로 인해 그대로 프로그램이 종료하면서 아직 진행 중인 callback 함수(위의 경우, do_sync)가 강제 종료되므로 예외를 통해 그 사실을 알려주는 것입니다.




자... 그럼 run_in_executor의 병렬 처리도 이전에 쓴 글과 동일한 규칙으로 코딩할 수 있습니다. 우선, 다음의 코드는 차례로 6초가 걸리지만,

await loop.run_in_executor(None, do_sync, 3)  # 3초 지나서 이후의 코드로 진행 (다시 강조하지만, 호출 측의 스레드 블록킹이 되는 것이 아닙니다.)
await loop.run_in_executor(None, do_sync, 3)  # 3초 지나서 이후의 코드로 진행
                                              # 총 6초 소요

이렇게 하면 3초 만에 결과가 나옵니다.

task1 = loop.run_in_executor(None, do_sync, 3)  # 비동기로 do_sync 함수를 전달하고 바로 제어 반환
task2 = loop.run_in_executor(None, do_sync, 3)  # 비동기로 do_sync 함수를 전달하고 바로 제어 반환

await task1  # do_sync 작업 완료 대기 (3초) 후 진행
await task2  # 위의 코드에서 3초 대기를 이미 했으므로 곧바로 제어 반환




그다음, asyncio의 create_task 함수를 볼까요?

# 직렬로 비동기 함수 호출

async def main_async():
    await asyncio.create_task(do_async(3))
    await asyncio.create_task(do_async(3))

# 병렬로 비동기 함수 호출

async def main_async():
    task1 = asyncio.create_task(do_async(3))
    task2 = asyncio.create_task(do_async(3))

    await task1
    await task2

사실 저 코드는 asyncio.create_task 호출 자체를 없애고 await으로 대체해도 동일한 동작을 합니다.

async def main_async():
    await do_async(3)
    await do_async(3)

async def main_async():
    task1 = do_async(3)
    task2 = do_async(3)

    await task1
    await task2

단지, create_task의 경우 부가적으로 name 인자와 함께 문맥 정보를 전달할 수 있는 정도의 서비스가 추가된 정도입니다. (문맥은 설명이 복잡하니 다른 글에서 한번 다루겠습니다.)




이 정도면, 비동기 호출과 관련한 예를 웬만큼 다룬 것 같은데요, 마지막으로 하나 더 남은 것이 있다면 FastAPI에서 제공하는 run_in_threadpool입니다.

from fastapi.concurrency import run_in_threadpool


def myfunc(arg_values):
    delay_number = arg_values['delay']
    time.sleep(delay_number)


@app.get("/asynctest", response_class=HTMLResponse)
async def asynctest(delay: int = 0):
    delay_arg = {'delay': 1}
    await run_in_threadpool(myfunc, delay_arg)  # 1초 후 아래 코드로 진행
    await run_in_threadpool(myfunc, delay_arg)  # 1초 후 아래 코드로 진행
    await run_in_threadpool(myfunc, delay_arg)  # 1초 점유 (총 3초 소요)

    return "TEST"

run_in_threadpool의 이름처럼, run_in_executor와 같이 동기 함수를 비동기에 태워서 실행할 수 있는데, 하지만 동작 자체는 매우 다릅니다. 왜냐하면, run_in_threadpool에 전달한 시점에 myfunc가 호출되지 않고, await을 한 시점에 실질적인 코드 실행 단계로 넘어가기 때문입니다.

따라서, run_in_threadpool의 경우에는 await만으로는 "병렬 처리"가 되지 않습니다.

task1 = run_in_threadpool(myfunc, delay_arg)  # 작업만 전달 (실행은 안 됨)
task2 = run_in_threadpool(myfunc, delay_arg)  # 작업만 전달 (실행은 안 됨)
task3 = run_in_threadpool(myfunc, delay_arg)  # 작업만 전달 (실행은 안 됨)

await task1  # 이 단계에서 myfunc 작업을 비동기로 실행
await task2  # task1이 완료된 후 task2가 이어서 비동기로 실행
await task3  # task2가 완료된 후 task3이 이어서 비동기로 실행

대신 run_in_threadpool의 작업을 병렬 처리하고 싶다면 asyncio.gather 등의 함수를 이용할 수 있습니다.

await asyncio.gather(task1, task2, task3)  # task1, task2, task3 작업이 모두 한꺼번에 비동기로 실행

한 가지 유의해야 할 점은, run_in_threadpool의 경우 비동기 함수를 인자로 받아도 실행 시 오류가 발생하지 않는다는 점입니다.

@app.get("/asynctest", response_class=HTMLResponse)
async def asynctest(delay: int = 0):
    task1 = run_in_threadpool(my_aio_func)  # 비동기 함수를 인자로 전달
    task2 = run_in_threadpool(my_aio_func)
    task3 = run_in_threadpool(my_aio_func)

    await asyncio.gather(task1, task2, task3)


async def my_aio_func(args=None):
    print('my_aio_func: start:', args)
    await asyncio.sleep(1)
    return "my value(async)"

실제로 위의 코드를 실행해 보면 아무런 오류가 발생하지 않는데, 여기서 문제는 my_aio_func이 아예 실행이 안 된다는 점입니다. (실행해 보시면, 화면 출력에 "my-aio_func: start"라는 문자열이 안 나옵니다.)

하지만, FastAPI 앱을 종료해 보면 그제야 다음과 같은 식으로 오류 메시지를 출력합니다.

/usr/lib/python3.8/threading.py:932: RuntimeWarning: coroutine 'my_aio_func' was never awaited
  self.run()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

이 정도면... 대충 파악이 되셨을 거라 생각합니다. ^^




[이 글에 대해서 여러분들과 의견을 공유하고 싶습니다. 틀리거나 미흡한 부분 또는 의문 사항이 있으시면 언제든 댓글 남겨주십시오.]







[최초 등록일: ]
[최종 수정일: 3/7/2025]

Creative Commons License
이 저작물은 크리에이티브 커먼즈 코리아 저작자표시-비영리-변경금지 2.0 대한민국 라이센스에 따라 이용하실 수 있습니다.
by SeongTae Jeong, mailto:techsharer at outlook.com

비밀번호

댓글 작성자
 




... 76  77  78  79  80  81  82  83  84  85  86  87  [88]  89  90  ...
NoWriterDateCnt.TitleFile(s)
11736정성태10/12/201818387오류 유형: 492. Visual Studio 로딩 시 오류 - The 'Scc Display Information' package did not load correctly.
11735정성태10/12/201824147VS.NET IDE: 129. Visual Studio - 특정 문자(열)를 개행 문자로 바꾸는 방법
11734정성태10/10/201818525Linux: 4. Synology NAS(DS216+II)에 FTDI 장치 연결 후 C#(.NET Core)으로 DTR 제어파일 다운로드1
11733정성태10/10/201821263Linux: 3. Synology NAS(DS216+II)에서 FTDI 장치를 C/C++로 제어
11732정성태10/10/201820982디버깅 기술: 119. windbg 분석 사례 - 종료자(Finalizer)에서 예외가 발생한 경우 비정상 종료(Crash) 발생파일 다운로드1
11731정성태10/9/201820516개발 환경 구성: 409. C# - REST API를 이용해 Azure Kudu 서비스 이용 - 웹 앱 확장 처리파일 다운로드1
11730정성태10/9/201819619개발 환경 구성: 408. C# - REST API를 이용해 Azure Kudu 서비스 이용 - 파일 처리파일 다운로드1
11729정성태10/9/201822232Windows: 150. 윈도우에서 ARP Cache 목록 확인 및 삭제하는 방법
11728정성태10/9/201820023사물인터넷: 50. Audio Jack 커넥터의 IR 적외선 송신기 [1]
11727정성태10/8/201821277오류 유형: 491. Visual Studio의 리눅스 SSH 원격 연결 - "Connectivity Failure. Please make sure host name and port number are correct."
11726정성태10/7/201823943사물인터넷: 49. 라즈베리 파이를 이용해 원격 컴퓨터의 전원 스위치 제어파일 다운로드1
11724정성태10/5/201823610개발 환경 구성: 407. 유니코드와 한글 - "Hangul Compatibility Jamo"파일 다운로드1
11723정성태10/4/201817506개발 환경 구성: 406. "Docker for Windows" 컨테이너 내의 .NET Core 응용 프로그램에서 직렬 포트(Serial Port, COM Port) 사용 방법
11722정성태10/4/201821188.NET Framework: 798. C# - Hyper-V 가상 머신의 직렬 포트와 연결된 Named Pipe 간의 통신파일 다운로드1
11721정성태10/4/201821487.NET Framework: 797. Linux 환경의 .NET Core 응용 프로그램에서 직렬 포트(Serial Port, COM Port) 사용 방법파일 다운로드1
11720정성태10/4/201823044개발 환경 구성: 405. Hyper-V 가상 머신에서 직렬 포트(Serial Port, COM Port) 사용
11719정성태10/4/201823660.NET Framework: 796. C# - 인증서를 윈도우에 설치하는 방법
11718정성태10/4/201818850개발 환경 구성: 404. (opkg가 설치된) Synology NAS(DS216+II)에 cmake 설치
11717정성태10/3/201821445사물인터넷: 48. 넷두이노의 C# 네트워크 프로그램 [1]
11716정성태10/3/201821995사물인터넷: 47. Raspberry PI Zero (W)에 FTDI 장치 연결 후 C/C++로 DTR 제어파일 다운로드1
11715정성태10/3/201820788사물인터넷: 46. Raspberry PI Zero (W)에 docker 설치
11714정성태10/2/201820026사물인터넷: 45. Raspberry PI에 ping을 hostname으로 하는 방법
11713정성태10/2/201822446개발 환경 구성: 403. Synology NAS(DS216+II)에 docker 설치 후 .NET Core 2.1 응용 프로그램 실행하는 방법
11712정성태10/2/201827629.NET Framework: 795. C# - 폰트 목록을 한글이 아닌 영문으로 구하는 방법 [3]
11711정성태10/2/201823087오류 유형: 490. 윈도우 라이선스 키 입력 오류 0xc004f050, 0xc004e028
11710정성태10/2/201822041.NET Framework: 794. C# - 같은 모양, 다른 값의 한글 자음을 비교하는 호환 분해 [5]
... 76  77  78  79  80  81  82  83  84  85  86  87  [88]  89  90  ...