Outputting the result of multiprocessing to a pandas dataframe

pandas provides high-performance, easy-to-use data structures and data analysis tools for Python. Using pandas with multiprocessing, however, can be a challenge. In his Stack Overflow post, Mike McKerns nicely summarizes why this is so. He says:

You are asking multiprocessing (or other python parallel modules) to output to a data structure that they don't directly output to.

This tutorial demonstrates a straightforward workaround: return a list of lists from multiprocessing and then convert that list of lists into a pandas data frame. While you're not getting a pandas data frame straight from your worker processes, you still get a pandas data frame at the end. Hooray!

Let's assume you're performing a compute-intensive operation on multiple data frames. At the end of the operation, you want to merge the data frames.

Here's what the workflow might look like.

Parallel version

In [1]:
import time
import pandas as pd
import multiprocessing
from urllib.request import urlopen

url = "http://www.math.uah.edu/stat/data/Fisher.csv"
response = urlopen(url)
df = pd.read_csv(response)
results = []

def processData(df):
    """Performs a compute-intensive operation on the data frame.
       Returns the result as a list of lists."""
    for i in range(100000):
        df = df * 1.0
    return df.values.tolist()

def collect_results(result):
    """apply_async's callback; it runs in the parent process, so it can
       safely extend the shared results list with each worker's rows."""
    results.extend(result)
    
if __name__ == "__main__":
    start_time = time.time()  
    
    # Repeats the compute-intensive operation on 10 data frames concurrently
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    for i in range(10): 
        pool.apply_async(processData, args=(df, ), callback=collect_results)
    pool.close()
    pool.join()
    
    # Converts list of lists to a data frame
    df = pd.DataFrame(results)
    print(df.shape)
    print("--- %s seconds ---" % (time.time() - start_time))
(1500, 5)
--- 79.52008247375488 seconds ---

The key parts of the parallel process above are df.values.tolist() and callback=collect_results. With df.values.tolist(), we're converting the processed data frame to a list of lists, which is a data structure multiprocessing can send back to the parent directly. With callback=collect_results, we're using apply_async's callback functionality: each worker's return value is handed back to the parent process, where collect_results appends it to results. Because the callback runs in the parent process rather than in the workers, there is no deadlock or other synchronization problem on the shared resource, results.
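One caveat: pd.DataFrame(results) gives the merged frame default integer column labels. A minimal sketch of keeping the original labels, assuming it replaces the conversion step above while df still holds the source frame (df_merged is a hypothetical name):

# Rebuild the merged frame with the original column labels
# (run this before reassigning df, while it still refers to the source frame)
df_merged = pd.DataFrame(results, columns=df.columns)
print(df_merged.head())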

Here's what a serial version might look like.

Serial version

In [2]:
url = "http://www.math.uah.edu/stat/data/Fisher.csv"
response = urlopen(url)
df = pd.read_csv(response)
    
results = []


def processData(df):
    """Performs a compute-intensive operation on the data frame.
       Returns the processed data frame."""
    for i in range(100000):
        df = df * 1.0
    return df
    
if __name__ == "__main__":
    start_time = time.time()
    
    # Repeats the compute-intensive operation on 10 data frames serially
    for i in range(10):
        results.append(processData(df))
    final = pd.concat(results, axis=0, ignore_index=True)    

    print(final.shape)
    print("--- %s seconds ---" % (time.time() - start_time))
(1500, 5)
--- 134.60705041885376 seconds ---

Using multiprocessing, we're able to reduce the processing time significantly, from roughly 135 seconds to roughly 80 seconds on this machine.

You can imagine using this workflow if you're downloading fairly large datasets from an online source. Instead of waiting for each download to complete before moving on to the next, you can parallelize the process so that multiple downloads happen simultaneously, as sketched below.
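Here's a rough sketch of that idea, reusing the same apply_async/callback pattern from above. The URL list and the names fetch, collect, and combined are hypothetical; each worker still returns a plain list of lists:

import multiprocessing
import pandas as pd

# Hypothetical list of CSV URLs to download in parallel
urls = [
    "http://www.math.uah.edu/stat/data/Fisher.csv",
]

frames = []

def fetch(url):
    """Downloads one CSV and returns its rows as a list of lists."""
    return pd.read_csv(url).values.tolist()

def collect(rows):
    """Runs in the parent process; gathers each download's rows."""
    frames.extend(rows)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    for url in urls:
        pool.apply_async(fetch, args=(url,), callback=collect)
    pool.close()
    pool.join()

    # Convert the collected rows back into a single data frame
    combined = pd.DataFrame(frames)
    print(combined.shape)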

Your output from running this script will likely differ. Below is my system's information.

System info

In [3]:
!python --version
import platform
print(platform.platform())
print("cpu cores: {0}".format(multiprocessing.cpu_count()))
Python 3.5.2 :: Anaconda 4.1.1 (64-bit)
Linux-3.13.0-74-generic-x86_64-with-debian-jessie-sid
cpu cores: 8

I couldn't get this notebook to run on Windows 10. It appears that Windows 10 has trouble running a Jupyter notebook containing multiprocessing code. I had to spin up an Amazon EC2 Linux instance (m4.2xlarge), and it ran the notebook without any problem. If you successfully ran the notebook on Windows 10, I'd like to hear what you did!