Simple, Fast, Easy Parallelism in Shell Pipelines


The typical shell1 pipeline looks something like this:
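
src | worker | sink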

Usually src will output lines of data, and worker acts as a filter, processing each line and sending some transformed lines on to sink. So there is an opportunity for parallelism at the level of lines: we could have multiple worker tasks2 which each process different lines of data from src and output transformed lines to sink.3

Of course, there already exists xargs and GNU parallel4, which have the ability to run multiple tasks simultaneously. What's the difference?

  • When xargs runs a command, it provides a line of input as an argument to that command. But the typical data-processing Unix command reads lines of input from stdin, not from its arguments on the command line, so many commands simply don't make sense to use with xargs.
  • xargs starts a new task2 for each line of input. That is impractical for programs with a slow initialization. For example, this is painful for programs that open a network connection.
  • Because a new task is started for every line, workers can't keep state about the data they have seen. For example, uniq cannot be done in parallel with xargs. This ability to keep state as input is processed is useful for many, many data-processing tasks.

Overall, xargs is mainly useful for executing a command repeatedly with a varying set of arguments, not processing a lot of data. A technique that allows a pool of worker tasks2, executing in parallel, to process incoming lines as they arrive on stdin, would be strictly more general. You could even run xargs within such a technique, or nest such a technique within itself; you can't run xargs from xargs.
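
As a quick illustration of the difference: xargs starts a new process for every line, while a stdin-reading filter sees the whole stream at once and can keep state across lines.

# xargs runs echo three times, once per input line
seq 3 | xargs -n 1 echo
# uniq runs once and sees (and can count) every line
seq 3 | uniq -c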

Writing a parallel pipeline in any shell looks like this:3

src | { worker & worker & worker & } | sink

This will start up three workers in the background, which will all read data from src in parallel, and write output to sink. The output of src is not copied to all the workers; each byte of output becomes input to exactly one worker.5

Since all the worker tasks are reading input at the same time, we might run into a common concurrency issue: one worker task might get the first part of a line, while the rest of the line goes to another worker task. This is called "interleaving", and if we allowed it to happen, the input seen by the workers would be completely corrupted.

Here's an example of using this parallel processing technique without protecting against interleaving. Note that in bash, we need to place <&0 in front of the first background command. This just means "this command should read input from file descriptor 0; that is, stdin", which happens by default in most shells, but won't happen in bash due to a bug that will be fixed in the next release.

yes ooo | head -n 8 | pv --quiet --rate-limit 4 | { <&0 tr o z & tr o x & }
x
x
zzzzzz
zzz
zzz
zzz
x
zzz
xxx

Pretty severe interleaving! We used pv to throttle the input going to the workers to increase the amount of interleaving that would happen.

To deal with the issue of interleaving, we need to introduce two new commands. pad will pad incoming lines of data to a fixed size, and unpad removes that padding. Then we make the following small modification:

src | pad | { unpad | worker & unpad | worker & unpad | worker & } | sink

Communicating in fixed-size blocks means that pad and unpad will never interleave, because writes to a pipe of up to PIPE_BUF bytes are atomic. This fixed size must be chosen in advance, and should be at least as large as the longest line that src can emit. There is an upper limit6 on this fixed size; on Linux, it's 4096 bytes; and any line longer than the block size will be truncated to that size, discarding the extra data. If your lines are longer than 4096 bytes, you will lose data! But most people don't work with lines anywhere near 4096 bytes long, so they have little to worry about. If you do work with such long lines, you can work around the limit in any number of ways.7
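
To make the truncation concrete, here's a quick demonstration with an artificially tiny block size of 16 bytes (chosen only for illustration; the real pad and unpad below use PIPE_BUF):

printf 'short line\nthis line is much longer than sixteen bytes\n' | dd conv=block cbs=16 2>/dev/null | dd conv=unblock cbs=16 2>/dev/null
short line
this line is muc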

pad and unpad can be defined as follows:8, 6

# pad to the maximum size we can use and still be atomic on this system
pipe_buf=$(getconf PIPE_BUF /)
function pad() {
    # redirect stderr (file descriptor 2) to /dev/null to get rid of dd's noise
    dd conv=block cbs=$pipe_buf obs=$pipe_buf 2>/dev/null
}
function unpad() {
    dd conv=unblock cbs=$pipe_buf ibs=$pipe_buf 2>/dev/null
}

For convenience, you could insert this snippet into your .bashrc.
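
A quick sanity check that padding and then unpadding is lossless for lines shorter than the block size:

printf 'hello\nworld\n' | pad | unpad
hello
world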

Let's look at a fixed version:

yes ooo | head -n 8 | pv --quiet --rate-limit 4 | pad | { <&0 unpad | tr o z & unpad | tr o x & }
zzz
zzz
zzz
xxx
xxx
xxx
xxx
xxx

Great.

Now let's use these capabilities productively. To factor numbers!

# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
# generate lines containing $shufn numbers randomly selected from $seqs to $seqe
shuf --input-range $seqs-$seqe -n $shufn | pad | {
    # pad and unpad them, and factor each number
    <&0 unpad | factor | pad &
    unpad | factor | pad &
    unpad | factor | pad &
    unpad | factor | pad &
} | unpad

Since factor is CPU-bound, we want to run it on multiple CPU cores at once. Because these worker tasks can run simultaneously on different cores, this will be substantially faster than the single-worker case.9 We pad and unpad the output as well to prevent it from being interleaved, since factor could perform partial writes or writes larger than the maximum atomic size. Putting pad and unpad on the output is the safer choice unless you know for sure that the output writes are atomic.10

Try comparing different numbers of workers to the single-worker case:

# define some giant constants
seqs=5000000000000000000
seqe=18000000000000000000
shufn=100000
shuf --input-range $seqs-$seqe -n $shufn | factor
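
For example, you could wrap each variant in time and compare the wall-clock durations, discarding the factorizations themselves:

time (shuf --input-range $seqs-$seqe -n $shufn | factor > /dev/null)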

Now let's look at a more sophisticated example. I want to send a bunch of HTTPS requests to some server, perhaps for scraping or testing. Python, the usual tool for something like this, is just too slow for my purposes; so I want to use something fast, like a shell script.11 I can just do the following:

# specify the server we're connecting to
host="api.example.com"
# We will produce our HTTP requests with printf; we will perform a
# printf "$format" somenumber
# for each input number, which outputs a complete HTTP request to send off
format="GET /api/heartbeat/%d HTTP/1.1
Host: $host

"
function worker() {
    # unpad the input, pass each line to xargs for printf-formatting,
    # and pass the resulting request to s_client;
    # use pv to throttle our requests to 8 per second per worker
    unpad | pv --quiet --line-mode --rate-limit 8 | \
        xargs -n 1 printf "$format" | openssl s_client -connect "$host:443" | pad
}
# generate an endless stream of increasing integers and pad them to a fixed size
seq inf | pad | {
<&0 worker & worker & worker & worker &
} | unpad

And that's all there is to it! openssl s_client establishes a TLS connection to the provided host, then sends stdin to the host and copies the host's replies to stdout. So this will endlessly send heartbeat GET requests to api.example.com, in parallel over 4 TCP connections, and we'll get the results on stdout.
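
If you want to watch a single request/response exchange by hand, something like this works (the sleep just keeps stdin open long enough for the reply to arrive; 42 is an arbitrary input number):

{ printf "$format" 42; sleep 2; } | openssl s_client -connect "$host:443"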

One small caveat, which is unimportant for most usage. Note that a pipe doesn't directly connect one process to another. The kernel maintains a buffer for the pipe, called the pipe buffer. Writes go into the pipe buffer and reads come out of the pipe buffer. And since each process reads and writes data as quickly as possible, and unpad can read and write very quickly indeed, unpad might outpace the later parts of the worker pipeline. In that case, some lines of input would sit idle in the pipe buffer between unpad and the rest of the pipeline.

This won't normally be a problem, but if you've exhausted all the input, then you might have one or several workers with full pipe buffers, while other workers don't have any more input to process. Thus at the end of the input, there might be less parallel processing going on than is possible. Again, only a small issue, but I thought it was best to mention it. A quick hack around this is to throttle (with pv) right after unpad in the worker pipeline, which limits the number of pipe buffers that could be filled to just one; alternatively, throttle before pad when generating the input. Ideally, dd would have a throttling option built in, which would eliminate the problem entirely… I'm working on a patch.
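
A sketch of that second option, throttling the generator before pad (the rate of 1000 lines per second is arbitrary; pick whatever keeps your workers busy without letting lines pile up):

src | pv --quiet --line-mode --rate-limit 1000 | pad | { <&0 unpad | worker & unpad | worker & } | sink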

Again one last reminder: if your lines are over 4096 bytes long, you will lose data unless you use a workaround.7 And pad and unpad can only be omitted from the worker output if you know the worker output is atomic.10 Now, with these caveats in mind, go forth and use this knowledge to construct high-performance concurrent systems out of shell scripts!

But wait, what about GNU parallel? Actually, it turns out that GNU parallel does give us this capability. It supports sending lines of input to commands over stdin with its --pipe argument and sending multiple lines to the same task with its --round-robin argument. And it doesn't have a 4096-byte-long line limit, and you don't need to worry about wrapping your workers with pad and unpad. So, you should probably just use GNU parallel. :)
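
For instance, the three-worker pipeline from earlier would look roughly like this (assuming, as before, that worker reads lines from stdin):

src | parallel --pipe --round-robin --jobs 3 worker | sink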