티스토리 뷰
안녕하세요.
대 AI시대에 python한 우물만을 파고있는 Teus입니다
이번 포스팅은, Python의 비동기 라이브러리인 aiofiles를 활용해서
비동기로 Parquet파일을 읽어들이는 방법에 대해서 다룹니다.
aiofiles 패키지를 이용해서 Bytes를 비동기로 읽어오고, 해당 Bytes를 활용해서 Parquet파일을 만들어 볼 예정입니다.
0. Parquet파일?
Parquet파일은 Data Engineering 분야에서 효율적으로 빅데이터를 저장하기 위해서 만들어진 확장자 포맷 입니다.
기존에 CSV를 대체할 목적으로 나왔고, 이전에 홍반장님의 포스팅에서도 다룬적이 있습니다.
Data Projects 에서 I/O Optimization by 홍반장
parquet 파일은 기본적으로 apache에서 제공하는 pyarrow를 사용해서 읽어들입니다
(여러분들이 사용하는 pandas.read_parquet 역시 위 패키지를 기본 엔진으로 사용합니다)
문제는 해당 Libaray는 Python의 비동기 지원을 하지 않습니다.
때문에 python의 FastAPI나 Django의 async view를 사용하는데 제약사항이 됩니다.
1. aiofiles
python의 라이브러리들은 기본적으로 sync로 동작되게 되어 있습니다.
때문에 함수를 async def
로 구성한다고 해도 내부에서 blocking io를 발생시킵니다.
import time
import asyncio
async def mysleep(tm):
time.sleep(tm)
async def main():
st_time = time.perf_counter()
await asyncio.gather(*[
mysleep(3) for i in range(10)
])
print(f"task done with {time.perf_counter() - st_time}")
if __name__ == "__main__":
asyncio.run(main())
이때 time.sleep(sync) -> asyncio.sleep(async)로 바꾸면 원하는 대로 동작하는것을 볼 수 있습니다.
async def mysleep(tm):
await asyncio.sleep(tm)
python의 open
패키지 역시 기본적으로 sync로 동작하게 되어 있습니다.
aiofiles
패키지는 open
패키지의 비동기 버전이라고 볼 수 있습니다.
#https://pypi.org/project/aiofiles/
async with aiofiles.open('filename', mode='r') as f:
contents = await f.read()
print(contents)
'My file contents'
2. pyarrow
pyarrow는 기본적으로 비동기로 동작 합니다.
때문에, python의 async를 사용해도 parquet파일을 읽을 때 사용하는 pyarrow.parquet.read_table()
의 큰 이점은 얻어가기는 어렵습니다.
하지만 pa(=pyarrow.parqet)의 경우, pa.BufferReader
라는 방법을 활용, 해당 Bytes를 활용해서 Parquet File로 읽는것이 가능합니다.
#parquet 파일 생성
import pandas as pd
import numpy as np
df = pd.DataFrame({
"a" : np.random.randint(1, 100, 10000000),
"b" : np.random.randint(1, 100, 10000000),
"c" : np.random.randint(1, 100, 10000000),
"d" : np.random.randint(1, 100, 10000000),
"e" : np.random.randint(1, 100, 10000000)
})
df.to_parquet('df.parquet')
#parquet파일 읽기
import pyarrow as pa
import pyarrow.parquet as pq
with open("df.parquet", "rb+") as fd:
reader = pa.BufferReader(fd.read())
table = pq.read_table(reader)
new_df = table.to_pandas()
print(df == new_df)
3. aiofiles와 pyarrow를 통합하기
아까 제가 python의 open -> aiofiles.open로 대체하여 비동기로 동작하게 만들 수 있다고 했습니다.
이 점을 활용하면, parquer파일을 읽어오는는 i/o작업을 비동기로 실행시키는 것이 가능해 집니다.
import asyncio
import aiofiles
import pyarrow as pa
import pyarrow.parquet as pq
def sync_read():
with open("df.parquet", "rb+") as fd:
reader = pa.BufferReader(fd.read())
table = pq.read_table(reader)
new_df = table.to_pandas()
async def async_read():
async with aiofiles.open("df.parquet", mode = "rb") as fd:
reader = pa.BufferReader(await fd.read())
table = pq.read_table(reader)
new_df = table.to_pandas()
이제 비동기로 다수의 Data를 읽어오는게 얼마나 효율적인지 확인해볼 시간 입니다.
import asyncio
import aiofiles
import pyarrow as pa
import pyarrow.parquet as pq
import time
async def read_async(file_nm):
async with aiofiles.open(file_nm, mode = "rb") as fd:
return pq.read_table(pa.BufferReader(await fd.read())).to_pandas()
def read_sync(file_nm):
with open(file_nm, mode = "rb+") as fd:
return pq.read_table(pa.BufferReader(fd.read())).to_pandas()
async def main():
time_save = {"async" : [], "sync_opt" : [], "sync_normal" : []}
iter_count = 20
file_count = 5
for _ in range(iter_count):
st = time.time()
ret = await asyncio.gather(*[
read_async("df.parquet") for i in range(file_count)
])
time_save["async"].append(time.time() - st)
st = time.time()
ret = [read_sync("df.parquet") for i in range(file_count)]
time_save["sync_opt"].append(time.time() - st)
st = time.time()
ret = [pq.read_table("df.parquet").to_pandas() for i in range(file_count)]
time_save["sync_normal"].append(time.time() - st)
print(f"""
async : {sum(time_save["async"])/iter_count}
sync_opt : {sum(time_save["sync_opt"])/iter_count}
sync_normal : {sum(time_save["sync_normal"])/iter_count}
""")
if __name__ == "__main__":
asyncio.run(main())
아래처럼 다수의 Data가 포함된 큰 파일을 읽을 경우(5개)
a b c d e
0 9 95 50 53 27
1 49 63 82 57 58
2 41 85 6 72 42
3 44 13 61 6 84
4 27 51 55 72 88
... .. .. .. .. ..
9999995 43 76 32 11 52
9999996 90 54 22 67 21
9999997 8 20 32 46 83
9999998 25 53 60 45 85
9999999 35 92 11 64 97
그리고 작은 size의 파일 다수(테스트에서는 500개)를 case의 경우
a b c d e
0 9 40 1 84 66
1 4 92 89 64 71
2 67 39 62 57 23
3 97 57 84 75 85
4 6 76 16 83 46
... .. .. .. .. ..
99995 94 1 11 94 93
99996 41 55 62 60 56
99997 84 92 24 79 24
99998 91 22 56 76 64
99999 32 92 2 46 90
[100000 rows x 5 columns]
위 결과로 확인해 볼 수 있는건, parquet같은 경우 Data를 읽어오는 io작업보다, 읽어온 bytes를 통해서 pyarrow.parquet파일을 생성하는데 더 큰 시간이 든다는것을 추축해 볼 수가 있겠습니다.
그럼 쓸 이유가 있나요?
이러한 방법을 FastAPI나 Django의 async View함수에 활용할 경우
io작업이 일어나는 시간에 다른 사용자의 requests를 처리하는 것이 가능합니다.
4. ChatGPT
근데 혹시 제가 쓴 방법이 최선이 아닌건가 해서 ChatGPT를 이용해서 좀더 새로운 방법이 없나 찾아 봤습니다.
보면 해당 방법은 asyncio모듈에서 제공하는 방법 중 하나로, eventloop 내에서 처리하는 것이 아니라, 해당 작업에 대한 thread를 새로 생성하고, 이 thread의 생명주기를 asyncio.eventloop가 관리하는 기능 입니다
그래서 다시 테스트를 진행 해 보았는데...(시간 편의상 pandas table로 변환하는건 하지 않았습니다)
import asyncio
import aiofiles
import pyarrow as pa
import pyarrow.parquet as pq
import time
async def read_async(file_nm):
async with aiofiles.open(file_nm, mode = "rb") as fd:
return pq.read_table(pa.BufferReader(await fd.read()))
def read_sync(file_nm):
with open(file_nm, mode = "rb+") as fd:
return pq.read_table(pa.BufferReader(fd.read()))
async def read_parquet_async(filename):
table = await asyncio.to_thread(pq.read_table, filename)
return table
async def main():
time_save = {"async" : [], "sync_opt" : [], "sync_thread" : []}
iter_count = 20
file_count = 5
for _ in range(iter_count):
st = time.time()
ret = await asyncio.gather(*[
read_async("df.parquet") for i in range(file_count)
])
time_save["async"].append(time.time() - st)
st = time.time()
ret = [read_sync("df.parquet") for i in range(file_count)]
time_save["sync_opt"].append(time.time() - st)
st = time.time()
ret = await asyncio.gather(*[
read_parquet_async("df.parquet") for i in range(file_count)
])
time_save["sync_thread"].append(time.time() - st)
print(f"""
async : {sum(time_save["async"])/iter_count}
sync_opt : {sum(time_save["sync_opt"])/iter_count}
sync_thread : {sum(time_save["sync_thread"])/iter_count}
""")
if __name__ == "__main__":
asyncio.run(main())
물론 Thread를 사용해서 게시글에서 말했던 비동기와는 약간 차이가 있지만, 그래도 순식간에 기계가 짜준 코드에서 밀리니 너무 속상해져 버렸습니다.
해당 코드의 단점이라면, 리소스가 제한된 환경에서는 다수의 thread를 만드는것이 제한되어 특정 환경에서는 못쓴다는 단점이 있겠습니다.(하지만 이런경우는 거의 없지요🥲)
'Python 잡지식' 카테고리의 다른 글
[Python]MultiProcessing 탐구 (0) | 2024.08.21 |
---|---|
[Python]Python의 asyncio 탐구 (0) | 2024.07.10 |
[Python]Flask의 async함수 지원에 대해서 (0) | 2024.06.01 |
[Python]Uvicorn에 대해서 (0) | 2024.05.08 |
[Python]Python 함수 실행간 메모리 측정하기 (0) | 2023.10.23 |
- Total
- Today
- Yesterday
- 동적계획법
- 이분탐색
- Sort알고리즘
- Search알고리즘
- AVX
- Python
- stack
- GDC
- heap
- 코딩테스트
- 병렬처리
- 컴퓨터그래픽스
- Greedy알고리즘
- C++
- git
- 분할정복
- 자료구조
- prime number
- SIMD
- 알고리즘
- hash
- 완전탐색 알고리즘
- 사칙연산
- 프로그래머스
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |