Pool's chunksize-algorithm is a heuristic. It provides a simple solution for all imaginable problem scenarios you are trying to stuff into Pool's methods. As a consequence, it cannot be optimized for any *specific* scenario.

The algorithm arbitrarily divides the iterable into approximately four times more chunks than the naive approach. More chunks mean more overhead, but increased scheduling flexibility. As this answer will show, this leads to higher worker-utilization on average, but *without* the guarantee of a shorter overall computation time for every case.

"That's nice to know" you might think, "but how does knowing this help me with my concrete multiprocessing problems?" Well, it doesn't. The more honest short answer is, "there is no short answer", "multiprocessing is complex" and "it depends". An observed symptom can have different roots, even for similar scenarios.

This answer tries to provide you with basic concepts helping you to get a clearer picture of Pool's scheduling black box. It also tries to give you some basic tools for recognizing and avoiding potential cliffs as far as they are related to chunksize.


It is necessary to clarify some important terms first.

## 1. Definitions

**Chunk**

A chunk here is a share of the `iterable`-argument specified in a pool-method call. How the chunksize gets calculated and what effects this can have is the topic of this answer.

**Task**

A task's physical representation in a worker-process in terms of data can be seen in the figure below.

The figure shows an example call to `pool.map()`, displayed along a line of code taken from the `multiprocessing.pool.worker` function, where a task read from the `inqueue` gets unpacked. `worker` is the underlying main-function in the `MainThread` of a pool-worker-process. The `func`-argument specified in the pool-method will only match the `func`-variable inside the `worker`-function for single-call methods like `apply_async` and for `imap` with `chunksize=1`. For the rest of the pool-methods with a `chunksize`-parameter, the processing-function `func` will be a mapper-function (`mapstar` or `starmapstar`). This function maps the user-specified `func`-parameter on every element of the transmitted chunk of the iterable (--> "map-tasks"). The time this takes defines a **task** also as a **unit of work**.

**Taskel**

While the usage of the word "task" for the *whole* processing of one chunk is matched by code within `multiprocessing.pool`, there is no indication how a *single call* to the user-specified `func`, with one element of the chunk as argument(s), should be referred to. To avoid confusion emerging from naming conflicts (think of the `maxtasksperchild`-parameter within Pool's `__init__`-method), this answer will refer to the single units of work within a task as **taskel**.

A **taskel** (from **task** + **el**ement) is the smallest unit of work within a **task**. It is the single execution of the function specified with the `func`-parameter of a `Pool`-method, called with arguments obtained from a single element of the transmitted chunk. A **task** consists of `chunksize` **taskels**.

**Parallelization Overhead (PO)**

**PO** consists of Python-internal overhead and overhead for inter-process communication (IPC). The per-task overhead within Python comes with the code needed for packaging and unpacking the tasks and their results. IPC-overhead comes with the necessary synchronization of threads and the copying of data between different address spaces (two copy steps needed: parent -> queue -> child). The amount of IPC-overhead is OS-, hardware- and data-size dependent, which makes generalizations about its impact difficult.

## 2. Parallelization Goals

When using multiprocessing, our overall goal (obviously) is to minimize total processing time for all tasks. To reach this overall goal, our **technical goal** needs to be **optimizing the utilization of hardware resources**.

Some important sub-goals for achieving the technical goal are:

- minimize parallelization overhead (most famously, but not alone: IPC)
- high utilization across all cpu-cores
- keeping memory usage limited to prevent the OS from excessive paging (thrashing)

First of all, the tasks need to be computationally heavy (intensive) enough to *earn back* the PO we have to pay for parallelization. The relevance of PO decreases with increasing absolute computation time per taskel. Or, to put it the other way around: the bigger the absolute computation time *per taskel* for your problem, the less relevant the need for reducing PO becomes. If your computation will take hours per taskel, the IPC-overhead will be negligible in comparison. The primary concern here is to prevent idling worker processes after all tasks have been distributed. Keeping all cores loaded means we are parallelizing as much as possible.

## 3. Parallelization Scenarios

> What factors determine an optimal chunksize argument to methods like `multiprocessing.Pool.map()`?

The major factor in question is how much computation time may *vary* across our single taskels. To name it, the choice for an optimal chunksize is determined by the...

**Coefficient of Variation** (CV) for computation times per taskel.

The two extreme scenarios on a scale, following from the extent of this variation are:

- All taskels need exactly the same computation time
- A taskel could take seconds or days to finish

For better memorability, I will refer to these scenarios as:

- **Dense Scenario**
- **Wide Scenario**

Another determining factor is the number of used worker-processes (backed by cpu-cores). It will become clear later why.

## Dense Scenario

In a **Dense Scenario** it would be desirable to distribute all taskels at once, to keep necessary IPC and context switching at a minimum. This means we want to create only as many chunks as there are worker processes. As already stated above, the weight of PO increases with smaller computation times per taskel.

For maximal throughput, we also want all worker processes busy until all tasks are processed (no idling workers). For this goal, the distributed chunks should be of equal size or close to it.

## Wide Scenario

The prime example for a **Wide Scenario** would be an optimization problem, where results either converge quickly or computation can take hours, if not days. It's not predictable what mixture of "light taskels" and "heavy taskels" a task will contain in such a case, hence it's not advisable to distribute too many taskels in a task-batch at once. Distributing fewer taskels at once than possible means increasing scheduling flexibility. This is needed here to reach our sub-goal of high utilization of all cores.

If `Pool` methods, by default, were totally optimized for the Dense Scenario, they would increasingly create suboptimal timings for every problem located closer to the Wide Scenario.

## 4. Risks of Chunksize > 1

Consider this simplified pseudo-code example of a **Wide Scenario**-iterable, which we want to pass into a pool-method:

```
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]
```

Instead of the actual values, we pretend to see the needed computation time in seconds, for simplicity only 1 minute or 1 day.
We assume the pool has four worker processes (on four cores) and `chunksize` is set to `2`. Because the order will be kept, the chunks sent to the workers will be these:

```
[(60, 60), (86400, 60), (86400, 60), (60, 84600)]
```

Since we have enough workers and the computation time is high enough, we can say that every worker process will get a chunk to work on in the first place. (This does not have to be the case for fast-completing tasks.) Further we can say that the whole processing will take about 86400+60 seconds, because that's the highest total computation time for a chunk in this artificial scenario and we distribute chunks only once.

Now consider this iterable, which only has one position switched compared to the one before:

```
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]
```

...and the corresponding chunks:

```
[(60, 60), (86400, 86400), (60, 60), (60, 84600)]
```

Just bad luck with the sorting of our iterable nearly doubled (86400+86400) our total processing time! The worker getting the vicious (86400, 86400)-chunk is blocking the second heavy taskel in its task from getting distributed to one of the idling workers already finished with their (60, 60)-chunks. We obviously would not risk such an unpleasant outcome if we set `chunksize=1`.

This is the risk of bigger chunksizes. With higher chunksizes we trade scheduling flexibility for less overhead and in cases like above, that's a bad deal.
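This bad deal can be reproduced with a tiny greedy scheduler. The sketch below works under the same simplifying assumptions as above (no overhead, a free worker immediately grabs the next chunk in order); `makespan` is just an illustrative helper for this answer, not part of `multiprocessing`:

```
import heapq

def makespan(chunks, n_workers=4):
    """Greedy simulation: whichever worker is free first takes the next chunk."""
    workers = [0] * n_workers  # each entry: time at which that worker is free
    for chunk in chunks:
        t = heapq.heappop(workers)
        heapq.heappush(workers, t + sum(chunk))
    return max(workers)

good = [(60, 60), (86400, 60), (86400, 60), (60, 84600)]
bad  = [(60, 60), (86400, 86400), (60, 60), (60, 84600)]

print(makespan(good))  # 86460
print(makespan(bad))   # 172800
# With chunksize=1 the unlucky ordering doesn't hurt anymore:
print(makespan([(t,) for t in [60, 60, 86400, 86400, 60, 60, 60, 84600]]))  # 86400
```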

As we will see in chapter **6. Quantifying Algorithm Efficiency**, bigger chunksizes can also lead to suboptimal results for **Dense Scenarios**.

## 5. Pool's Chunksize Algorithm

Below you will find a slightly modified version of the algorithm from inside the source code. As you can see, I cut off the lower part and wrapped it into a function for calculating the `chunksize` argument externally. I also replaced `4` with a `factor` parameter and outsourced the `len()` calls.

```
# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize
```
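A quick sanity check of the values this function produces (the logic is repeated inline so the snippet runs on its own):

```
# Quick check of the values the function above produces:
def calc_chunksize(n_workers, len_iterable, factor=4):
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

print(calc_chunksize(4, 3))    # 1  -> tiny iterable, `extra` bumps 0 to 1
print(calc_chunksize(4, 16))   # 1  -> exactly n_workers * 4 chunks
print(calc_chunksize(4, 500))  # 32 -> divmod(500, 16) == (31, 4), bumped to 32
```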

To ensure we are all on the same page, here's what `divmod` does:

`divmod(x, y)` is a builtin function which returns `(x//y, x%y)`.
`x // y` is the floor division, returning the rounded-down quotient from `x / y`, while
`x % y` is the modulo operation, returning the remainder from `x / y`.
Hence e.g. `divmod(10, 3)` returns `(3, 1)`.

Now when you look at `chunksize, extra = divmod(len_iterable, n_workers * 4)`, you will notice `n_workers` here is the divisor `y` in `x / y` and multiplication by `4`, without further adjustment through `if extra: chunksize += 1` later on, leads to an initial chunksize *at least* four times smaller (for `len_iterable >= n_workers * 4`) than it would be otherwise.

For viewing the effect of multiplication by 4 on the intermediate chunksize result consider this function:

```
def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors
    by which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2
```

The function above calculates the naive chunksize (`cs_naive`) and the first-step chunksize of Pool's chunksize-algorithm (`cs_pool1`), as well as the chunksize for the complete Pool-algorithm (`cs_pool2`). Further it calculates the **real factors** `rf_pool1 = cs_naive / cs_pool1` and `rf_pool2 = cs_naive / cs_pool2`, which tell us how many times the naively calculated chunksizes are bigger than Pool's internal version(s).
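A few sample values (both helpers repeated from above, so the snippet runs standalone) reproduce the characteristic numbers: a real factor of exactly `7` around lengths 28-31, and values near `4` again for longer iterables:

```
def calc_chunksize(n_workers, len_iterable, factor=4):
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

def compare_chunksizes(len_iterable, n_workers=4):
    cs_naive = len_iterable // n_workers or 1
    cs_pool1 = len_iterable // (n_workers * 4) or 1
    cs_pool2 = calc_chunksize(n_workers, len_iterable)
    return cs_naive, cs_pool1, cs_pool2, cs_naive / cs_pool1, cs_naive / cs_pool2

for n in (16, 28, 31, 32, 500):
    # prints (cs_naive, cs_pool1, cs_pool2, rf_pool1, rf_pool2)
    print(n, compare_chunksizes(n))
# e.g. n=28  -> (7, 1, 2, 7.0, 3.5)   rf_pool1 peaks at 7
#      n=500 -> (125, 31, 32, ~4.03, ~3.91)
```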

Below you see two figures created with output from this function. The left figure just shows the chunksizes for `n_workers=4` up until an iterable length of `500`. The right figure shows the values for `rf_pool1`. For iterable length `16`, the real factor becomes `>=4` (for `len_iterable >= n_workers * 4`) and its maximum value is `7` for iterable lengths `28-31`. That's a massive deviation from the original factor `4` the algorithm converges to for longer iterables. 'Longer' here is relative and depends on the number of specified workers.

Remember chunksize `cs_pool1` still lacks the `extra`-adjustment with the remainder from `divmod` contained in `cs_pool2` from the complete algorithm.

The algorithm goes on with:

```
if extra:
    chunksize += 1
```

Now in cases where there *is* a remainder (an `extra` from the divmod-operation), increasing the chunksize by 1 obviously cannot work out for every task. After all, if it would, there would not be a remainder to begin with.

As you can see in the figures below, the "**extra-treatment**" has the effect that the **real factor** for `rf_pool2` now converges towards `4` from *below* `4` and the deviation is somewhat smoother. Standard deviation for `n_workers=4` and `len_iterable=500` drops from `0.5233` for `rf_pool1` to `0.4115` for `rf_pool2`.

Eventually, increasing `chunksize` by 1 has the effect that the last task transmitted only has a size of `len_iterable % chunksize or chunksize`.

The more interesting and, as we will see later, more consequential effect of the **extra-treatment** however can be observed for the **number of generated chunks** (`n_chunks`).
For long enough iterables, Pool's completed chunksize-algorithm (`n_pool2` in the figure below) will stabilize the number of chunks at `n_chunks == n_workers * 4`.
In contrast, the naive algorithm (after an initial burp) keeps alternating between `n_chunks == n_workers` and `n_chunks == n_workers + 1` as the length of the iterable grows.

Below you will find two enhanced info-functions for Pool's and the naive chunksize-algorithm. The output of these functions will be needed in the next chapter.

```
# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
```

Don't be confused by the probably unexpected look of `calc_naive_chunksize_info`. The `extra` from `divmod` is not used for calculating the chunksize.

```
def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
```
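The stabilization claimed above can be verified with a small sketch; the chunk-count logic of both algorithms is repeated inline here so the snippet runs standalone:

```
# For long iterables, Pool's algorithm settles at n_workers * 4 chunks,
# while the naive one keeps alternating between n_workers and n_workers + 1.
def pool_n_chunks(n_workers, len_iterable, factor=4):
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return len_iterable // chunksize + (len_iterable % chunksize > 0)

def naive_n_chunks(n_workers, len_iterable):
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        return extra
    return len_iterable // chunksize + (len_iterable % chunksize > 0)

for n in (500, 501, 504, 505):
    print(n, pool_n_chunks(4, n), naive_n_chunks(4, n))
# 500: 16 vs 4,  501: 16 vs 5,  504: 16 vs 4,  505: 16 vs 5
```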

## 6. Quantifying Algorithm Efficiency

Now, after we have seen how the output of `Pool`'s chunksize-algorithm looks different compared to output from the naive algorithm...

- **How to tell if Pool's approach actually *improves* something?**
- **And what exactly could this *something* be?**

As shown in the previous chapter, for longer iterables (a bigger number of taskels), Pool's chunksize-algorithm *approximately* divides the iterable into four times *more* chunks than the naive method. Smaller chunks mean more tasks and more tasks mean more Parallelization Overhead (PO), a cost which must be weighed against the benefit of increased scheduling-flexibility (recall **"Risks of Chunksize>1"**).

For rather obvious reasons, Pool's basic chunksize-algorithm cannot weigh scheduling-flexibility against PO for you. IPC-overhead is OS-, hardware- and data-size dependent. The algorithm cannot know on what hardware we run our code, nor does it have a clue how long a taskel will take to finish. It's a heuristic providing basic functionality for *all* possible scenarios. This means it cannot be optimized for any scenario in particular. As mentioned before, PO also becomes increasingly less of a concern with increasing computation times per taskel (negative correlation).

When you recall the **Parallelization Goals** from chapter 2, one bullet-point was:

- high utilization across all cpu-cores

A repeating question on SO regarding `multiprocessing.Pool` is asked by people wondering about unused cores / idling worker-processes in situations where you would expect all worker-processes busy.

Idling worker-processes towards the end of our computation is an observation we can make even with **Dense Scenarios** (totally equal computation times per taskel) in cases where the number of workers is not a **divisor** of the number of chunks (`n_chunks % n_workers > 0`).

A bigger number of chunks means an increased chance that the number of workers will be a divisor for `n_chunks`, hence the chance of not observing idling workers improves accordingly.

For reasons mentioned before, the PO aspect stays completely out of scope of the theoretical considerations about measuring algorithm efficiency, at least in an initial step. The previously mentioned *something* Pool's chunksize-algorithm *can* try to improve is the **minimization of idling worker-processes**, respectively the **utilization of cpu-cores**.

The value quantifying the rate of worker-utilization I'm going to refer to as:

**Parallelization Efficiency (PE)**

Our original condition for a formalization of the problem, the stable state, is a **Dense Scenario** with totally equal computation times per taskel. Every other scenario would be random / chaos and not suited for a ceteris paribus investigation. Further chaos factors like OS thread-scheduling policy are also not taken into consideration.

It's important to note that **PE**, in the sense I'm using the term here, does *not* automatically correlate with *faster* overall computation for a given parallelization problem. **PE** *doesn't* tell us if the workers are *productively* busy, or if most of the time is wasted in handling overhead. It only tells us the percentage of worker utilization in the meaning of an *absence of idling workers* - and it does so *only* for our simplified model.

## 6.1 Absolute Parallelization Efficiency (APE)

While thinking about how I would actually be able to *quantify* a possible advantage of Pool's chunksize-algorithm over the naive chunksize-algorithm, I imagined a picture of Pool's worker-scheduling like you see it below.

- The x-axis is sectioned into equal units of time, where each unit stands for the computation time a taskel requires.
- The y-axis is divided into the number of worker-processes the pool uses.
- A taskel here is displayed as the smallest cyan-colored rectangle, put into a timeline (a schedule) of an anonymized worker-process.
- A task is one or multiple taskels in a worker-timeline continuously highlighted with the same hue.
- Idling time units are represented through red colored tiles.
- The Parallel Schedule is partitioned into sections. The last section is the tail-section.

The names for the composed parts can be seen in the picture below.

Parallelization Efficiency then gets calculated by dividing the **Busy Share** through the whole potential:

**Absolute Parallelization Efficiency (APE) = Busy Share / Parallel Schedule**

Here is how this looks in code:

```
# mp_utils.py

def calc_abs_pe(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate absolute parallelization efficiency.

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rel_pe`.
    """
    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    abs_pe = real / potential

    return abs_pe
```

If there is no **Idling Share**, **Busy Share** will be *equal* to **Parallel Schedule**, hence we get an **APE** of 100%. In our simplified model, this is a scenario where all available processes will be busy through the whole time needed for processing all tasks. In other words, the whole job gets effectively parallelized to 100 percent.

But why do I keep referring to **PE** as *absolute* **PE** here?

To comprehend that, we have to consider a possible case for the chunksize (cs) which ensures maximal scheduling flexibility (also, the number of Highlanders there can be. Coincidence?):

**~ ONE ~**

If we, for example, have four worker-processes and 37 taskels, there will be idling workers even with `chunksize=1`, just because `n_workers=4` is not a divisor of 37. The remainder of dividing 37 / 4 is 1. This single remaining taskel will have to be processed by a sole worker, while the remaining three are idling.

Likewise, there will still be one idling worker with 39 taskels, as you can see pictured below.

When you compare the upper **Parallel Schedule** for `chunksize=1` with the below version for `chunksize=3`, you will notice that the upper **Parallel Schedule** is smaller, the timeline on the x-axis shorter. It should become obvious now how bigger chunksizes unexpectedly also *can* lead to increased overall computation times, even for **Dense Scenarios**.

But why not just use the length of the x-axis for efficiency calculations?

Because the overhead is not contained in this model. It will be different for both chunksizes, hence the x-axis is not really directly comparable. The overhead can still lead to a longer total computation time, as shown in **case 2** from the figure below.

## 6.2 Relative Parallelization Efficiency (RPE)

The **APE** value does not contain the information if a *better* distribution of taskels is possible with chunksize set to 1. *Better* here still means a smaller **Idling Share**.

To get a **PE** value adjusted for the maximum possible **PE**, we have to divide the considered **APE** through the **APE** we get with `chunksize=1`.

**Relative Parallelization Efficiency (RPE) = APE_cs_x / APE_cs_1**

Here is how this looks in code:

```
# mp_utils.py

def calc_rel_pe(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate relative parallelization efficiency."""
    abs_pe_cs1 = calc_abs_pe(
        n_workers, len_iterable,
        n_chunks=len_iterable, chunksize=1, last_chunk=1
    )
    abs_pe = calc_abs_pe(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )
    rel_pe = abs_pe / abs_pe_cs1

    return rel_pe
```
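Continuing the 39-taskel example from above (both functions repeated so the snippet runs standalone):

```
def calc_abs_pe(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers
    n_full_chunks = n_chunks - (chunksize > last_chunk)
    real = n_full_chunks * chunksize + (chunksize > last_chunk) * last_chunk
    return real / potential

def calc_rel_pe(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    abs_pe_cs1 = calc_abs_pe(n_workers, len_iterable, len_iterable, 1, 1)
    return calc_abs_pe(n_workers, len_iterable, n_chunks, chunksize, last_chunk) / abs_pe_cs1

# 39 taskels, 4 workers, chunksize=3 (chunk numbers from calc_chunksize_info):
print(calc_rel_pe(4, 39, 13, 3, 3))  # ~0.8333 -> 83.33 % of the best possible APE
```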

**RPE**, as defined here, in essence is a tale about the tail of a **Parallel Schedule**. **RPE** is influenced by the maximum effective chunksize contained in the tail. (This tail can be of x-axis length `chunksize` or `last_chunk`.)
This has the consequence that **RPE** naturally converges to 100% (even) for all sorts of "tail-looks", like shown in the figure below.

A low **RPE**...

- is a strong hint for optimization potential.
- naturally gets less likely for longer iterables, because the relative tail-portion of the overall **Parallel Schedule** shrinks.

Find Part II of this answer below.