Multiprocessing.Pool() - A Global Solution

Intro

In this post, we talk about how to copy data from a parent process to several worker processes in a multiprocessing.Pool using global variables. Specifically, we will use class attributes, as I find this solution slightly more appealing than global variables defined at the top of a file.

For those of you just joining this series, the problem we are trying to solve is as follows…

Given the following class:

from typing import Dict

import numpy as np


class IntToBitarrayConverter:

    def set_bitstring_cache(self, bitstring_cache: Dict):
        self.bitstring_cache = bitstring_cache

    def convert(self, integer: int) -> np.ndarray:
        # cache the step of bitstring = format(integer, 'b')
        bitstring = self.bitstring_cache[integer]
        return self._bitstring_to_ndarray(bitstring)

    @staticmethod
    def _bitstring_to_ndarray(bitstring: str) -> np.ndarray:
        # np.fromstring is deprecated; frombuffer over the ASCII bytes
        # turns each digit character into its bit value ('1' - 48 = 1)
        return np.frombuffer(bitstring.encode('ascii'), dtype='u1') - 48
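As a quick sanity check of the conversion step, here is a standalone sketch of the same logic:

```python
import numpy as np


def bitstring_to_ndarray(bitstring: str) -> np.ndarray:
    # ASCII '0' is 48 and '1' is 49, so subtracting 48 yields the bit values
    return np.frombuffer(bitstring.encode('ascii'), dtype='u1') - 48


arr = bitstring_to_ndarray(format(5, 'b'))  # 5 -> '101'
print(arr)  # [1 0 1]
```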

And given the following parallelization of our convert() method using Pool:

import random
from multiprocessing import Pool

CACHE_SIZE = 1024 * 1024  # 2**20 = 1,048,576 cache entries
ITER_SIZE = 1000000  # 1 million conversions

int_to_bitarr_converter = IntToBitarrayConverter()
int_to_bitarr_converter.set_bitstring_cache(
    {key: format(key, 'b') for key in range(CACHE_SIZE)})
with Pool() as pool:
    ndarray_bitarr_ls = pool.map(
        int_to_bitarr_converter.convert,
        (random.randint(0, CACHE_SIZE - 1)
         for _ in range(ITER_SIZE)))

We learned that when you have an instance method with a large object bound to it, passing this method to Pool.map(...) results in a huge performance loss due to repeated serializing/deserializing of the large object between processes.

This is exactly what happened with our convert() method above. In this post, we explore a solution that shows us “how to pass data to a Pool of workers, and only do it once.”
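To see why the original version is so expensive, note that pickling a bound method pickles the instance it is bound to, cache and all. A minimal sketch of the effect (the Holder class here is hypothetical, a stand-in for our converter):

```python
import pickle


class Holder:
    def __init__(self, cache):
        self.cache = cache

    def convert(self, integer):
        return self.cache[integer]


def plain(integer):
    return integer


holder = Holder({key: format(key, 'b') for key in range(1024)})

# The bound method drags the entire instance (and its cache) into the pickle;
# a module-level function is pickled by reference and stays tiny
bound_size = len(pickle.dumps(holder.convert))
plain_size = len(pickle.dumps(plain))
print(bound_size, plain_size)
```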

A Classy Solution

The unfortunate answer to our question above is that, given the current implementation of Pool, we need to use global variables to pass data to our workers. Most resources on the interwebs demonstrate exactly this global-variable approach.

By taking a different approach, we can class this up a bit by using class attributes and a @classmethod, with the hope that we can delicately preserve some semblance of encapsulation in our code.

Enter the ClassMethodBitarrayConverter:

class ClassMethodBitarrayConverter(IntToBitarrayConverter):
    bitstring_cache = None

    @classmethod
    def set_bitstring_cache(cls, bitstring_cache: Dict):
        cls.bitstring_cache = bitstring_cache

    @classmethod
    def convert(cls, integer: int) -> np.ndarray:
        bitstring = cls.bitstring_cache[integer]
        return cls._bitstring_to_ndarray(bitstring)

We inherit from IntToBitarrayConverter, and make three changes:

  1. bitstring_cache becomes a class attribute
  2. convert() becomes an @classmethod
  3. set_bitstring_cache() becomes an @classmethod

Note: What we’ve done here is essentially bundle two global functions and one global variable, co-locating them within the same class. This quietly approaches the singleton anti-pattern, but I prefer the encapsulation here over bare globals, despite it being a dangerous facade…

Now, when we run the parallelized code above, because ClassMethodBitarrayConverter.convert() and ClassMethodBitarrayConverter.bitstring_cache are glorified globals, convert() has direct access to bitstring_cache within each worker process, with no serializing/deserializing required.

Compared to the 32.5s in our previous implementation, our classy global solution runs in just 5s!

Note: If you’ve been following along, you’ll notice that this is the same performance as our sequential, non-parallelized code. This is because the convert() method is not CPU intensive. This is a topic for another post!

We see drastic improvements in our profiled calls to loads() and dumps():

Process   dumps() calls   dumps() time (s)   dumps() avg (s)   loads() calls   loads() time (s)   loads() avg (s)
Parent    42              4.09               .09               33              7.36               .13
1         4               10.34              2.57              5               .04                .008
2         4               10.31              2.57              5               .04                .008
3         4               10.31              2.57              5               .04                .008
4         4               10.36              2.58              5               .04                .008
5         4               10.23              2.55              5               .04                .008
6         4               10.19              2.54              5               .04                .008
7         4               10.04              2.52              5               .04                .008
8         4               9.88               2.47              5               .04                .008

Global Problems

Our new implementation is encapsulated within a class, which makes it more readable and modular. However, to cut through the BS: at its core, ClassMethodBitarrayConverter is nothing but a global.

This means we expose ourselves to the following:

  1. Singleton Pattern: We can’t really have more than one unique instance of our class. Its only attribute is a class attribute, so any two instances are equal and share the exact same state - our class is effectively a singleton.
  2. Testability Problems: Testing our class is extremely error prone: if one test changes the state of bitstring_cache, that test must undo the global state on teardown, or the change leaks into the next test!
  3. Code Maintainability: To understand a small module of the codebase, you need to first digest every part of the codebase that alters your global state, rather than just the public interface of your module.
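Point 1 is easy to demonstrate with a stripped-down stand-in for our converter (the SharedCacheConverter name is hypothetical):

```python
class SharedCacheConverter:
    bitstring_cache = None  # class attribute: one copy, shared by all instances

    @classmethod
    def set_bitstring_cache(cls, bitstring_cache):
        cls.bitstring_cache = bitstring_cache


a = SharedCacheConverter()
b = SharedCacheConverter()
a.set_bitstring_cache({1: '1'})

# b never set a cache, yet it sees a's state: the two "instances" are one
print(b.bitstring_cache)  # {1: '1'}
```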

The Real Problem

The real problem is not that we are using globals - whether globals are bad is a matter of opinion (see this SO post).

The problem is that we are forced to use globals because of the multiprocessing.Pool() implementation, not because we want to share our globals across our app!

I’ve begun work here on augmenting the current initializer kwarg to Pool(...), which will allow for more flexibility when initializing worker processes. Specifically, the feature will allow you to pass non-picklable objects, or very large objects to each worker process, by returning a value from the initializer function, and passing the return value to whatever function you are applying in map().