Shared Pandas dataframe performance in Parallel when heavy dict is present #1244

@fingoldo

First of all, I want to thank the creators of joblib. I recently discovered that, under some conditions, joblib can share even huge Pandas dataframes with workers running in separate processes very effectively: a delayed function can be run in parallel with a dataframe argument without a full copy of that dataframe being made in each child process. The workers seem to receive only a reduced set of variables and can start their chores immediately. It's cool, but not mentioned in the docs at all.

However, I noticed that, at least on Windows, this behavior changes significantly as soon as there is at least one more argument consisting of, for example, a heavy dict. In that case a full copy is created for each child process, and the computation starts sequentially in each worker, only after its copy has been created and delivered. A script that reproduces the issue follows the short sketch below; it runs the delayed function either with just a dataframe or with an additional dict argument.
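For context, my (possibly wrong) understanding is that the dataframe case is fast because joblib automatically dumps large numpy buffers to temp_folder and hands the workers a memory-mapped view, while a plain dict is pickled and re-sent for every task. Here is a minimal sketch of the knobs that I believe control this behavior (max_nbytes and mmap_mode on Parallel); the expected 'memmap' output in the last line is my assumption, not something I have verified on every platform:

import numpy as np
from joblib import Parallel, delayed

def arg_type(arr, d) -> str:
    # If automatic memmapping kicked in, arr arrives here as a numpy.memmap view.
    return type(arr).__name__

if __name__ == "__main__":
    big = np.random.randn(10_000_000)                       # ~80 MB, well above the default max_nbytes
    heavy = {i: np.random.rand(1) for i in range(100_000)}  # plain dict, pickled for every task

    # max_nbytes / mmap_mode are the documented knobs for automatic memmapping
    # of large numpy buffers (defaults are '1M' and 'r').
    out = Parallel(n_jobs=2, max_nbytes="1M", mmap_mode="r")(
        delayed(arg_type)(big, heavy) for _ in range(2)
    )
    print(out)  # I expect ['memmap', 'memmap'] if my understanding is correct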

import pandas as pd
import numpy as np

from joblib import Parallel, delayed
from numpy.random import choice
import pkg_resources
import platform
import argparse
import time

def compute_sum_df(df, col, dummy) -> float:
    # dummy is never used; it is only passed to measure the cost of sending it to the worker
    t = 0.0
    for n in range(50):
        t += df[col].sum()
    return t
    
def sys_info(huge_dict: bool) -> None:
    uname = platform.uname()

    versions = ''
    for package in 'pandas joblib'.split():
        versions += f"{package}: {pkg_resources.get_distribution(package).version} "

    print(f"Running with huge_dict={huge_dict} on {uname.system} {uname.release} {uname.processor} ({versions})")

if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument('--huge_dict', type=int, default=0, help='Pass a huge dict along with the big dataframe')
    args = parser.parse_args()

    sys_info(args.huge_dict)

    ncols = 10
    cols_names = np.array(list(map(str, range(ncols))))

    # ~800 MB dataframe of random floats (10_000_000 rows x 10 columns of float64)
    df = pd.DataFrame(np.random.randn(10_000_000, ncols), columns=cols_names)

    n_jobs = 20

    # 100_000 entries mapping ints to 1-element numpy arrays
    k = 100_000
    huge_dict = dict(zip(np.arange(k), np.random.rand(k, 1)))

    start = time.time()

    results = Parallel(n_jobs=n_jobs, backend='loky', prefer='processes', temp_folder=r"R:/Temp/")(
        delayed(compute_sum_df)(
            df=df, col=col, dummy=huge_dict if args.huge_dict == 1 else None
        )
        for col in cols_names[choice(ncols, n_jobs)]
    )

    end = time.time()
    print(f"Done! Time spent={end - start:.1f}s.")

python pandas_joblib.py --huge_dict=0
Running with huge_dict=0 on Windows 10 Intel64 Family 6 Model 45 Stepping 5, GenuineIntel (pandas: 1.3.5 joblib: 1.1.0 )
Done! Time spent=24.2s.

python pandas_joblib.py --huge_dict=1
Running with huge_dict=1 on Windows 10 Intel64 Family 6 Model 45 Stepping 5, GenuineIntel (pandas: 1.3.5 joblib: 1.1.0 )
Done! Time spent=106.1s.
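I suspect a large part of the gap comes from re-serializing the dict for every dispatched task (plus the child processes being spawned rather than forked on Windows). A quick back-of-the-envelope check of how much data that is, using plain pickle (my own measurement, not something joblib reports):

import pickle
import numpy as np

k = 100_000
huge_dict = dict(zip(np.arange(k), np.random.rand(k, 1)))

# Rough size of the payload that has to be serialized and shipped
# whenever the dict is passed alongside the dataframe.
print(f"pickled dict size: {len(pickle.dumps(huge_dict)) / 1e6:.1f} MB")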

Can someone explain why this is happening and how to avoid such degraded performance?
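In case it helps the discussion, one workaround I have been experimenting with (reusing the names from the script above; compute_sum_df_from_path and the R:/Temp/huge_dict.joblib path are just illustrative, and I make no claim that this is the intended solution) is to dump the dict to disk once with joblib.dump and load it inside the worker, so that only a short path string travels with each task:

from joblib import Parallel, delayed, dump, load

def compute_sum_df_from_path(df, col, dict_path) -> float:
    # Load the dict inside the worker instead of shipping it from the parent.
    dummy = load(dict_path) if dict_path is not None else None
    t = 0.0
    for n in range(50):
        t += df[col].sum()
    return t

dict_path = r"R:/Temp/huge_dict.joblib"   # example location on the same RAM disk as temp_folder
dump(huge_dict, dict_path)                # serialize once in the parent

results = Parallel(n_jobs=n_jobs, backend='loky', temp_folder=r"R:/Temp/")(
    delayed(compute_sum_df_from_path)(df=df, col=col, dict_path=dict_path)
    for col in cols_names[choice(ncols, n_jobs)]
)

Is something along these lines the recommended approach, or is there a cleaner way to keep the dict from being re-pickled for every task?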
