September 6th, 2021
One of the latest tasks at GoodIP was to calculate the similarities between around 480k items, each having around 800 observations with values in the range of 0–50k. Reducing the dimensionality would compromise the quality of the long-tail results, which is undesirable. The following article evaluates the performance of different implementations and describes how to split the dataset into multiple chunks and process them in parallel. I also want to mention DeepGraph, which provides an interesting, different approach.
As a short refresher: the Pearson correlation coefficient assigns a value in the range of −1 to 1, where −1 signifies a negative correlation (if the value in A increases, the corresponding value in B decreases), 1 a positive correlation (if A increases, B also increases), and 0 no correlation. It is defined as

r = Σᵢ (xᵢ − x̄)(yᵢ − ȳ) / ( √Σᵢ (xᵢ − x̄)² · √Σᵢ (yᵢ − ȳ)² )

where x̄ and ȳ are the means of the values in x and y.
In our case, the CSV file containing the dataset is around 1GB in size. We are going to load it and create a NumPy ndarray from it.
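A minimal sketch of this loading step, assuming one row per item and one column per observation; the file name and column layout are placeholders:

```python
import pandas as pd

# Hypothetical file name and layout: one row per item, one column per
# observation. Adjust index_col and dtypes to the actual dataset.
items_df = pd.read_csv("items.csv", index_col=0)

# ndarray of shape (n_items, n_observations), e.g. (480000, 800)
numpy_items = items_df.to_numpy(dtype="float64")
```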

Simply trying to run np.corrcoef(numpy_items) raises the exception MemoryError: Unable to allocate 1.6 TiB for an array with shape (480000, 480000) and data type float64, which illustrates the dimension of the problem. The number of pairings that actually need to be processed is the number of distinct, unordered combinations of choosing r objects from a set of n objects, nCr(n, r) = n! / (r! (n − r)!). In this case that is nCr(480000, 2), far fewer than the full 480000 × 480000 matrix, but still 115,199,760,000 combinations.
Implementation performance comparison
Let’s first compare the performance of different implementations in Pandas, NumPy, and CuPy and find out which of them can be parallelized. For this comparison, we only use a 1% subset of the data.
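A minimal sketch of such a comparison, timing Pandas' DataFrame.corr, NumPy's corrcoef, and CuPy's corrcoef on the subset; the CuPy part is optional and requires a GPU:

```python
import time
import numpy as np
import pandas as pd

subset = numpy_items[: len(numpy_items) // 100]  # 1% of the items

start = time.perf_counter()
pd.DataFrame(subset.T).corr()                    # Pandas: correlates columns
print("pandas", time.perf_counter() - start)

start = time.perf_counter()
np.corrcoef(subset)                              # NumPy: correlates rows
print("numpy", time.perf_counter() - start)

try:
    import cupy as cp                            # optional, requires a GPU
    start = time.perf_counter()
    cp.corrcoef(cp.asarray(subset))
    cp.cuda.Stream.null.synchronize()            # wait for the GPU to finish
    print("cupy", time.perf_counter() - start)
except ImportError:
    pass
```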
GPUs excel at matrix operations, which is likely why CuPy outperforms the other implementations by far. This approach could also outperform our parallelized CPU approach and will be evaluated in a follow-up article. In the following, however, we use a custom implementation that relies on NumPy and precomputed means and can easily be distributed across multiple processes.
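A sketch of what such a custom implementation can look like; the function name pearson and its signature are my own, not necessarily the original code:

```python
import numpy as np

def pearson(x, y, x_mean, y_mean):
    """Pearson correlation of two 1-D arrays with precomputed means."""
    xm = x - x_mean
    ym = y - y_mean
    return np.sum(xm * ym) / np.sqrt(np.sum(xm ** 2) * np.sum(ym ** 2))
```

Passing the precomputed means in pays off because each row's mean would otherwise be recomputed for every one of its hundreds of thousands of pairings.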
Chunking the dataset
As we do not want to precompute all pairings, mainly due to memory constraints, the approach is to take one row, compute its correlation with all rows below it, increment the index, and so on. This way, we exclude reflexive comparisons and exploit the commutativity of the function by ignoring the order of each pair. The computational effort for the first indices is therefore much higher than for the last ones, and we only want to split at whole index positions; both constraints are taken into account by the following naive split. We first need an nCr function for the total number of combinations.
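A simple nCr helper based on the factorial definition above:

```python
from math import factorial

def ncr(n, r):
    """Number of unordered combinations of r items chosen from n."""
    return factorial(n) // (factorial(r) * factorial(n - r))
```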
Since Python 3.8, you can also use math.comb. The following code finds the indices that split the rows into n_chunks parts with close to equal numbers of pairings.
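One possible version of that naive split: it walks over the row indices and starts a new chunk whenever the running number of pairings exceeds an equal share of the total. The function name and exact behaviour are a sketch, not necessarily the original code; it uses math.comb as just mentioned.

```python
import math

def get_split_indices(n_items, n_chunks):
    """Return n_chunks + 1 boundary indices so that each chunk of rows
    produces roughly the same number of pairings."""
    target = math.comb(n_items, 2) / n_chunks    # pairings per chunk
    boundaries = [0]
    pairings_in_chunk = 0
    for i in range(n_items):
        pairings_in_chunk += n_items - 1 - i     # pairings contributed by row i
        if pairings_in_chunk >= target and len(boundaries) < n_chunks:
            boundaries.append(i + 1)
            pairings_in_chunk = 0
    boundaries.append(n_items)
    return boundaries
```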
We can check the number of pairings in each chunk by running:
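For example, building on the sketch above (the choice of 16 chunks is arbitrary):

```python
n_items = len(numpy_items)
boundaries = get_split_indices(n_items, n_chunks=16)

for start, end in zip(boundaries[:-1], boundaries[1:]):
    # every row i in [start, end) is paired with the n_items - 1 - i rows below it
    pairings = sum(n_items - 1 - i for i in range(start, end))
    print(f"rows {start}-{end}: {pairings:,} pairings")
```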
Processing the dataset
First, we precompute the means of the dataset. As the data is read-only, we can simply define it in the global scope. All child processes will then be able to access it, and it will not be copied, provided you don't write to it.
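The precomputation itself is a one-liner; the relevant point is that both numpy_items and the means live at module level:

```python
# Module-level, read-only data: forked worker processes can access both
# arrays without them being copied (as long as they only read).
item_means = numpy_items.mean(axis=1)
```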
The initial idea was to send the results from the workers to a dedicated storage worker through a queue. Unfortunately, this decreased the performance significantly due to blocking and serialization. The fastest option to implement was to write the results to disk directly from the worker.
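A sketch of that setup, combining the chunking and the pairwise function from the earlier snippets; the output path, the chunk count, and the use of multiprocessing.Pool are assumptions on my part:

```python
import csv
import multiprocessing as mp

def process_chunk(bounds):
    """Correlate rows [start, end) with all rows below them and write
    one CSV file per row (assumes a results/ directory exists)."""
    start, end = bounds
    n_items = len(numpy_items)
    for i in range(start, end):
        rows = []
        for j in range(i + 1, n_items):
            r = pearson(numpy_items[i], numpy_items[j],
                        item_means[i], item_means[j])
            rows.append((i, j, r))
        with open(f"results/correlations_{i}.csv", "w", newline="") as f:
            csv.writer(f).writerows(rows)

if __name__ == "__main__":
    # Relies on a fork-based start method (e.g. Linux) so the workers can
    # read the global numpy_items and item_means without copying them.
    boundaries = get_split_indices(len(numpy_items), n_chunks=16)
    chunks = list(zip(boundaries[:-1], boundaries[1:]))
    with mp.Pool() as pool:
        pool.map(process_chunk, chunks)
```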

With our dataset, this creates 480k CSV files with a total size of approximately 4 TB. We batched and gzip-compressed the results into ~4 GB parts and loaded them into Google BigQuery for production use.
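The batching step can be as simple as concatenating the per-row CSVs into gzip-compressed parts. The sketch below uses hypothetical paths, applies the size threshold to the uncompressed data for simplicity, and leaves the actual BigQuery load out:

```python
import glob
import gzip

# Hypothetical paths; assumes a parts/ directory exists.
part_size_limit = 4 * 1024 ** 3                  # threshold on uncompressed bytes
part_no, written = 0, 0
out = gzip.open(f"parts/part_{part_no:04d}.csv.gz", "wt")
for path in sorted(glob.glob("results/correlations_*.csv")):
    with open(path) as f:
        data = f.read()
    out.write(data)
    written += len(data)
    if written >= part_size_limit:               # start the next part
        out.close()
        part_no, written = part_no + 1, 0
        out = gzip.open(f"parts/part_{part_no:04d}.csv.gz", "wt")
out.close()
```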