이번 포스팅은, Pandas DataFrame의 .apply method에 대해서 파해쳐 봅니다.



Pandas의 경우, DataFrame이나 Series에 .apply method를 사용해서, 사용자 지정 함수를 Elementwise하게 적용하는 것이 가능합니다.

import pandas as pd
temp_df = pd.DataFrame({"a" : [i*1 for i in range(100)],
                        "b" : [i*2 for i in range(100)],
                        "c" : [i*3 for i in range(100)],
                        "d" : [i*4 for i in range(100)],
                        "e" : [i*5 for i in range(100)]})
#"a" col에 ElementWise하게 제곱을 적용
temp_df["a"].apply(lambda x : x*x)

하지만, apply Method는 Code의 가독성을 높여주지만, 반복문을 사용하는 경우보다 속도가 느릴 수 있다고 알려져 있습니다. 과연 그럴까요?

2. 어떻게 동작하는가?

우선 Pandas DataFrame Object의 Source Code를 살펴보겠습니다.

class DataFrame(NDFrame, OpsMixin):
    def apply(
        func: AggFuncType,
        axis: Axis = 0,
        raw: bool = False,

        from pandas.core.apply import frame_apply

        op = frame_apply(
        return op.apply().__finalize__(self, method="apply")

apply method의 경우, DataFrame Class내에 Method 형태로 원형이 존재합니다.
이때 pandas.core.apply의 source code를 가지고와서, frame_apply Object를 만들어 주는것을 볼 수 있습니다.

def frame_apply(
    obj: DataFrame,
    func: AggFuncType,
    axis: Axis = 0,
    raw: bool = False,
    result_type: str | None = None,
) -> FrameApply:
    """construct and return a row or column based frame apply object"""
    axis = obj._get_axis_number(axis)
    klass: type[FrameApply]
    if axis == 0:
        klass = FrameRowApply
    elif axis == 1:
        klass = FrameColumnApply

    return klass(

apply function의 경우, Row방향 or Col방향 함수적용이기 때문에, input axis에 따라서 klass를 가변적으로 사용하는것을 볼 수 있습니다.
(두가지 모두 FrameApply를 상속받은 Class이기 때문에, Source Code는 FrameApply를 기준으로 살펴봅니다)

class FrameApply(NDFrameApply):
    obj: DataFrame
    def apply(self) -> DataFrame | Series:
    """compute the results"""
    # dispatch to agg
    if is_list_like(self.f):
        return self.apply_multiple()

    # all empty
    if len(self.columns) == 0 and len(self.index) == 0:
        return self.apply_empty_result()

    # string dispatch
    if isinstance(self.f, str):
        return self.apply_str()

    # ufunc
    elif isinstance(self.f, np.ufunc):
        with np.errstate(all="ignore"):
            results = self.obj._mgr.apply("apply", func=self.f)
        # _constructor will retain self.index and self.columns
        return self.obj._constructor(data=results)

    # broadcasting
    if self.result_type == "broadcast":
        return self.apply_broadcast(self.obj)

    # one axis empty
    elif not all(self.obj.shape):
        return self.apply_empty_result()

    # raw
    elif self.raw:
        return self.apply_raw()

    return self.apply_standard()

FramApply Class의 경우 NDFramApply를 상속받고, NDFramApply는 Apply Class를 상속받아 구현됩니다.
결국 FrameApply Class에 전달된 매개변수를 이용해서 Apply Class Object가 만들어 지는것을 확인할 수 있으며,
Apply Object가 DataFrame.apply를 통해서 입력한 Function을 self.f self.orig_f로 가지고 있는것을 볼 수 있습니다.

class Apply(metaclass=abc.ABCMeta):
    axis: int

    def __init__(
        obj: AggObjType,
        raw: bool,
        result_type: str | None,
        self.obj = obj
        self.raw = raw
        self.args = args or ()
        self.kwargs = kwargs or {}

        if result_type not in [None, "reduce", "broadcast", "expand"]:
            raise ValueError(
                "invalid value for result_type, must be one "
                "of {None, 'reduce', 'broadcast', 'expand'}"

        self.result_type = result_type

        # Funciton을 적용하기 위해서 함수를 저장
        # curry if needed
        if (
            (kwargs or args)
            and not isinstance(func, (np.ufunc, str))
            and not is_list_like(func)

            def f(x):
                return func(x, *args, **kwargs)

            f = func

        self.orig_f: AggFuncType = func
        self.f: AggFuncType = f

이제 frame_apply Object에 대해서 알았고, fram_apply Object의 return으로 DataFrame이 반환된다는 것을 확인 했습니다.
이때 특수 Case를 제외하고, apply_standard에 대해서 알아보면 아래와 같습니다.

    def series_generator(self):
        return (self.obj._ixs(i, axis=1) for i in range(len(self.columns)))

    def apply_standard(self):
        results, res_index = self.apply_series_generator()

        # wrap results
        return self.wrap_results(results, res_index)

    def apply_series_generator(self) -> tuple[ResType, Index]:
        assert callable(self.f)

        series_gen = self.series_generator
        res_index = self.result_index

        results = {}

        with option_context("mode.chained_assignment", None):
            #=====================실제 함수 실행 부분===========================#
            for i, v in enumerate(series_gen):
                # ignore SettingWithCopy here in case the user mutates
                results[i] = self.f(v)
                if isinstance(results[i], ABCSeries):
                    # If we have a view on v, we need to make a copy because
                    #  series_generator will swap out the underlying data
                    results[i] = results[i].copy(deep=False)
            #=====================실제 함수 실행 부분===========================#
        return results, res_index

    def wrap_results(self, results: ResType, res_index: Index) -> DataFrame | Series:
        from pandas import Series

        # see if we can infer the results
        if len(results) > 0 and 0 in results and is_sequence(results[0]):
            return self.wrap_results_for_axis(results, res_index)

        # dict of scalars

        # the default dtype of an empty Series will be `object`, but this
        # code can be hit by df.mean() where the result should have dtype
        # float64 even if it's an empty Series.
        constructor_sliced = self.obj._constructor_sliced
        if constructor_sliced is Series:
            result = create_series_with_explicit_dtype(
                results, dtype_if_empty=np.float64
            result = constructor_sliced(results)
        result.index = res_index

        return result

소스코드에서 볼 수 있듯, 내부로 타고 들어와서 결국 apply_series_generator에서 self.series_generator Method로 Series를 Generator 형태로 받고
이 Generator를 반복문을 돌면서 Function을 적용하는 것을 확인 할 수가 있습니다.
(finalize 같은 경우, other이 정의되어 있지 않을 경우 그래도 입력받은 DataFrame을 반환합니다)

출처 : https://github.com/pandas-dev/pandas/blob/e8093ba372f9adfe79439d90fe74b0b5b6dea9d6/pandas/core/generic.py#L5518


DataFrame.apply를 보면, Col단위로 Series를 뽑아내고 Function을 적용하는 것을 확인할 수 있었습니다.
이때, Axis를 Row단위로 설정할 경우, Col마다 뽑아온 Series에 추가적으로 Apply를 하게 됩니다.
따라서, Series의 apply에 대해서 추가적으로 보도록 하겠습니다.

class Series(base.IndexOpsMixin, NDFrame):
    def apply(
            func: AggFuncType,
            convert_dtype: bool = True,
            args: tuple[Any, ...] = (),
        ) -> DataFrame | Series:
        return SeriesApply(self, func, convert_dtype, args, kwargs).apply()
class SeriesApply(NDFrameApply):
    obj: Series
    axis = 0

    def __init__(
        obj: Series,
        func: AggFuncType,
        convert_dtype: bool,
        self.convert_dtype = convert_dtype


    def apply(self) -> DataFrame | Series:
        obj = self.obj

        if len(obj) == 0:
            return self.apply_empty_result()

        # dispatch to agg
        if is_list_like(self.f):
            return self.apply_multiple()

        if isinstance(self.f, str):
            # if we are a string, try to dispatch
            return self.apply_str()

        return self.apply_standard()
    def apply_standard(self) -> DataFrame | Series:
        f = self.f
        obj = self.obj

        with np.errstate(all="ignore"):
            if isinstance(f, np.ufunc):
                return f(obj)

            # row-wise access
            if is_extension_array_dtype(obj.dtype) and hasattr(obj._values, "map"):
                # GH#23179 some EAs do not have `map`
                mapped = obj._values.map(f)
                values = obj.astype(object)._values
                # error: Argument 2 to "map_infer" has incompatible type
                # "Union[Callable[..., Any], str, List[Union[Callable[..., Any], str]],
                # Dict[Hashable, Union[Union[Callable[..., Any], str],
                # List[Union[Callable[..., Any], str]]]]]"; expected
                # "Callable[[Any], Any]"
                mapped = lib.map_infer(
                    f,  # type: ignore[arg-type]

        if len(mapped) and isinstance(mapped[0], ABCSeries):
            # GH#43986 Need to do list(mapped) in order to get treated as nested
            #  See also GH#25959 regarding EA support
            return obj._constructor_expanddim(list(mapped), index=obj.index)
            return obj._constructor(mapped, index=obj.index).__finalize__(
                obj, method="apply"

apply_standard에서, function이 np.ufunc에 해당하면 바로 function(Series.obj)을 적용 하지만, 그러지 않을 경우 dtype을 object로 변경하고, 해당 object를 lib.map_infer로 넘기는 것을 확인할 수 있습니다.

def map_infer(
    ndarray arr, object f, bint convert=True, bint ignore_na=False
) -> np.ndarray:    
        Py_ssize_t i, n
        ndarray[object] result
        object val

    n = len(arr)
    result = np.empty(n, dtype=object)
    for i in range(n):
        if ignore_na and checknull(arr[i]):
            result[i] = arr[i]
        val = f(arr[i])

        if cnp.PyArray_IsZeroDim(val):
            # unbox 0-dim arrays, GH#690
            val = val.item()

        result[i] = val

    if convert:
        #maybe_convert_object source : 
        return maybe_convert_objects(result,

    return result

lib.map_infer의 경우, Cython으로 정의되어 있는것을 볼 수 있습니다. numpy.array를 입력받아 동일한 크기의 empty array를 생성하고
Cython으로 반복문을 돌면서 array[index] = function(array[index])을 적용해 주는것을 확인할 수 있습니다.
이후에 Series의 생성자를 이용해서 새로운 Series를 생성해서 반환해 주는것을 확인할 수 있습니다.

참고 : Cython으로 최적화를 진행할 경우, Python 함수 적용 시간은 비슷할 수고 있지만, Cython for를 사용하기 때문에 Python for대비 우월한 속도를 확보할 수 있습니다.

4. BenchMark

간단하게 row count 50만개인 DataFrame에 대해서 power2를 적용하는 예제입니다.

import pandas as pd
import time
row = 5000000
temp_df = pd.DataFrame({"a" : [i*1 for i in range(row)],
                        "b" : [i*2 for i in range(row)],
                        "c" : [i*3 for i in range(row)],
                        "d" : [i*4 for i in range(row)],
                        "e" : [i*5 for i in range(row)]})
def foo(x):    
    return x*x

#======================Row 단위 apply===============================
st = time.time()
temp_df["a"].apply(lambda x : foo(x))
print("Series apply time : ", time.time() - st)

temp_series = temp_df["a"].copy()
st = time.time()
for idx, val in enumerate(temp_series):
    temp_series[idx] = foo(val)
print("Series normal time : ", time.time() - st)

st = time.time()
temp_df["a"] = foo(temp_df["a"])
print("Series Direct Function time : ", time.time() - st)
#======================Row 단위 apply===============================

#======================Col 단위 apply===============================
st = time.time()
temp_df.apply(lambda x : foo(x), axis = 0)
print("DataFrame apply time : ", time.time() - st)

st = time.time()
result = {}
for i in (temp_df._ixs(i, axis=1) for i in range(len(temp_df.columns))):
    result[i.name] = foo(i)
print("DataFrame normal time : ", time.time() - st)

st = time.time()
result = {}
for i in (temp_df._ixs(i, axis=1) for i in range(len(temp_df.columns))):
    result[i.name] = foo(i)
print("DataFrame normal time(without DataFrame Construct) : ", time.time() - st)
#======================Col 단위 apply===============================
Series apply time :  2.672856092453003
Series normal time :  26.50118923187256
Series Direct Function time :  0.09272193908691406
DataFrame apply time :  0.1655576229095459
DataFrame normal time :  0.18553519248962402
DataFrame normal time(without DataFrame Construct) :  0.09574413299560547

Pandas apply사용 유무와 완벽히 일치하는 Case는 아니지만
RowApply의 경우 Cython 최적화로 Python 반복문 대비 우월한 속도를 확인할 수 있습니다.
ColApply의 경우 Result Dict을 생성하는데 apply의 상단부분의 시간을 사용하는것을 확인할 수 있으며,
외부에서 Dict을 이용해서 만드는 DataFrame보다 Class내부 _construct로 만드는 것이 더 효율적이기 때문에
역시 normal한 사용 대비 빠른것을 확인할 수 있습니다.

5. 결론

  1. 수치연산 같은 특수한 함수가 아니면 apply로 쓰는게 무조건 빠르다(Cython 최적화 때문에)
  2. 연산자가 정의된 수치연산의 경우 function(Series)형태로 쓰는게 가장 빠르다.

