Chapter 8 Parallel Pipelines

In the previous chapters, we have been dealing with commands and pipelines that take care of an entire task at once. In practice, however, you may find yourself facing a task that requires the same command or pipeline to run multiple times. For example, you may need to:

  • Scrape hundreds of web pages.
  • Make dozens of API calls and transform their output.
  • Train a classifier for a range of parameter values.
  • Generate scatter plots for every pair of features in your dataset.

In any of the above examples, there is a certain form of repetition involved. With your favorite scripting or programming language, you take care of this with a for loop or a while loop. On the command line, the first thing you might be inclined to do is to press <Up> (which brings back the previous command), modify it if necessary, and press <Enter> (which runs the command again). This is fine for two or three times, but imagine doing this for, say, dozens of files. Such an approach quickly becomes cumbersome and time-inefficient. The good news is that we can write such loops on the command line as well. This chapter is all about repetition.

Sometimes, repeating a fast command one after the other (in serial) is sufficient. When you have multiple cores (and perhaps even multiple machines), it would be nice if you could make use of those, especially when you’re faced with a data-intensive task. When using multiple cores or machines, the total running time may be reduced significantly. In this chapter we will introduce a very powerful tool called GNU Parallel that can take care of exactly this. GNU Parallel allows us to apply a command or pipeline to a range of arguments such as numbers, lines, and files. Plus, it allows us to run our commands in parallel.

8.1 Overview

This intermezzo chapter discusses several approaches to speed up tasks that require commands and pipelines to be run many times. The main goal of this chapter is to demonstrate to you the flexibility and power of a tool called GNU Parallel. Because this tool can be combined with any other tool discussed in this book, it will positively change the way you use the command line for data science. In this chapter, you’ll learn about:

  • Running commands in serial over a range of numbers, lines, and files.
  • Breaking a large task into several smaller tasks.
  • Running pipelines in parallel using GNU Parallel.
  • Distributing pipelines on multiple machines.

8.2 Serial Processing

Before we dive into parallelization, we will look at looping in a serial fashion. It’s worthwhile to know how to do this because this functionality is always available, the syntax closely resembles looping in other programming languages, and it will really make you appreciate the tool GNU Parallel.

From the examples provided in the introduction of this chapter, we can distill three types of items to loop over: (1) numbers, (2) lines, and (3) files. These three types of items will be discussed in the next three subsections, respectively.

8.2.1 Looping Over Numbers

Imagine that we need to compute the square of every even integer between 0 and 100. There’s a tool called bc, which is basically a calculator on the command line to which you can pipe an equation. The command to compute the square of 4 looks as follows:
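
$ echo "4^2" | bc
16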

For a one-off calculation, this is perfect. However, as mentioned in the introduction, we would be crazy to press <Up>, change the number, and press <Enter> 51 times! In this case it is better to let Bash do the hard work for us by using a for loop:
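
$ for i in {0..100..2}
> do
> echo "$i^2" | bc
> done | tail
6724
7056
7396
7744
8100
8464
8836
9216
9604
10000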

There are a number of things going on here:

  • Bash has a feature called brace expansion, which transforms {0..100..2} into a list separated by spaces: 0 2 4 … 98 100.
  • The variable i is assigned the value 0 in the first iteration, 2 in the second iteration, and so forth. The value of this variable can be employed in commands by prefixing it with a dollar sign $. The shell will replace $i with its value before echo is executed. Note that there can be more than one command between do and done.
  • We pipe the output of the for loop to tail so that we only see the last ten values.

Although the syntax may appear a bit odd compared to your favorite programming language, it is worth remembering this because it is always available in the bash shell. We will shortly introduce a better and more flexible way of repeating commands.

8.2.2 Looping Over Lines

The second type of item we can loop over is lines. These lines can come from either a file or from standard input. This is a very generic approach because the lines can contain anything, including numbers, dates, and email addresses.

Imagine that we want to send an email to our customers. Let’s generate some fake users using the http://randomuser.me/ API:
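
Something along these lines should do the trick (the exact URL and the structure of the JSON that the API returns may differ between versions):

$ curl -s "https://randomuser.me/api/?results=5" |
> jq -r '.results[].email' > emails.txt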

We can loop over the lines from emails.txt with a while-loop:
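
$ while read line
> do
> echo "Sending invitation to ${line}."
> done < emails.txt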

  • In this case we need to use a while loop because Bash does not know beforehand how many lines the input consists of.
  • Although the curly braces around the line variable are not necessary in this case (since variable names cannot contain periods), it’s still good practice.
  • This redirection can also be placed before while.

You can also provide input to the while loop interactively by specifying /dev/stdin, the special file that represents standard input. Press <Ctrl-D> when you are done.
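
$ while read line; do echo "You typed: ${line}."; done < /dev/stdin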

This method, however, has the disadvantage that, once you press <Enter>, the command(s) between do and done are run immediately for that line of input.

8.2.3 Looping Over Files

In this section we discuss the third type of item that we often need to loop over: files.

To handle special characters, use globbing (i.e., pathname expansion) instead of ls:
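
$ for filename in *.csv
> do
> echo "Processing ${filename}"
> done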

Just as with brace expansion with numbers, the *.csv is first expanded into a list before it is processed by the for loop.

A more elaborate alternative for finding files is find (Youngman 2008), which:

  • Allows for elaborate searching on properties such as size, access time, and permissions.
  • Handles dashes.
  • Handles special characters such as spaces and newlines.

Here’s the same, but now using find together with parallel:
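
$ find . -name '*.csv' -print0 | parallel -0 echo "Processing {}"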

The -print0 option allows file names that contain newlines or other types of white space to be correctly interpreted by programs that process the find output. If you are absolutely certain that the filenames contain no special characters such as spaces and newlines, then you can omit the -print0 and -0 options.

If the list to process becomes too complex, you can always store the result in a temporary file and then use the method for looping over lines from a file described above.

8.3 Parallel Processing

Assume that we have a very long running command, such as the one shown in Example 8.1.

Example 8.1 (~/book/ch08/slow.sh)
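#!/usr/bin/env bash
# A sketch of slow.sh, consistent with the annotation below; the original script may differ.
echo "Starting job $1"
duration=$((RANDOM % 5 + 1))
sleep "$duration"
echo "Job $1 took ${duration} seconds"
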
  • $RANDOM is an internal Bash variable that yields a pseudorandom integer between 0 and 32767 each time it is referenced. Taking the remainder of the division of that number by 5 and adding 1 ensures that the number is between 1 and 5.

This process does not take up all the resources we have available, and it so happens that we need to run this command many times, for example, to download a whole sequence of files.

A naive way to parallelize is to run the commands in the background:
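
The sketch below assumes a hypothetical file movies.txt that contains one movie title per line:

$ while read i
> do
> (./slow.sh "$i"; ) &
> done < movies.txt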

  • Parentheses create a subshell. The ampersand ensures that it will be executed in the background.

The problem with subshells is that they are all executed at once. There is no mechanism to control the maximum number of processes, so using this approach is not advised.

Not everything can be parallelized: API calls may be limited to a certain number, or some commands can only have one instance.

Quoting is important. If we did not quote $i, then only the first word of each movie would have been passed to the script slow.sh.

There are two problems with this naive approach. First, there’s no way to control how many processes you are running concurrently. Second, there’s no logging, so it’s hard to tell which output belongs to which input.

8.3.1 Introducing GNU Parallel

GNU Parallel is a command-line tool written by Ole Tange. This tool allows us to parallelize commands and pipelines. The beauty of this tool is that existing tools can be used as they are; they do not need to be modified.

You may have noticed that we keep writing GNU Parallel. That’s because there are two tools with the name “parallel”. If you make use of the Data Science Toolbox then you already have the correct one installed. Otherwise, please double check that you have the correct tool installed by running parallel --version.

Before we go into the details of GNU Parallel, here’s a little teaser to show you how easy it is to parallelize the for loop stated above:
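
$ seq 0 2 100 | parallel "echo {}^2 | bc" | tail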

This is parallel in its simplest form: without any arguments other than the command to run. As you can see, it basically acts as a for loop. (We’ll explain later what exactly is going on.) With no fewer than 110 command-line arguments (!), GNU Parallel offers a lot of additional functionality. Don’t worry, by the end of this chapter, you’ll have a solid understanding of the most important ones.

Install GNU Parallel by running the following commands:
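
One way is to build the latest version from source (the exact URL and file names may have changed since this was written):

$ wget https://ftp.gnu.org/gnu/parallel/parallel-latest.tar.bz2
$ tar -xjf parallel-latest.tar.bz2
$ cd parallel-*/
$ ./configure && make && sudo make install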

You can verify that you have correctly installed GNU Parallel:
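
$ parallel --version | head -n 1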

You can safely delete the created files and directories.

If you use parallel as often as we do, then you may want to create an alias (for example p) by adding alias p=parallel to your .bashrc. (In this chapter we’ll just use parallel for clarity.)

8.3.2 Specifying Input

The most important argument to GNU Parallel is the command that you would like to run for every input. The question is: where should the input item be inserted in the command line? If you do not specify anything, then the input item will be appended to the command. While this is usually what you want, we advise you to be explicit about where the input item should be inserted by using one or more placeholders.

There are many ways to provide input to GNU Parallel. We prefer piping the input (as we do throughout this chapter) because that is generally applicable and allows us to construct a pipeline from left to right. Please consult the man page of parallel to read about other ways to provide input.

In most cases, you probably want to use the entire input item as it is. For this, you only need one placeholder. You specify the placeholder, in other words, where to put the input item, with two curly braces:
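
$ seq 3 | parallel echo Processing number {}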

When the input item is a file, there are a couple of special placeholders you can use to modify the file name. For example, with {/}, only the base name of the file name will be used:
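
$ echo data/file.csv | parallel echo "path: {} base: {/} no extension: {.}"
path: data/file.csv base: file.csv no extension: data/file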

If the input line has multiple parts separated by a delimiter you can add numbers to the placeholders. For example:
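
$ echo "foo,bar" | parallel --colsep , echo part one: {1}, part two: {2}
part one: foo, part two: bar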

Here, you can apply the same placeholder modifiers. It is also possible to reuse the same input item. If the input to parallel is a CSV file with a header, then you can use the column names as placeholders:
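
For example (using a small, hypothetical input.csv):

$ printf "name,age\nalice,31\nbob,63\n" > input.csv
$ < input.csv parallel -C, --header : echo {name} is {age} years old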

Sometimes you just want to run the same command without any changing inputs. This is also possible with parallel. We just have to specify the -N0 argument and pipe in as many lines as the number of times we want the command to be executed:
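
$ seq 5 | parallel -N0 echo "Hello world"
Hello world
Hello world
Hello world
Hello world
Hello world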

If you ever wonder whether your GNU Parallel command is set up correctly, you can add the --dryrun option. Instead of actually executing the commands, GNU Parallel will print them all out exactly as they would have been executed.

8.3.3 Controlling the Number of Concurrent Jobs

By default, parallel runs one job per CPU core. You can control the number of jobs that will be run in parallel with the -j command-line argument, which is short for jobs. Specifying a number means that many jobs will be run in parallel. If you put a plus sign in front of the number, then parallel will run that many jobs in addition to the number of CPU cores. If you put a minus sign in front of the number, then parallel will run that many jobs fewer than the number of CPU cores. You can also pass a percentage to -j, where the default is 100% of the number of CPU cores. The optimal number of jobs to run in parallel depends on the actual commands you are running.

If you specify -j1, then the commands will be run in serial. Even though this doesn’t do the name of the tool justice, it still has its uses, for example, when you need to access an API which only allows one connection at a time. If you specify -j0, then parallel will run as many jobs in parallel as possible. This can be compared to our loop with subshells, and is not advised.

8.3.4 Logging and Output

To save the output of each command, you might be tempted to do the following:
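
$ seq 5 | parallel "echo Hi {} > hi-{}.txt"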

This will save the output into individual files. Or, if you want to save everything into one big file you could do the following:
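
$ seq 5 | parallel echo Hi {} >> one-big-file.txt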

However, GNU Parallel offers the --results option, which stores the output of each job into a separate file, where the filename is based on the input values:
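
$ seq 5 | parallel --results outdir "echo Hi {}"
$ find outdir -type f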

When you’re running multiple jobs in parallel, the order in which the jobs are run may not correspond to the order of the input. The output of jobs is therefore also mixed up. To keep the same order, simply specify the --keep-order or -k option.

Sometimes it’s useful to record which input generated which output. GNU Parallel allows you to tag the output with the --tag option:
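
$ seq 5 | parallel --tag "echo {}^2 | bc"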

8.3.5 Creating Parallel Tools

The bc tool, which we used in the beginning of the chapter, is not parallel by itself. However, we can parallelize it using parallel. The Data Science Toolbox contains a tool called pbc (Janssens 2014d). Example 8.2 shows what its code might look like.

Example 8.2 (Parallel bc)
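#!/usr/bin/env bash
# pbc: parallel bc (a sketch; the original tool may differ slightly).
# The columns of the comma-separated input are available as {1}, {2}, and so on.
# Example usage: paste -d, <(seq 100) <(seq 100 -1 1) | pbc 'sqrt({1}*{2})'
parallel -C, -k -j100% "echo '$1' | bc -l"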

This tool allows us to simplify the code used in the beginning of the chapter to:
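
$ seq 0 2 100 | pbc '{1}^2' | tail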

8.4 Distributed Processing

Sometimes you need more power than your local machine, even with all its cores, can offer. Luckily, GNU Parallel can also leverage the power of remote machines, which really allows us to speed up our pipeline.

What’s great is that GNU Parallel does not have to be installed on the remote machine. All that’s required is that you can connect to the remote machine via SSH, which is also what GNU Parallel uses to distribute our pipeline. (Having GNU Parallel installed is helpful because it can then determine how many cores to employ on each remote machine; more on this later.)

First, we’re going to obtain a list of running AWS EC2 instances. Don’t worry if you don’t have any remote machines; you can replace any occurrence of --slf hostnames, which tells GNU Parallel which remote machines to use, with --sshlogin :. This way, you can still follow along with the examples in this section.

Once we know which remote machines to take over, we’re going to consider three flavors of distributed processing:

  • Simply running ordinary commands on remote machines.
  • Distributing local data directly among remote machines.
  • Sending files to remote machines, processing them, and retrieving the results.

8.4.1 Get List of Running AWS EC2 Instances

In this section we’re creating a file named hostnames that will contain one hostname of a remote machine per line. We’re using Amazon Web Services as an example. If you’re using a different cloud computing service, or have your own servers, please make sure that you create a hostnames file yourself.

We can obtain a list of running AWS EC2 instances from the command line using aws, the command-line interface to the AWS API (Services 2014). If you’re not using the Data Science Toolbox, install awscli using pip (PyPA 2014) as follows:
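
$ pip install awscli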

With aws, you can do virtually everything you can do with the online AWS Management Console. We use this command to obtain a list of running EC2 instances from AWS, but it can do a lot more.

We assume that you know how to launch instances, either through the online Management Console or through the aws command-line tool.

The command aws ec2 describe-instances returns a lot of information about all your EC2 instances in JSON format (see http://docs.aws.amazon.com/cli/latest/reference/ec2/describe-instances.html). We extract the relevant fields using jq:
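
$ aws ec2 describe-instances |
> jq '.Reservations[].Instances[] | {state: .State.Name, dns: .PublicDnsName}'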

The possible states of an EC2 instance are: pending, running, shutting-down, terminated, stopping, and stopped. Since we can only distribute our pipeline to running instances, we filter out the non-running instances:
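
$ aws ec2 describe-instances |
> jq -r '.Reservations[].Instances[] | select(.State.Name == "running") | .PublicDnsName' > hostnames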

(If we were to leave out -r, which stands for raw, the hostnames would be surrounded by double quotes.) We save the output to hostnames so that we can pass this to parallel later.

As mentioned, parallel employs ssh to connect to the EC2 instances. Add the following to ~/.ssh/config, so that ssh knows how to connect to the EC2 instances:

Host *.amazonaws.com
    IdentityFile ~/.ssh/MyKeyFile.pem
    User ubuntu

Depending on which distribution you’re running, your username may be different from ubuntu.

8.4.2 Running Commands on Remote Machines

The first flavor of distributed processing is to simply run ordinary commands on remote machines. Let’s first double check that parallel is working by running the command-line tool hostname on every machine in the list of hosts:
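
$ parallel --nonall --slf hostnames hostname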

Here, --slf is short for --sshloginfile and --nonall instructs parallel to execute the same command on every remote machine in the hostnames file without using any parameters. Remember, if you don’t have any remote machines to utilize, you can replace --slf hostnames with --sshlogin : so that the command is run on your local machine:
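
$ parallel --nonall --sshlogin : hostname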

Running the same command on every remote machine once only requires one core per machine. If we wanted to distribute the list of arguments passed in to parallel, then it could potentially use more than one core. If the number of cores is not specified explicitly, parallel will try to determine this:

$ seq 2 | parallel --slf hostnames echo 2>&1 | fold
bash: parallel: command not found
parallel: Warning: Could not figure out number of cpus on ec2-54-88-122-140.comp
ute-1.amazonaws.com (). Using 1.
1
2

In this case, we have parallel installed on one of the two remote machines. We’re getting a warning message indicating that parallel is not found on one of them. As a result, parallel cannot determine the number of cores and will default to using one core. When you receive this warning message, you can do one of the following four things:

  • Don’t worry, and be happy with using one core per machine.
  • Specify the number of jobs per machine via -j.
  • Specify the number of cores to use per machine by putting, for example, 2/ if you want two cores, in front of each hostname in the hostnames file.
  • Install GNU Parallel using a package manager. For example, on Ubuntu:
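
$ sudo apt-get install parallel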

8.4.3 Distributing Local Data among Remote Machines

The second flavor of distributed processing is to distribute local data directly among remote machines. Imagine you have one very large data set that you want to process using multiple remote machines. For simplicity, we’re going to sum all integers from 1 to 1000. First, let’s double check that our input is actually being distributed by printing the hostname of the remote machine and the length of the input it received using wc:
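
$ seq 1000 | parallel -N100 --pipe --slf hostnames "(hostname; wc -l) | paste -sd: -"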

We can verify that our 1000 numbers get distributed evenly in subsets of 100 (as specified by -N100). Now, we’re ready to sum all those numbers:
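
$ seq 1000 | parallel -N100 --pipe --slf hostnames "paste -sd+ - | bc" |
> paste -sd+ - | bc
500500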

Here, we immediately also sum the ten sums we get back from the remote machines. Let’s double check the answer is correct:
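
$ seq 1000 | paste -sd+ - | bc
500500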

Good, that works. If you have a larger command that you want to execute on the remote machines, you can also put it in a separate script and upload the script with parallel.

Let’s create a very simple command-line tool called sum:
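
#!/usr/bin/env bash
# sum: add up the numbers given on standard input.
paste -sd+ - | bc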

Don’t forget to make it executable as discussed in Chapter 4. The following command first uploads the file sum:

$ seq 1000 | parallel -N100 --basefile sum --pipe --slf hostnames './sum' | ./sum
500500

Of course, summing 1000 numbers is only a toy example. It would have been much faster to do this locally. However, we hope it’s clear from this that GNU Parallel can be incredibly powerful.

8.4.4 Processing Files on Remote Machines

The third flavor of distributed processing is to send files to remote machines, process them, and retrieve the results. Imagine that we want to count, for each borough of New York City, how often it receives 311 service calls. We don’t have that data on our local machine yet, so let’s first obtain it from https://data.cityofnewyork.us/ using their great API:
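
Something along the following lines should work; the resource identifier erm2-nwe9 refers to the 311 Service Requests dataset, though the exact endpoint and parameters are assumptions that may have changed:

$ seq 0 100 900 | parallel "curl -sL 'https://data.cityofnewyork.us/resource/erm2-nwe9.json?\$limit=100&\$offset={}' | jq -c '.[]' | gzip > nyc-{#}.json.gz"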

Note that jq -c '.[]' is used to flatten the array of JSON objects so that each object is on its own line. We now have 10 files containing compressed JSON data. Let’s see what one line of JSON looks like:
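
$ zcat nyc-1.json.gz | head -n 1 | fold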

If we were to get the total number of service calls per borough on our local machine, we would run the following command:
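
The pipeline might look something like this (it assumes each JSON object has a borough field; header and csvsort were introduced in earlier chapters):

$ zcat nyc*.json.gz |
> jq -r '.borough' |
> tr '[A-Z] ' '[a-z]_' |
> sort | uniq -c |
> awk '{print $2","$1}' |
> header -a borough,count |
> csvsort -rc count | csvlook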

Because this is quite a long pipeline, and because we’re using it again in a moment with parallel, it’s worth going over it:

  • Expand all compressed files using zcat.
  • For each call, extract the name of the borough using jq.
  • Convert borough names to lowercase and replace spaces with underscores (because awk splits on whitespace by default).
  • Count the occurrences of each borough using sort and uniq.
  • Swap the count and the borough, and make the output comma-delimited, using awk.
  • Add a header using header.
  • Sort by count and print the table using csvsort (Groskopf 2014j).

Imagine, for a moment, that our own machine is so slow that we simply cannot perform this pipeline locally. We can use GNU Parallel to distribute the local files among the remote machines, let them do the processing, and retrieve the results:
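
Such a command might look as follows (this sketch assumes that a copy of the jq binary lives in the current directory so that it can be transmitted with --basefile):

$ ls *.json.gz |
> parallel --basefile jq --trc {.}.csv --slf hostnames \
> "zcat {} | ./jq -r '.borough' | tr '[A-Z] ' '[a-z]_' | sort | uniq -c | awk '{print \$2\",\"\$1}' > {.}.csv"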

This long command breaks down as follows:

  • Print the list of files and pipe it into parallel.
  • Transmit the jq binary to each remote machine. Luckily, jq has no dependencies. This file will be removed from the remote machine at the end because we specified --trc (which implies the --cleanup command-line argument).
  • The command-line argument --trc {.}.csv is short for --transfer --return {.}.csv --cleanup. (The replacement string {.} gets replaced with the input filename without the last extension.) Here, this means that the JSON file gets transferred to the remote machine, the CSV file gets returned to the local machine, and both files will be removed from the remote machine after each job.
  • Specify a list of hostnames. Remember, if you want to try this out locally, you can specify --sshlogin : instead of --slf hostnames.

  • Note the escaping in the awk expression. Quoting can sometimes be tricky. Here, the dollar signs and the double quotes are escaped. If quoting ever gets too confusing, remember that you can put the pipeline into a separate command-line tool, just as we did with sum.

If we, at some point during this command, run ls on one of the remote machines, we would see that parallel indeed transfers (and cleans up) the jq binary, the JSON files, and the CSV files:

Each CSV file looks like this:

We can sum the counts in each CSV file using Rio and the aggregate function in R:
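
A sketch, assuming Rio reads the CSV from standard input into a data frame named df as described in Chapter 5:

$ cat *.csv | header -a borough,count | Rio -e 'aggregate(count ~ borough, df, sum)'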

Or, if you prefer to use SQL to aggregate results, you can use csvsql as discussed in Chapter 5:
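
With csvsql, the piped input is available as a table named stdin, so the aggregation might look like this:

$ cat *.csv | header -a borough,count |
> csvsql --query 'SELECT borough, SUM(count) AS total FROM stdin GROUP BY borough ORDER BY total DESC' | csvlook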

8.5 Discussion

As data scientists, we work with data, and sometimes a lot of data. This means that sometimes we need to run a command multiple times or distribute data-intensive commands over multiple cores. In this chapter we have shown you how easy it is to parallelize commands. GNU Parallel is a very powerful and flexible tool to speed up ordinary command-line tools and distribute them over multiple cores and remote machines. It offers a lot of functionality, and in this chapter we’ve only been able to scratch the surface. Some features of GNU Parallel that we haven’t covered are:

  • Different ways of specifying input.
  • Keeping a log of all the jobs.
  • Only starting new jobs when the machine is under a certain load.
  • Timing out, resuming, and retrying jobs.

Once you have a basic understanding of GNU Parallel and its most important options, we recommend that you take a look at its tutorial listed in the Further Reading section.

8.6 Further Reading