티스토리 뷰

728x90
반응형

안녕하세요. Teus입니다.

이번 포스팅은 Python의 Multiprocessing을 사용하는 과정에서 발생하는 문제점들을 탐구하는 포스팅 입니다.

0. Python의 Multiprocessing

예전부터 이야기 하지만, Python의 GIL 때문에, Python에서 Multicore의 이점을 가져가기 위해선 MultiThreading이 아니라 Multiprocessing이 필요합니다.

이런 Prcoess끼리는 서로 Data와 Heap, Stack을 공유하지 않기 때문에

Process를 생성하고, 해당 Process가 새로운 작업을 시작하는데는 Thread를 사용하는것 보다 많은 시간이 요구됩니다.

import multiprocessing as mp
import threading
import timeit
def foo(n):
    ret = 0
    for i in range(n):
        ret += i
    return ret


if __name__ == "__main__":
    th_tm = timeit.timeit("threading.Thread(target = foo, kwargs = {'n':100})", setup="import threading; from __main__ import foo", number = 100)
    print(f"time for Threading gen = {th_tm/100}")
    mp_tm = timeit.timeit("mp.Process(target = foo, kwargs = {'n':100})", setup="import multiprocessing as mp;  from __main__ import foo", number = 100)
    print(f"time for Process gen = {mp_tm/100}")

image.png

이때 Python의 경우 Java와 유사하게 ThreadPool, ProcessPool을 통해서 다수의 Thread, Process에 Job을 분배할 수 있습니다.

1. Pool 생성 시간

Process는 기본적으로 spawn과 fork 두가지 방법으로 만들어 낼 수 있습니다.

Spawn과 Fork는 무슨 차이 인가요?
window : Spawn만 사용 가능
Max OS : Spawn과 Fork사용 가능
Linus : Spawn과 Fork사용 가능

Fork Spawn
Fork는 Parent Process의 Memory를 복제해서 새로운 Process를 만듦, 대신 Parent Process와의 Data등이 공유가됨 Spawn은 Parent Process의 Memory 복제 없이, 새로운 Process를 동작시켜서 Isolation 환경을 구현함.

위와같은 차이점 때문에, Fork대비 Spawn은 Process를 만드는데 더 긴 시간이 걸립니다.
참고문헌 : https://www.geeksforgeeks.org/understanding-fork-and-spawn-in-python-multiprocessing/

Process의 생성과 다르게, Process Pool같은 경우는 Pool을 만드는데 추가적인 시간이 소요됩니다.

import multiprocessing as mp
import time
import timeit

if __name__ == "__main__":
    sp_tm = timeit.timeit("mp.get_context('spawn').Pool(8)", setup="import multiprocessing as mp", number = 100)
    print(f"time for spawn pool gen = {sp_tm/100}")
    fk_tm = timeit.timeit("mp.get_context('fork').Pool(8)", setup="import multiprocessing as mp", number = 100)
    print(f"time for fork pool gen = {fk_tm/100}")

image.png

위 시간을 보면, 분명 spawn이 fork대비 더 시간이 소비되지만, 낮은 Latency을 요구하는 task가 아닐경우 충분히 사용할만 해 보입니다.

과연 그럴까요?🤔

2. spawn의 함정

1. spawn은 Process 생성 이외에 추가적인 Latency가 존재합니다.

import multiprocessing as mp
import time
import os

def foo(_tm):
    print(f"in {os.getpid()}, st time = {time.time()-_tm}")
    ret = 0
    for i in range(1000000):
        ret += i
    return ret

if __name__ == "__main__":
    method = "spawn"
    #method = "fork"
    core_cnt = 2    
    st = time.time()    
    pool = mp.get_context(method).Pool(core_cnt)
    print(f"time for {method} pool gen = {time.time()-st}")
    st = time.time()
    pool.map(func = foo, iterable = [st for i in range(core_cnt)])
    print(f"time for {method} time = {time.time()-st}")   
    st = time.time()

spawn time

image.png


fork time

image.png

위 결과를 보면알수 있지만, fork같은 경우 pool gen time이후에 첫 Process가 동작하는 시간이 거의 걸리지 않습니다.

하지만 spawn time같은 경우 pool gen time같은 경우 처음 Process가 동작하기까지 0.03~4초의 알수없는 대기시간이 걸리는것을 알 수 있습니다.

import multiprocessing as mp
import time
import os

def foo(_tm):
    print(f"in {os.getpid()}, st time = {time.time()-_tm}")
    ret = 0
    for i in range(1000000):
        ret += i
    return ret

if __name__ == "__main__":
    method = "spawn"
    #method = "fork"
    core_cnt = 2    
    st = time.time()    
    pool = mp.get_context(method).Pool(core_cnt)
    print(f"time for {method} pool gen = {time.time()-st}")
    #pool 생성 후 pool 사용 전 대기시간 삽입.
    time.sleep(3)
    st = time.time()
    pool.map(func = foo, iterable = [st for i in range(core_cnt)])
    print(f"time for {method} time = {time.time()-st}")    
    pool.close()

spawn time

image.png

fork time

image.png

spawn의 대기시간을 추가한 경우,Process를 나눠서 실행하는 작업은 fork와 같은 시간으로 실행되는 것을 볼 수 있습니다.

 

2. spawn은 package의 import 수에 따라서 더 많은 시간이 소요된다.

spawn같은 경우, Parent Process와 동일한 환경을 만들기 위해서 Parent Process의 동작을 유사하게 수행합니다.

때문에 Parent Process에서 무거운 패키지(Ex. Tensorflow, Pandas...)를 import하면 process spawn time이 기하급수적으로 증가하게 됩니다.

import multiprocessing as mp
import time
import os
import tensorflow

def foo(_tm):
    ...

if __name__ == "__main__":
    ...

spawn time

image.png


fork time

image.png



3. Package를 import하는 위치에 따라 병목이 바뀐다

위 코드에서, tensorflow를 import하는 부분을 살짝 수정해보겠습니다.

import multiprocessing as mp
import time
import os

def foo(_tm):
    ...

if __name__ == "__main__":
    import tensorflow
    ...

spawn time

image.png


fork time

image.png


import package를 __main__에 넣을 경우, 보는것처럼 Spawn에서도 자식 Processs를 실행하는데 추가적인 시간이 붙지 않는것을 볼 수 있습니다.

 

해당 이유가, 자식 Process의 경우 __name__의 값이 __mp_main__으로 되어있습니다.

 

그래서 자식 Process에서 반복되는 패키지의 import를 막을 수 있습니다.

 

하지만 이럴경우 자식 Process에서는 tensorflow패키지를 사용할 수 없습니다.
(필요시 import를 다시 해 주어야됨)

 

4. Data를 전달하는데 method와는 무관하다

linux에서 fork를 사용하면 되니, 문제가 없을 거라고 생각할 수 있지만

fork와 spawn의 경우 Data를 매개변수로 전달하는데는 시간의 차이가 별로 발생하지 않습니다.

import multiprocessing as mp
import time
def foo(df, _tm):
    import os    
    print(f"in {os.getpid()}, st time = {time.time()-_tm}")
    return df

if __name__ == "__main__":
    method = "spawn"
    #method = "fork"
    core_cnt = 2  
    pool = mp.get_context(method).Pool(core_cnt)
    data_num = 10000000
    import pandas as pd
    import numpy as np
    df = pd.DataFrame({
        "a" : np.random.randint(0, 100, data_num),
        "b" : np.random.randint(0, 1000, data_num),
        "c" : np.random.randint(0, 10000, data_num)
    })
    st = time.time()        
    ret = pool.starmap(foo, ((i, st) for i in np.array_split(df, core_cnt)))    
    print(f"time for {method} pool = {time.time() - st}")
    pool.close()

spawn time

image.png

fork time

image.png

3. spawn을 잘 사용하는 방법

위까지 상황을 정리해보면

  1. spawn은 fork보다 Process를 만드는데 오랜시간이 걸린다.
  2. spawn후에 일정 시간동안 Process를 사용할 수 없다,
  3. spawn전에 Package가 많이 import되면 Process를 만드는데 더 오랜 시간이 걸린다.

이런 상태 입니다.

이때 Pool과 Process를 조금이라도 유용하게 쓸 수 있는 부분을 찾아보겠습니다.

Pool을 사용할 경우
  1. Pool을 __main__의 시작 부분에서 생성한다.(Process의 생성시간을 감춤)
  2. Pool에서 함수 내부에서 필요한 Packege를 import 한다.(Process의 생성 시간을 최소화 시킴)
  3. Pool을 한번 만들고, 계속 재사용 한다.(최초 Process Pool의 생성 시간을 최소화함)
Process를 사용할 경우
  1. Process를 __main__의 시작 부분에서 생성한다.(Process의 생성시간을 감춤)
  2. Process에서 함수 내부에 필요한 Package를 import 한다.(Process의 생성 시간을 최소화 시킴)
  3. Pool과 다르게 Process는 start()를 할 때마다 시간이 발생하여, 한번 만들 Process를 계속 사용하기

을 통해서 효율적인 병렬 처리를 이뤄낼 수 있습니다.

아래 예시를 통해서 알아보겠습니다.

  1. Pandas DataFrame을 만든다.(A, B, C 값을 갖는 컬럼)
  2. Pandas DataFrame을 4개의 Pool이 나눠서 RowWise Add를 진행한다.
  3. Pandas DataFrame의 ColWise mean를 통해서 전체 평균을 구함
  4. 3에서 구한 Row를 모든 DataFrame에 더한다
  5. 3에서 구한 DataFrame을 모두 문자열로 바꾸고, 문자열의 길이를 구한다.

1️⃣. normal flow로 code를 진행한 경우

import time
def foo1(df):
    return df.sum(axis = 1)

def foo2(df, row):
    temp = (df + row).astype(str)
    for col in temp.columns:
        temp.loc[:, col] = temp.loc[:, col].str.len()
    return temp

if __name__ == "__main__":        
    data_num = 10000000
    import pandas as pd
    import numpy as np
    df = pd.DataFrame({
        "a" : np.random.randint(0, 100, data_num),
        "b" : np.random.randint(0, 1000, data_num),
        "c" : np.random.randint(0, 10000, data_num)
    })    
    st = time.time()    
    row_sum = foo1(df)
    df["row_sum"] = row_sum
    col_mean = df.mean(axis = 0).astype(int)
    ret = foo2(df, col_mean)    
    print(f"time for normal pool = {time.time() - st}")

image.png

2️⃣. Pool을 초기에 만들고, 일부 대기시간을 만든뒤 실행한 경우

import multiprocessing as mp
import time
def foo1(df):
    return df.sum(axis = 1)

def foo2(df, row):
    temp = (df + row).astype(str)
    for col in temp.columns:
        temp.loc[:, col] = temp.loc[:, col].str.len()
    return temp

if __name__ == "__main__":    
    core_cnt = 4
    pool = mp.Pool(core_cnt)
    data_num = 10000000
    #main Process에서만 사용할 Package를 import
    import pandas as pd
    import numpy as np
    df = pd.DataFrame({
        "a" : np.random.randint(0, 100, data_num),
        "b" : np.random.randint(0, 1000, data_num),
        "c" : np.random.randint(0, 10000, data_num)
    })
    #pool이 생성되고, 활성화 될 때 까지 일정시간 다른작업을 진행함
    time.sleep(3)
    st = time.time()    
    row_sum = pool.map(foo1, np.array_split(df, core_cnt))
    df["row_sum"] = pd.concat(row_sum)
    col_mean = df.mean(axis = 0).astype(int)
    ret = pool.starmap(foo2, ((i, col_mean) for i in np.array_split(df, core_cnt)))
    final_df = pd.concat(ret)
    print(f"time for optimized pool = {time.time() - st}")

image.png

tmi
아래 부록에서 나오게 되지만, Main Process -> Child Process로 Data가 이동할때는 해당 Data를 Pickle화 하고, Pickle화 된 Object를 Childe Process에서 읽어들입니다.
이때 Pandas Object같은 경우 Pickle화, 역Pickle화에 많은 시간이 소요됩니다. 그래서 Pandas Object가지고 병렬처리 하는 작업은 특별한 경우가 아니면, 고성능을 위해서 사용하기 어려운 경우가 많습니다.

부록. spawn 파해치기.(이거는 궁금하신분만😉)

먼저 일반적으로 사용하는 Process를 살펴보면 아래와 같습니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/context.py#L220
class Process(process.BaseProcess):
    _start_method = None
    @staticmethod
    def _Popen(process_obj):
        return _default_context.get_context().Process._Popen(process_obj)

    @staticmethod
    def _after_fork():
        return _default_context.get_context().Process._after_fork()

이때 Process의 실제 내부 소스코드는 BaseProcess를 상속받아서 사용하게 됩니다.

그래서 Process.start()Process.join()같은 method는 BaseProcess내부에서 구현됩니다.

Process를 만드는 생성자에서는 별도의 함수를 통해서 Process가 만들어지지는 않습니다.

대신, Process의 생성정보들을 Property로 저장하고, Process.start()를 기다리게 됩니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/process.py#L71
_current_process = _MainProcess() #BaseProcess를 상속받은 Cls

class BaseProcess(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                 *, daemon=None):                
    ...

    def start(self):
        '''Start child process'''
        ...
        self._popen = self._Popen(self)
        self._sentinel = self._popen.sentinel        
        del self._target, self._args, self._kwargs
        _children.add(self)

이제 start가 실행되면 self._Popen(self)가 실행되게 됩니다.

여기서 _Popen의 경우 _default_context.get_context()가지고 있는 Process를 접근하게 됩니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/context.py#L275
if sys.platform != 'win32':
    class SpawnProcess(process.BaseProcess):
        _start_method = 'spawn'
        @staticmethod
        def _Popen(process_obj):
            from .popen_spawn_posix import Popen
            return Popen(process_obj)

        @staticmethod
        def _after_fork():
            # process is spawned, nothing to do
            pass

    class SpawnContext(BaseContext):
        _name = 'spawn'
        Process = SpawnProcess
    _default_context = DefaultContext(SpawnContext())

이때 SpawnProcess가 가지고있는 _Popen()을 통해서 posix방식으로 spawn을 진행하게 됩니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(object):
    method = 'fork'

    def __init__(self, process_obj):
        util._flush_std_streams()
        self.returncode = None
        self.finalizer = None
        self._launch(process_obj)
    ...

class Popen(popen_fork.Popen):
    method = 'spawn'
    DupFd = _DupFd

    def __init__(self, process_obj):
        self._fds = []
        super().__init__(process_obj)

    def _launch(self, process_obj):

Fork와 비슷하지만, _launch를 사용해서 Process에 시동을 거는 부분이 다르게 됩니다.

#Fork의 _launch
#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_fork.py#L12
class Popen(object):
    method = 'fork'
    ...

    def _launch(self, process_obj):
        code = 1
        parent_r, child_w = os.pipe()
        child_r, parent_w = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            try:
                os.close(parent_r)
                os.close(parent_w)
                code = process_obj._bootstrap(parent_sentinel=child_r)
            finally:
                os._exit(code)
        else:
            os.close(child_w)
            os.close(child_r)
            self.finalizer = util.Finalize(self, util.close_fds,
                                           (parent_r, parent_w,))
            self.sentinel = parent_r

fork의 경우 os module의 system call을 통해서만 Process를 만들고 관리하는것을 확인할 수 있습니다.

#fork의 _launch
#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    ...

    def _launch(self, process_obj):
        from . import resource_tracker
        tracker_fd = resource_tracker.getfd()
        self._fds.append(tracker_fd)
        prep_data = spawn.get_preparation_data(process_obj._name)
        fp = io.BytesIO()
        set_spawning_popen(self)
        try:
            reduction.dump(prep_data, fp)
            reduction.dump(process_obj, fp)
        finally:
            set_spawning_popen(None)

        parent_r = child_w = child_r = parent_w = None
        try:
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            cmd = spawn.get_command_line(tracker_fd=tracker_fd,
                                         pipe_handle=child_r)
            self._fds.extend([child_r, child_w])
            self.pid = util.spawnv_passfds(spawn.get_executable(),
                                           cmd, self._fds)
            self.sentinel = parent_r
            with open(parent_w, 'wb', closefd=False) as f:
                f.write(fp.getbuffer())
        finally:
            fds_to_close = []
            for fd in (parent_r, parent_w):
                if fd is not None:
                    fds_to_close.append(fd)
            self.finalizer = util.Finalize(self, util.close_fds, fds_to_close)

            for fd in (child_r, child_w):
                if fd is not None:
                    os.close(fd)

spawn의 경우 fork보다 많은 작업을 하고, 특히 open을 통해서 I/O작업이 동반된다는 것을 확인할 수 있습니다.

좀더 코드를 쪼개서 확인해 보겠습니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    def _launch(self, process_obj):
        from . import resource_tracker
        tracker_fd = resource_tracker.getfd()
        self._fds.append(tracker_fd)

https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/resource_tracker.py#L58
일단 resource_tracker라는 object를 통해서 현재 Parent가 관리하는 resource를 확인합니다.
(내부 내용은... 보니깐 정신이 나갈거 같아서 생략합니다🙄)

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    def _launch(self, process_obj):
        ...
        prep_data = spawn.get_preparation_data(process_obj._name)
        fp = io.BytesIO()

https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/spawn.py#L160
그리고 현재 시스템의 환경범수를 dict에 담아서 prep_data변수에 보관합니다.

그리고 io.BytesIO()를 통해서 I/O 작업을 준비시킵니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    def _launch(self, process_obj):
        ...
        set_spawning_popen(self)
        try:
            reduction.dump(prep_data, fp)
            reduction.dump(process_obj, fp)
        finally:
            set_spawning_popen(None)

https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/reduction.py#L33
위에서 만들어진 prep_data와 process_obj를 fp에다가 pickle로 직렬화 시키고, 해당 Data를 io에 기록합니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    def _launch(self, process_obj):
        ...
        parent_r = child_w = child_r = parent_w = None
        try:
            parent_r, child_w = os.pipe()
            child_r, parent_w = os.pipe()
            cmd = spawn.get_command_line(tracker_fd=tracker_fd,
                                         pipe_handle=child_r)

https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/spawn.py#L83
이제 새로운 Process에서 실행할 command를 준비해 줍니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    def _launch(self, process_obj):
        ...
        try:
            ...
            self._fds.extend([child_r, child_w])
            self.pid = util.spawnv_passfds(spawn.get_executable(),
                                           cmd, self._fds)

이제 준비된 데이터를 가지고 Process생성 작업을 시작합니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/util.py#L450
def spawnv_passfds(path, args, passfds):
    import _posixsubprocess
    import subprocess
    passfds = tuple(sorted(map(int, passfds)))
    errpipe_read, errpipe_write = os.pipe()
    try:
        return _posixsubprocess.fork_exec(
            args, [path], True, passfds, None, None,
            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
            False, False, -1, None, None, None, -1, None,
            subprocess._USE_VFORK)
    finally:
        os.close(errpipe_read)
        os.close(errpipe_write)

_posixsubprocess.fork_exec의 경우 다양한 매개변수를 받고, 이를 기반으로 Process를 만든 뒤 pid를 return해 줍니다.

즉, util.spawnv_passfds()를 통해서 새로운 Process라 fork되는것을 알 수 있습니다.

#https://github.com/python/cpython/blob/3.12/Lib/multiprocessing/popen_spawn_posix.py#L26
class Popen(popen_fork.Popen):
    method = 'spawn'
    def _launch(self, process_obj):
        ...
        try:
            ...
            with open(parent_w, 'wb', closefd=False) as f:
                f.write(fp.getbuffer())

마지막으로 이 부분이 중요합니다.

일반적인 fork에서는 os.fork() 이후에 Parent와 child Process간의 부수적인 작업이 발생하지 않습니다.

하지만 spawn의 경우 마지막에 Parent -> Child로 os.pipe를 통해서 Data를 이동시킵니다.

이 과정에서 pickle로 직렬화된 prep_data, process_obj

I/O 작업을 통해서 자식 Process로 넘어가게 되고, 이 작업이 완료 된 다음에 Process에서 작업을 할 수가 있을겁니다.

Fork와 다르게 발생한 Spawn에서 발생하는 부수적인 Latency는 위 I/O작업이 원인으로 추정해볼수가 있습니다.

728x90
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/09   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30
글 보관함