About 12 months ago, we decided to move our entity resolution pipeline into the Scala/Spark universe. The move was not without its pain points: it was our first major push as a company to productize entity resolution prototypes that had been in development for pretty much as long as the company has existed, and it was also the first time our team had worked with either Scala or Spark.
Looking back over the year, there are dozens of "learning moments" that I would love to ship via wormhole to my former self.
In case the opportunity arises, here’s the transmission:
Shuffle is the transportation of data between workers across a Spark cluster's network. It's central to operations that require a reorganization of data, referred to as wide dependencies (see Wide vs Narrow Dependencies). This kind of operation can quickly become the bottleneck of your Spark application. To use Spark well, you need to know what you shuffle, and for this it's essential that you know your data.
Skew is an imbalance in the distribution of your data. If you fail to account for how your data is distributed, you may find that Spark naively places an overwhelming majority of rows on one executor, and a fraction on all the rest. This is skew, and it will kill your application, whether by causing out-of-memory errors, network timeouts, or stages that run so long they effectively never finish.
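One way to get to know your data before a shuffle is to count how many rows each candidate key holds. The following is a minimal sketch, not a definitive recipe: the helper name topKeyShares, the column name key, and the toy data are all assumptions made for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc}

val spark = SparkSession.builder().master("local[2]").appName("skew-check").getOrCreate()
import spark.implicits._

// Hypothetical helper: fraction of all rows held by each of the n most frequent values in `column`.
def topKeyShares(df: DataFrame, column: String, n: Int = 5): Seq[(String, Double)] = {
  val total = df.count().toDouble
  df.groupBy(col(column))
    .count()
    .orderBy(desc("count"))
    .limit(n)
    .collect()
    .toSeq
    .map(r => (Option(r.get(0)).map(_.toString).getOrElse("null"), r.getLong(1) / total))
}

// Toy data (illustrative only): 90 rows share one key, 10 rows have distinct keys.
val companies = (Seq.fill(90)("acme") ++ (1 to 10).map(i => s"co-$i")).toDF("key")
val shares = topKeyShares(companies, "key")
shares.foreach { case (key, share) => println(f"$key%-8s ${share * 100}%.1f%%") }
```

If a single value owns a large share of the rows, partitioning or joining on that column is likely to send most of the work to one executor.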
A powerful way to control Spark shuffles is to partition your data intelligently. Partitioning on the right column (or set of columns) helps to balance the amount of data that has to be mapped across the cluster network in order to perform actions. Partitioning on a unique ID is generally a good strategy, but don’t partition on sparsely filled columns or columns that over-represent particular values.
It's absolutely essential to match the number of partitions to the problem you’re solving. In the stage of our application where we run parallel transformations on many heterogeneously sized datasets at once, 200 partitions works fine.
When we are dealing with billions of pairwise comparisons, we have found that somewhere between 4,000 and 10,000 partitions works most efficiently.
Furthermore, if you run tests on a single server (or locally), you may see dramatic speed improvements by repartitioning your data down to a single partition. We recently squashed a particularly curious bug where our end-to-end test ran fine on our local 8- or 16-core machines, but never completed on the 2-core server on which we run our CI. Combining the data down to 1 partition solved our issue.
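The knobs mentioned above can be sketched roughly as follows. The column names, the toy data, and the specific numbers (8 and 4000) are assumptions chosen for illustration; the right values depend on your data and cluster.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[2]").appName("partitions").getOrCreate()
import spark.implicits._

// Toy data (illustrative only) with a unique id column.
val records = (1 to 1000).map(i => (i.toLong, s"name-$i")).toDF("id", "name")

// Hash-partition on a high-cardinality column into an explicit number of partitions.
val wide = records.repartition(8, col("id"))

// For small test runs on a low-core machine, collapse everything into one partition.
val single = records.coalesce(1)

val widePartitions = wide.rdd.getNumPartitions
val singlePartitions = single.rdd.getNumPartitions
println(s"wide: $widePartitions partitions, single: $singlePartitions partition")

// Partition count used by shuffles that DataFrame joins and aggregations trigger.
// The value here is illustrative, in the range we use for heavy pairwise comparisons.
spark.conf.set("spark.sql.shuffle.partitions", "4000")
```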
While you can depend on Spark to do a lot of parallel heavy lifting, you can push your jobs even harder with thoughtful use of Scala's built-in .par functionality, which can operate on iterables. The initial steps of our ER pipeline involve reading in dozens of heterogeneous datasets and applying shared transformation pipelines to each of them. A simple datasets.par.foreach cut our run times in half.
Of course, you can only rely on it for parts of your pipeline that are completely deterministic and carry no risk of a race condition. Overzealous usage of .par can quickly result in mysteriously disappearing or overwritten data.
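As a rough illustration of the safe and unsafe ends of that spectrum, here is a small sketch in plain Scala. The dataset names and the transform function are hypothetical stand-ins for a real per-dataset pipeline. Note that .par is built into Scala 2.12 (the version many Spark builds use); on Scala 2.13 and later it lives in the separate scala-parallel-collections module and needs the import shown in the comment.

```scala
// On Scala 2.13+ add the scala-parallel-collections dependency and uncomment:
// import scala.collection.parallel.CollectionConverters._
import java.util.concurrent.ConcurrentHashMap

// Hypothetical dataset names, for illustration only.
val datasetNames = Seq("licenses", "permits", "filings")

// Stand-in for a per-dataset pipeline; it must not touch shared mutable state.
def transform(name: String): Int = name.length

// Safe: each element is processed independently and results are returned, not mutated in place.
val results = datasetNames.par.map(n => n -> transform(n)).seq.toMap

// If side effects are unavoidable, write into a thread-safe structure,
// never into a plain mutable collection shared across threads.
val sink = new ConcurrentHashMap[String, Int]()
datasetNames.par.foreach(n => sink.put(n, transform(n)))
```

Appending to an ordinary mutable buffer from inside par.foreach is the kind of pattern that tends to produce the disappearing or overwritten data described above.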
Joins are by far the biggest shuffle offender, and the dangers of SQL joins are amplified by the scale Spark enables. Even joining medium-sized data can cause an explosion if there are repeated join values on both sides of your join, because every matching pair of rows appears in the output. This is something that we at Enigma have to be particularly wary of: supposedly 'unique' public data keys can turn out to be repeated, and a join of a couple million rows can balloon into a billion-row result!
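One way to catch this before it happens is to estimate the output size from per-key counts: an inner join produces, for each key, the number of left rows times the number of right rows. The sketch below assumes a single join column; the helper name estimateJoinRows and the toy tables are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, sum}

val spark = SparkSession.builder().master("local[2]").appName("join-size").getOrCreate()
import spark.implicits._

// Hypothetical helper: rows an inner join on `key` would produce,
// computed as the sum over shared keys of leftCount * rightCount.
def estimateJoinRows(left: DataFrame, right: DataFrame, key: String): Long = {
  val l = left.groupBy(col(key)).count().withColumnRenamed("count", "left_rows")
  val r = right.groupBy(col(key)).count().withColumnRenamed("count", "right_rows")
  val row = l.join(r, Seq(key))
    .agg(sum(col("left_rows") * col("right_rows")))
    .head()
  if (row.isNullAt(0)) 0L else row.getLong(0) // no shared keys: sum is null
}

// Toy data (illustrative only): the key "x" repeats on both sides.
val filings = Seq("x", "x", "x", "y").toDF("key")
val licenses = Seq("x", "x", "x", "x", "z").toDF("key")

val estimated = estimateJoinRows(filings, licenses, "key")
val actual = filings.join(licenses, Seq("key")).count()
println(s"estimated join rows: $estimated, actual: $actual")
```

The estimate still shuffles data, but only the aggregated counts, which are usually far smaller than the exploded join itself.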
If there is a chance your join columns have null values, you are in danger of massive skew, since every null key hashes to the same partition. A great solution to this problem is to "salt" your nulls. This essentially means pre-filling arbitrary unique values (like UUIDs) into empty cells prior to running a join.
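A minimal sketch of salting, assuming the join key is a string column. The helper name saltNulls, the "__null_" prefix, and the toy tables are assumptions made for illustration; the prefix just makes salted values easy to spot and strip afterwards.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col, concat, expr, lit}

val spark = SparkSession.builder().master("local[2]").appName("salt-nulls").getOrCreate()
import spark.implicits._

// Hypothetical helper: replace each null key with a unique placeholder so null rows
// spread across partitions and never match anything on the other side of the join.
def saltNulls(df: DataFrame, key: String): DataFrame =
  df.withColumn(key, coalesce(col(key), concat(lit("__null_"), expr("uuid()"))))

// Toy data (illustrative only): two of the three left rows have no key.
val left = Seq((Some("a"), 1), (None, 2), (None, 3)).toDF("key", "v")
val right = Seq(("a", "x")).toDF("key", "w")

val salted = saltNulls(left, "key")
val joined = salted.join(right, Seq("key"), "left")
joined.show(truncate = false)
```

Because uuid() is non-deterministic, the salted values can differ each time the plan is re-evaluated, which is harmless here since they are never meant to match anything.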
Operations in Spark are divided between transformations and actions. Transformations are lazy operations that allow Spark to optimize your query under the hood. They set up changes to a DataFrame, like adding a column or joining it to another, but do not execute those plans until an action is called. This can lead to surprises. For instance, it's important to remember that a column computed by a UDF has no materialized value until an action is performed, and it may be recomputed by each action that touches it. Imagine, for instance, creating an id column using Spark's built-in monotonically_increasing_id, and then trying to join on that column. If you do not place an action (such as checkpointing) between the generation of those ids and the join, your values have not been materialized. The result will be non-deterministic!
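The id scenario can be sketched like this, using Dataset.checkpoint() to pin the generated values before they are reused. The table, the column names, and the temporary checkpoint directory are assumptions for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[2]").appName("stable-ids").getOrCreate()
import spark.implicits._

// checkpoint() needs a directory; a temp dir is used here purely for the example.
spark.sparkContext.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

// Toy data (illustrative only).
val people = Seq("ann", "bob", "cy").toDF("name")

// Lazy: at this point the ids are only a plan and can be recomputed by every downstream action.
val planned = people.withColumn("row_id", monotonically_increasing_id())

// checkpoint() is eager by default: it runs the plan once and keeps the resulting values.
val pinned = planned.checkpoint()

// Two branches derived from the pinned data now agree on row_id when joined back together.
val names = pinned.select("row_id", "name")
val lengths = pinned.select($"row_id", length($"name").as("name_length"))
val rejoined = names.join(lengths, Seq("row_id"))
rejoined.show()
```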
Checkpointing is basically the process of saving data to disk and reloading it back in, which would be redundant anywhere else besides Spark. This both triggers an action on any waiting transformations and truncates the Spark query plan for that object. Not only will this action show up in your Spark UI (thus indicating where exactly you are in your job), it will help to avoid re-triggering latent UDF computations in your DAG, and it conserves resources, since it can allow you to release memory that would otherwise be cached for downstream access. In our experience, checkpointed data is also a valuable source for data-debugging forensics and repurposing. The training data for our pipeline, for instance, is filtered out from a 500 million row table generated halfway through our application.
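A minimal sketch of the save-and-reload pattern described above, assuming Parquet as the on-disk format. The helper saveAndReload and the temporary path are hypothetical names, not part of Spark (Spark also ships its own Dataset.checkpoint(), which writes to a configured checkpoint directory instead).

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.upper

val spark = SparkSession.builder().master("local[2]").appName("save-reload").getOrCreate()
import spark.implicits._

// Hypothetical helper: writing forces every pending transformation to run;
// reading back gives a DataFrame whose plan starts at the files on disk.
def saveAndReload(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path)
}

// Toy data (illustrative only) with one pending transformation.
val staged = Seq((1L, "ann"), (2L, "bob")).toDF("id", "name")
  .withColumn("name_upper", upper($"name"))

val path = Files.createTempDirectory("pipeline").resolve("stage1").toUri.toString
val reloaded = saveAndReload(spark, staged, path)
reloaded.show()
```

The files left at path are what make the forensics mentioned above possible: they can be inspected or reused after the job has finished.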
The Spark UI is your friend, and so are monitoring tools like Ganglia that let you know how your run is going in real time. YARN's depiction of the Spark query plan can instantly communicate whether your intentions align with your execution. Is something that is supposed to be one join actually a cascade of many small joins?
The Spark UI also contains information at the job level, the stage level, and the executor level. This means you can quickly see whether the volume of data going to each partition or to each executor makes sense, and whether any part of your job that is supposed to handle 10% of the data is taking 90% of the time. Monitoring tools that let you view your total memory and CPU usage across executors are essential for resource planning and autopsies on failed jobs.
When we first started using Spark, we used standalone clusters on YARN and Amazon's EMRFS. We learned the hard way that gathering Spark logs is a non-trivial task. We are happy to now use Databricks, which handles the essential matter of log aggregation for us, but if you are spinning up your own solution, a log aggregation tool like Kibana is probably essential for your sanity when introspecting jobs.
It took quite a while to get used to the fact that Spark complains about one thing, when the problem is really somewhere else.
- "Connection reset by peer" often implies you have skewed data and one particular worker has run out of memory.
- "java.net.SocketTimeoutException: Write timed out" might mean you have set your number of partitions too high, and the filesystem is too slow at handling the number of simultaneous writes Spark is attempting to execute.
- "Total size of serialized results... is bigger than spark.driver.maxResultSize" could mean you’ve set your number of partitions too high and the combined results sent back to the driver exceed its limit.
- "Column x is not a member of table y": You ran half your pipeline just to discover this SQL join error. Front-load your run-time execution with validation to avoid having to reverse engineer these errors.
- Sometimes you will get a real out-of-memory error, but the forensic work will be to understand why. Yes, you can increase the size of your individual workers to make this problem disappear, but before you do that, you should always ask yourself, "is the data well distributed?"
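The front-loaded validation suggested in the list above could look something like this sketch. The helper names missingColumns and requireColumns and the toy table are hypothetical. The check reads only the schema, so it runs no Spark job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[2]").appName("validate").getOrCreate()
import spark.implicits._

// Hypothetical helper: required columns that are absent from the DataFrame's schema.
def missingColumns(df: DataFrame, required: Seq[String]): Seq[String] = {
  val present = df.columns.toSet
  required.filterNot(present)
}

// Hypothetical helper: fail fast, with a readable message, before any expensive stage runs.
def requireColumns(name: String, df: DataFrame, required: Seq[String]): Unit = {
  val missing = missingColumns(df, required)
  require(missing.isEmpty, s"$name is missing columns: ${missing.mkString(", ")}")
}

// Toy data (illustrative only).
val businesses = Seq((1L, "acme")).toDF("id", "name")

requireColumns("businesses", businesses, Seq("id", "name")) // passes
val missing = missingColumns(businesses, Seq("id", "zip"))
println(s"missing: ${missing.mkString(", ")}")
```

Calling requireColumns for every input at the top of the pipeline turns a failure halfway through the run into a failure in the first few seconds.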
Coming from Python, it was a surprise to learn that naively reading CSVs in Scala/Spark often results in silent escape-character errors. The scenario: you have a CSV and read it into Spark with the default options:
val df = spark.read.option("header", "true").csv("quote-happy.csv")
Your DataFrame seems happy: there are no runtime exceptions, and you can execute operations on it. But after careful debugging of your columns, you realize that at some point in the data, literally everything has shifted over by one or several columns. It turns out that to be safe, you need to include .option("escape", "\"") in your reads, because Spark's CSV reader treats a backslash, not a doubled quote, as the default escape character.
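Putting the read together, a sketch might look like the following. The toy file and its contents are made up for illustration; the quote and escape options are standard options of Spark's CSV reader.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("csv-escape").getOrCreate()

// Toy file (illustrative only) that escapes quotes by doubling them
// and has a comma inside the quoted field.
val file = Files.createTempFile("quote-happy", ".csv")
val content = "id,comment\n1,\"say \"\"hi\"\", ok\"\n"
Files.write(file, content.getBytes(StandardCharsets.UTF_8))

val df = spark.read
  .option("header", "true")
  .option("quote", "\"")  // the default, stated explicitly
  .option("escape", "\"") // treat a doubled quote as an escaped quote
  .csv(file.toUri.toString)

df.show(truncate = false)
```

A cheap sanity check after any CSV read is to compare the number of columns and a few known values against what you expect, since a shifted column will not raise an exception on its own.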
Better suggestion: Use Parquet!
The open-source file format is designed to make read and write operations an order of magnitude more efficient than uncompressed CSVs.
Parquet is "columnar" in that it is designed to only select data from those columns specified in, say, a Spark SQL query, and skip over those that are not requested. Furthermore, it implements "predicate pushdown" on SQL-like filtering operations, which efficiently runs queries on only the relevant subsets of the values in a given column. Switching from uncompressed tabular file formats to Parquet is one of the most fundamental things you can do to improve Spark performance.
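Here is a small sketch of what column selection and filtering against Parquet look like in practice. The table, column names, and temporary path are assumptions for illustration. The physical plan printed by explain() typically lists the pushed filters and a read schema limited to the selected columns, though the exact text varies by Spark version.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("parquet-demo").getOrCreate()
import spark.implicits._

val path = Files.createTempDirectory("parquet-demo").resolve("events").toUri.toString

// Toy data (illustrative only).
val events = Seq((1L, "NY", "open"), (2L, "CA", "closed"), (3L, "NY", "closed"))
  .toDF("id", "state", "status")
events.write.mode("overwrite").parquet(path)

// Only `id` and `state` need to be read, and the filter on `state`
// is a candidate for pushdown into the Parquet scan.
val nyIds = spark.read.parquet(path)
  .select("id", "state")
  .filter($"state" === "NY")

// Print the physical plan to check what Spark intends to read.
nyIds.explain()
```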
If you are responsible for generating Parquet from another format (say you are using PyArrow and Pandas for some large-scale migration), be conscious that simply creating a single Parquet file gives up a major benefit of the format.
And there you have it, a loose assemblage of suggestions, cobbled together from a year of using Spark. Here’s hoping my future self has already found that wormhole and is sending me the year two edition as you’re reading this.