티스토리 뷰

728x90
반응형

안녕하세요.

대 AI시대에 python한 우물만을 파고있는 Teus입니다

이번 포스팅은, Python의 비동기 라이브러리인 aiofiles를 활용해서

비동기로 Parquet파일을 읽어들이는 방법에 대해서 다룹니다.

aiofiles 패키지를 이용해서 Bytes를 비동기로 읽어오고, 해당 Bytes를 활용해서 Parquet파일을 만들어 볼 예정입니다.

0. Parquet파일?

image.png
Parquet파일은 Data Engineering 분야에서 효율적으로 빅데이터를 저장하기 위해서 만들어진 확장자 포맷 입니다.

기존에 CSV를 대체할 목적으로 나왔고, 이전에 홍반장님의 포스팅에서도 다룬적이 있습니다.
Data Projects 에서 I/O Optimization by 홍반장

parquet 파일은 기본적으로 apache에서 제공하는 pyarrow를 사용해서 읽어들입니다
(여러분들이 사용하는 pandas.read_parquet 역시 위 패키지를 기본 엔진으로 사용합니다)

문제는 해당 Libaray는 Python의 비동기 지원을 하지 않습니다.

때문에 python의 FastAPI나 Django의 async view를 사용하는데 제약사항이 됩니다.

1. aiofiles

python의 라이브러리들은 기본적으로 sync로 동작되게 되어 있습니다.

때문에 함수를 async def로 구성한다고 해도 내부에서 blocking io를 발생시킵니다.

import time
import asyncio


async def mysleep(tm):
    time.sleep(tm)

async def main():
    st_time = time.perf_counter()
    await asyncio.gather(*[
        mysleep(3) for i in range(10)
    ])
    print(f"task done with {time.perf_counter() - st_time}")


if __name__ == "__main__":
    asyncio.run(main())

image.png

이때 time.sleep(sync) -> asyncio.sleep(async)로 바꾸면 원하는 대로 동작하는것을 볼 수 있습니다.

async def mysleep(tm):
    await asyncio.sleep(tm)

image.png

python의 open패키지 역시 기본적으로 sync로 동작하게 되어 있습니다.

aiofiles패키지는 open패키지의 비동기 버전이라고 볼 수 있습니다.

#https://pypi.org/project/aiofiles/
async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()
print(contents)
'My file contents'

2. pyarrow

pyarrow는 기본적으로 비동기로 동작 합니다.

때문에, python의 async를 사용해도 parquet파일을 읽을 때 사용하는 pyarrow.parquet.read_table() 의 큰 이점은 얻어가기는 어렵습니다.

하지만 pa(=pyarrow.parqet)의 경우, pa.BufferReader라는 방법을 활용, 해당 Bytes를 활용해서 Parquet File로 읽는것이 가능합니다.

#parquet 파일 생성
import pandas as pd
import numpy as np
df = pd.DataFrame({
        "a" : np.random.randint(1, 100, 10000000),
        "b" : np.random.randint(1, 100, 10000000),
        "c" : np.random.randint(1, 100, 10000000),
        "d" : np.random.randint(1, 100, 10000000),
        "e" : np.random.randint(1, 100, 10000000)
    })

df.to_parquet('df.parquet') 

#parquet파일 읽기
import pyarrow as pa
import pyarrow.parquet as pq
with open("df.parquet", "rb+") as fd:
    reader = pa.BufferReader(fd.read())
    table = pq.read_table(reader)
    new_df = table.to_pandas()

print(df == new_df)

3. aiofiles와 pyarrow를 통합하기

아까 제가 python의 open -> aiofiles.open로 대체하여 비동기로 동작하게 만들 수 있다고 했습니다.

이 점을 활용하면, parquer파일을 읽어오는는 i/o작업을 비동기로 실행시키는 것이 가능해 집니다.

import asyncio
import aiofiles
import pyarrow as pa
import pyarrow.parquet as pq

def sync_read():
    with open("df.parquet", "rb+") as fd:
        reader = pa.BufferReader(fd.read())
        table = pq.read_table(reader)
        new_df = table.to_pandas()

async def async_read():
    async with aiofiles.open("df.parquet", mode = "rb") as fd:
        reader = pa.BufferReader(await fd.read())
        table = pq.read_table(reader)
        new_df = table.to_pandas()

이제 비동기로 다수의 Data를 읽어오는게 얼마나 효율적인지 확인해볼 시간 입니다.

import asyncio
import aiofiles
import pyarrow as pa
import pyarrow.parquet as pq
import time


async def read_async(file_nm):
    async with aiofiles.open(file_nm, mode = "rb") as fd:
        return pq.read_table(pa.BufferReader(await fd.read())).to_pandas()

def read_sync(file_nm):
    with open(file_nm, mode = "rb+") as fd:
        return pq.read_table(pa.BufferReader(fd.read())).to_pandas()


async def main():
    time_save = {"async" : [], "sync_opt" : [], "sync_normal" : []}
    iter_count = 20
    file_count = 5
    for _ in range(iter_count):
        st = time.time()
        ret = await asyncio.gather(*[
            read_async("df.parquet") for i in range(file_count)
        ])
        time_save["async"].append(time.time() - st)

        st = time.time()
        ret = [read_sync("df.parquet") for i in range(file_count)]
        time_save["sync_opt"].append(time.time() - st)

        st = time.time()
        ret = [pq.read_table("df.parquet").to_pandas() for i in range(file_count)]
        time_save["sync_normal"].append(time.time() - st)

    print(f"""
    async : {sum(time_save["async"])/iter_count}
    sync_opt : {sum(time_save["sync_opt"])/iter_count}
    sync_normal : {sum(time_save["sync_normal"])/iter_count}
    """)

if __name__ == "__main__":
    asyncio.run(main())

아래처럼 다수의 Data가 포함된 큰 파일을 읽을 경우(5개)

          a   b   c   d   e
0         9  95  50  53  27
1        49  63  82  57  58
2        41  85   6  72  42
3        44  13  61   6  84
4        27  51  55  72  88
...      ..  ..  ..  ..  ..
9999995  43  76  32  11  52
9999996  90  54  22  67  21
9999997   8  20  32  46  83
9999998  25  53  60  45  85
9999999  35  92  11  64  97

image.png

그리고 작은 size의 파일 다수(테스트에서는 500개)를 case의 경우

        a   b   c   d   e
0       9  40   1  84  66
1       4  92  89  64  71
2      67  39  62  57  23
3      97  57  84  75  85
4       6  76  16  83  46
...    ..  ..  ..  ..  ..
99995  94   1  11  94  93
99996  41  55  62  60  56
99997  84  92  24  79  24
99998  91  22  56  76  64
99999  32  92   2  46  90

[100000 rows x 5 columns]

image.png

위 결과로 확인해 볼 수 있는건, parquet같은 경우 Data를 읽어오는 io작업보다, 읽어온 bytes를 통해서 pyarrow.parquet파일을 생성하는데 더 큰 시간이 든다는것을 추축해 볼 수가 있겠습니다.

그럼 쓸 이유가 있나요?

이러한 방법을 FastAPI나 Django의 async View함수에 활용할 경우

io작업이 일어나는 시간에 다른 사용자의 requests를 처리하는 것이 가능합니다.

4. ChatGPT

근데 혹시 제가 쓴 방법이 최선이 아닌건가 해서 ChatGPT를 이용해서 좀더 새로운 방법이 없나 찾아 봤습니다.

image.png

보면 해당 방법은 asyncio모듈에서 제공하는 방법 중 하나로, eventloop 내에서 처리하는 것이 아니라, 해당 작업에 대한 thread를 새로 생성하고, 이 thread의 생명주기를 asyncio.eventloop가 관리하는 기능 입니다

그래서 다시 테스트를 진행 해 보았는데...(시간 편의상 pandas table로 변환하는건 하지 않았습니다)

import asyncio
import aiofiles
import pyarrow as pa
import pyarrow.parquet as pq
import time


async def read_async(file_nm):
    async with aiofiles.open(file_nm, mode = "rb") as fd:
        return pq.read_table(pa.BufferReader(await fd.read()))

def read_sync(file_nm):
    with open(file_nm, mode = "rb+") as fd:
        return pq.read_table(pa.BufferReader(fd.read()))

async def read_parquet_async(filename):
    table = await asyncio.to_thread(pq.read_table, filename)
    return table


async def main():
    time_save = {"async" : [], "sync_opt" : [], "sync_thread" : []}
    iter_count = 20
    file_count = 5
    for _ in range(iter_count):
        st = time.time()
        ret = await asyncio.gather(*[
            read_async("df.parquet") for i in range(file_count)
        ])
        time_save["async"].append(time.time() - st)

        st = time.time()
        ret = [read_sync("df.parquet") for i in range(file_count)]
        time_save["sync_opt"].append(time.time() - st)

        st = time.time()
        ret = await asyncio.gather(*[
            read_parquet_async("df.parquet") for i in range(file_count)
        ])
        time_save["sync_thread"].append(time.time() - st)


    print(f"""
    async : {sum(time_save["async"])/iter_count}
    sync_opt : {sum(time_save["sync_opt"])/iter_count}
    sync_thread : {sum(time_save["sync_thread"])/iter_count}
    """)

if __name__ == "__main__":
    asyncio.run(main())

image.png

image.png
물론 Thread를 사용해서 게시글에서 말했던 비동기와는 약간 차이가 있지만, 그래도 순식간에 기계가 짜준 코드에서 밀리니 너무 속상해져 버렸습니다.

해당 코드의 단점이라면, 리소스가 제한된 환경에서는 다수의 thread를 만드는것이 제한되어 특정 환경에서는 못쓴다는 단점이 있겠습니다.(하지만 이런경우는 거의 없지요🥲)

728x90
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함