Outputting the result of multiprocessing to a pandas dataframe
pandas provides high-performance, easy-to-use data structures and data analysis tools for Python. However, using pandas with multiprocessing can be a challenge. In his Stack Overflow post, Mike McKerns nicely summarizes why this is so. He says:
You are asking multiprocessing (or other python parallel modules) to output to a data structure that they don't directly output to.
This tutorial demonstrates a straightforward workaround: return a list of lists from multiprocessing and then convert it to a pandas data frame. While you're not getting a pandas data frame straight from your worker processes, you still get one at the end. Hooray!
Let's assume you're performing a compute-intensive operation on multiple data frames. At the end of the operation, you want to merge the data frames.
Here's what the workflow might look like.
Parallel version
import time
import pandas as pd
import multiprocessing
from urllib.request import urlopen

url = "http://www.math.uah.edu/stat/data/Fisher.csv"
response = urlopen(url)
df = pd.read_csv(response)

results = []

def processData(df):
    """Does some compute-intensive operation on the data frame.
    Returns a list of lists."""
    for i in range(100000):
        df = df * 1.0
    return df.values.tolist()

def collect_results(result):
    """apply_async's callback; runs in the parent process and
    safely extends the shared results list."""
    results.extend(result)

if __name__ == "__main__":
    start_time = time.time()

    # Repeats the compute-intensive operation on 10 data frames concurrently
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    for i in range(10):
        pool.apply_async(processData, args=(df,), callback=collect_results)
    pool.close()
    pool.join()

    # Converts the list of lists to a data frame
    df = pd.DataFrame(results)
    print(df.shape)

    print("--- %s seconds ---" % (time.time() - start_time))
The key parts of the parallel process above are df.values.tolist() and callback=collect_results. With df.values.tolist(), we're converting the processed data frame to a list of lists, which is a data structure multiprocessing can output directly. With callback=collect_results, we're handing each worker's return value to apply_async's callback, which runs in the parent process; because the callbacks execute one at a time there, extending the shared resource results doesn't cause deadlocks or other synchronization problems.
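As an aside, if you don't need to collect results as they complete, pool.map offers a simpler route to the same list of lists: it returns the workers' return values in submission order, so no shared list or callback is needed. Here's a minimal sketch; the tiny stand-in frame and the process_data name are just for illustration:

import multiprocessing
import pandas as pd

def process_data(df):
    """Same idea as processData above: return a plain list of lists."""
    return (df * 1.0).values.tolist()

if __name__ == "__main__":
    # A small stand-in frame; the post uses the Fisher data instead
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    with multiprocessing.Pool(processes=2) as pool:
        list_of_lists = pool.map(process_data, [df] * 10)
    # Flatten the per-task lists into rows, then build one frame
    rows = [row for task in list_of_lists for row in task]
    final = pd.DataFrame(rows, columns=df.columns)
    print(final.shape)  # (30, 2)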
Here's what a serial version might look like.
Serial version
url = "http://www.math.uah.edu/stat/data/Fisher.csv"
response = urlopen(url)
df = pd.read_csv(response)

results = []

def processData(df):
    """Performs some compute-intensive operation on the data frame.
    Returns a data frame."""
    for i in range(100000):
        df = df * 1.0
    return df

if __name__ == "__main__":
    start_time = time.time()

    # Repeats the compute-intensive operation on 10 data frames serially
    for i in range(10):
        results.append(processData(df))

    final = pd.concat(results, axis=0, ignore_index=True)
    print(final.shape)

    print("--- %s seconds ---" % (time.time() - start_time))
Using multiprocessing, we're able to reduce the processing time significantly.
You can imagine using this workflow if you're downloading fairly large datasets from an online source. Instead of waiting for each download to complete before moving to the next download, you can parallelize the process so that multiple downloads are happening simultaneously.
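For instance, the same Pool pattern can fan out the downloads themselves. Here's a hedged sketch, reusing the post's Fisher.csv URL as a stand-in for several distinct sources (the fetch name and URLS list are illustrative; for purely I/O-bound downloads a thread pool would work just as well):

import multiprocessing
import pandas as pd

# Reusing the post's URL three times as a stand-in for distinct downloads
URLS = ["http://www.math.uah.edu/stat/data/Fisher.csv"] * 3

def fetch(url):
    """Downloads one CSV and returns its rows as a list of lists."""
    return pd.read_csv(url).values.tolist()

if __name__ == "__main__":
    with multiprocessing.Pool(processes=len(URLS)) as pool:
        chunks = pool.map(fetch, URLS)
    # Flatten the per-download lists into one data frame
    rows = [row for chunk in chunks for row in chunk]
    combined = pd.DataFrame(rows)
    print(combined.shape)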
Your output from running this script will likely differ. Below is my system's information.
System info
!python --version
import platform
print(platform.platform())
print("cpu cores: {0}".format(multiprocessing.cpu_count()))
I couldn't get this notebook to run on Windows 10. It appears that Windows 10 has trouble running a Jupyter notebook containing multiprocessing code. I had to spin up an Amazon EC2 Linux instance (m4.2xlarge), and it ran the notebook without any problem. If you successfully ran the notebook on Windows 10, I'd like to hear what you did!
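I haven't verified this myself, but one commonly suggested explanation is that multiprocessing on Windows starts child processes with "spawn", which re-imports the main module, and functions defined inside a notebook cell aren't importable that way. A sketch of the usual workaround, using a hypothetical worker.py module sitting next to the notebook:

# worker.py (hypothetical helper module next to the notebook)
# On Windows, multiprocessing starts children with "spawn", which
# re-imports the main module; functions defined in a notebook cell
# aren't importable that way, but functions in a real module are.

def processData(df):
    """Same compute-intensive operation as in the notebook."""
    for i in range(100000):
        df = df * 1.0
    return df.values.tolist()

# Then, in the notebook cell, import the worker instead of defining it inline:
#   from worker import processData
#   pool.apply_async(processData, args=(df,), callback=collect_results)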