티스토리 뷰

728x90
반응형

안녕하세요. Teus입니다.

이번 포스팅은 asyncio의 저수준 api를 사용해서

어떻게 바닥부터 비동기 함수를 만드는지에 대해서 asyncio.sleep 조교와 함께 알아보겠습니다.

0. asyncio.sleep

python에서 가장 처음 asyncio를 접할 경우

아마도 십중 팔구 asyncio.sleep을 통해서 async를 쓰면 좋다고 전도받습니다.

#https://docs.python.org/3/library/asyncio-runner.html
async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

근데 이상하게도 아래처럼 만든 함수를 사용하면 async의 힘을 사용할 수가 없습니다

async def my_async_sleep(tm):
    time.sleep(tm)
    print("hello")

1. asyncio.sleep 소스코드 리딩(조금만...)

(소스코드에 관심없으시면, 2. 직접 async 함수 만들어보기로 내려가주세요!)
그래서, 요즘은 잘 안까보는데 소스코드를 살짝 까 보았습니다😅.

#https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L703
async def sleep(delay, result=None):
    """Coroutine that completes after a given time (in seconds)."""
    ...        

    loop = events.get_running_loop()
    future = loop.create_future()
    h = loop.call_later(delay,
                        futures._set_result_unless_cancelled,
                        future, result)
    try:
        return await future
    finally:
        h.cancel()

events.get_running_loop()를 통해서 Asyncio의 event_loop를 가지고 옵니다.

그리고 loop.create_future()를 통해서 callback의 결과를 가지고올 객체를 만들어 줍니다.

그리고 loop.call_later()를 볼 수 있습니다.
loop.call_later()

loop.call_later()의 경우 (delay, callback, *args, context)를 매개변수로 받습니다.

지금 그러면 future._set_result_unless_cancelled가 callback으로 들어간것을 확인할 수 있습니다.

그럼.. 좀더 깊이 들어가 보겠습니다.

#https://github.com/python/cpython/blob/main/Lib/asyncio/futures.py#L312
def _set_result_unless_cancelled(fut, result):
    """Helper setting the result only if the future was not cancelled."""
    if fut.cancelled():
        return
    fut.set_result(result)

해당 callback은 내부에서 future가 취소되지 않는다면, future의 값을 callback방식으로 설정해주는 함수에 불과합니다.

그렇기 때문에 loop.call_later를 좀더 살펴볼 필요가 있습니다.

#https://github.com/python/cpython/blob/main/Lib/asyncio/base_events.py#L777
class BaseEventLoop(events.AbstractEventLoop):
    ...
    def call_later(self, delay, callback, *args, context=None):
        ...
        timer = self.call_at(self.time() + delay, callback, *args,
                             context=context)
        ...
        return timer

    def call_at(self, when, callback, *args, context=None):
        ...        
        timer = events.TimerHandle(when, callback, args, self, context)
        ...
        heapq.heappush(self._scheduled, timer)
        timer._scheduled = True
        return timer

call_later는 정해진 delay를 이용해서 call_at으로 작업을 넘겨줍니다.

여기서 events.TimerHandle이라는 class가 있고 _scheduled라는 property가 있습니다.

#https://github.com/python/cpython/blob/main/Lib/asyncio/events.py#L107
class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop, context=None):
        super().__init__(callback, args, loop, context)
        self._when = when
        self._scheduled = False

이를 통해서 정해진 작업(callback), 정해진 시간(when), 그리고 나머지 args를 가지고 heapq에 들어가면

loop내부에서 해당 heapq에 있는 작업을 뺐다, 넣었다 하면서 비동기로 작업을 처리하고

마지막에 future를 set해줌으로써 해당 작업을 마무리 해 주는 것을 예상해 볼 수 있습니다.

2. async print 함수 만들어 보기

이제 asyncio.sleep이 어떻게 동작하는지 알았으니, N초 뒤에 동작해야되는 async_print함수를 만들어 보겠습니다.

import asyncio

async def async_print(i):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    ret = None
    def my_callback(fut, ret, idx):
        if fut.cancelled():
            return
        fut.set_result(ret)
        print(f"{idx} is now print")

    h = loop.call_later(0.01,
                        my_callback,
                        fut, ret, i)
    try:
        return await fut
    finally:
        h.cancel()

async def sync_print(i):
    print(f"{i} is now print")
    return 

async def main():
    await asyncio.gather(*[async_print(i) for i in range(5)])
    print("*"*100)
    await asyncio.gather(*[sync_print(i) for i in range(5)])


if __name__ == "__main__":
    asyncio.run(main())

image.png

그럼 보시는것 처럼 일반적으로 print를 사용한 async def와 달리, 실행 순서와 무관하게 작업이 비동기로 처리되는 모습을 확인할 수 있습니다.

이 방법을 활용하면, 비동기로 file을 open하는 나만의 async_open같은것도 역시 만들 수가 있습니다.

import asyncio

async def async_open(i, file_nm):
    loop = asyncio.get_running_loop()
    fut1 = loop.create_future()
    fut2 = loop.create_future()
    fut3 = loop.create_future()
    def my_callback1(fut, file_nm, idx):
        fd = open(file_nm, "r")
        if fut.cancelled():return
        fut.set_result(fd)
        print(f"{idx}th task file open is ready")

    def my_callback2(fut, fut2, idx):        
        if fut.cancelled():return
        print(f"{idx}th task file read is read")
        fut2.set_result(fut1.result().read())
        print(f"{idx}th task file read complete")

    def my_callback3(fut, fd, idx):             
        if fut.cancelled():return
        fd.close()
        print(f"{idx}th task file stream close")
        fut.set_result("Done")


    h = loop.call_later(0,
                        my_callback1,
                        fut1, file_nm, i)
    await fut1
    h.cancel()


    h = loop.call_later(0,
                        my_callback2,
                        fut1, fut2, i)
    await fut2
    h.cancel()        

    h = loop.call_later(0,
                        my_callback3,
                        fut3, fut1.result(), i)    
    await fut3
    try:
        return await fut2
    finally:
        h.cancel()

async def main():
    await asyncio.gather(*[async_open(i, "hello.csv") for i in range(5)])    


if __name__ == "__main__":
    asyncio.run(main())

image.png

근데 실제 결과를 보면, 비동기 보다는 javascript의 setTimeout과 거의 유사하게 동작하는것을 확인할 수 있습니다.

728x90
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함