Asynchronous Sorting in Go


When we began working on Dolt we made the decision to build on top of Noms. Noms stores data in a content addressable DAG, and has countless applications. It was a great starting point for us to build Dolt, and it let us hit the ground running. Once we had a working prototype of the product we began importing data.
Every dataset we imported taught us something new about the product. It helped us find issues with our code and dependencies, and it helped us understand the scale of data that we could handle. At that time, doing an import with a few million rows could be painfully slow. As I dove into the profile the majority of the time was spent sorting, and this is how I fixed that.

Background

Dolt stores tables in Noms, in a map, the details of which were covered by Aaron Son in this blog article. The TLDR is that a B-Tree like structure, called a Prolly-tree, backs the map that stores our table data. To efficiently edit our map, we need to apply edits in order.

The guys at Attic Labs, who wrote Noms, wrote a really cool way to get data into a map with their streaming map implementation. It takes a channel as an argument which you stream keys and values to, and returns a channel which you can read the resulting map from. It's pretty fast, and there is something about the interface that feels like Rob Pike would love it. The gotchas are:

  1. You need to stream your data into the map in order.
  2. It only works for new maps. You cannot use it to edit a map that already has data in it.

In either of these cases you'll need to use a MapEditor which collects a list of changes that will be applied to the map, and only after all the edits are collected, does it sort all the changes, and then apply them. Dolt may have been the first Noms use case which needed to be able to support reading in billions and applying billions of map edits, so this was likely the first time this case was optimized for.

The Approach

Before they can be applied, the edits are read from some source into a list. During this time Dolt is IO bound, and there are idle CPU cycles that could be used to decrease our sort time once all the data is available.

In order to take advantage of the up front time we need to use an algorithm where work done prior to having all of the items results in improved sort times once you do have all the items. In order to accomplish this, we accumulate edits into batches and once a batch is full it is sent to a background worker routine which sorts it. Once all the edits have been added, we wait for our workers to finish sorting in the background, and then merge sort the batches of sorted edits. This has the additional benefit of being highly parallelizable. Each batch sort can run in a go routine, and as we merge the batches, pairs of batches can be merged in separate routines as well.

Sort Visualization

Implementation

The first step in our implementation is to replace the slice accumulating the edits with an interface which can be swapped in and out to make it easy to compare the existing performance and results with the new implementation.

 type EditAccumulator interface { AddEdit(k Value, v Value) FinishedEditing() (EditProvider, error) Close()
} type EditProvider interface { Next() (*KVP, error) NumEdits() int64
}

Background Sorting of Batches

Now that we know what we need to implement lets take a look at our new AddEdit implementation. It's pretty straight forward: we add our edits to a slice, and once the slice is full, we write that slice to a channel and create a new slice to accumulate map edits into.


func (ase *AsyncSortedEdits) AddEdit(k Value, v Value) { ase.accumulating = append(ase.accumulating, types.KVP{Key: k, Val: v}) if len(ase.accumulating) == ase.sliceSize { ase.asyncSortAcc() }
} func (ase *AsyncSortedEdits) asyncSortAcc() { ase.sortChan <- ase.accumulating ase.accumulating = make([]types.KVP, 0, ase.sliceSize)
}

Our sortChan is being read by a go routine running the function sorter which sorts it and writes the results to resultsChan

func sorter(in, out chan types.KVPSlice) error { for kvps := range in { err := sortEdits(types.KVPSort{Values: kvps}) if err != nil { return err } out <- kvps } return nil
}

We will accumulate edits until the point when FinishedEditing is called at which time we need to return an EditProvider which is used to iterate over all the edits in sorted order. We close our background batch sorting channel, sortChan, sort any edits that are in the accumulating slice, and then wait for all background sorting to finish before calling mergeSortBatches.

 func (ase *AsyncSortedEdits) FinishedEditing() (types.EditProvider, error) { close(ase.sortChan) if len(ase.accumulating) > 0 { err := sortEdits(types.KVPSort{Values: ase.accumulating}) if err != nil { return nil, err } coll := NewKVPCollection(ase.nbf, sl) ase.sortedColls = append(ase.sortedColls, coll) } ase.wait() ase.mergeSortBatches() return ase.Iterator(), nil
}

Merging Sorted Batches

We have a list of sorted batches, that we need to merge. We need a way of grouping our batches into pairs. As you merge N batches, pairwise, down to a single collection of sorted edits, it is possible that your batches vary in size wildly. Say you have 9 batches of edits. Here are two ways they could be merged:

10 10 10 10 10 10 10 10 3 3 10 10 10 10 10 10 10 10 \ / \ / \ / \ / | \ / \ / \ / \ / | 20 20 20 20 | 13 20 20 20 10 20 20 20 20 | 10 13 20 20 20 \ / \ / | \ / \ / | \ / \ / | \ / \ / | 40 40 | 23 40 20 40 40 | 20 23 40 \ / | \ / | ------+------ | ------+----- | 80 3 43 40 \ / \ / ------+------ ------+------ 83 83

The first approach uses a naive merge order. The second sorts by size between levels which results in more even batches being grouped as you merge down each batch. I don't believe there to be a measurable performance impact of either strategy, but we opted to go with the more balanced approach.


func pairCollections(colls []*KVPCollection) [][2]*KVPCollection { numColls := len(colls) pairs := make([][2]*KVPCollection, 0, numColls/2+1) sort.Slice(colls, func(i, j int) bool { return colls[i].Size() < colls[j].Size() }) if numColls%2 == 1 { pairs = append(pairs, [2]*KVPCollection{colls[numColls-1], nil}) colls = colls[:numColls-1] numColls -= 1 } for i, j := 0, numColls-1; i < numColls/2; i, j = i+1, j-1 { pairs = append(pairs, [2]*KVPCollection{colls[i], colls[j]}) } return pairs
}

Now that we have a strategy for pairing our batches we can get back to merging them into a single sorted collection. We spawn worker go routines to handle the merge operations up to some configurable limit. Then we write our batch pairs to a channel to be sorted, and close the channel once all the batches being merged have been written. Then we will wait for the results before doing it all over again for the next level of our merge tree.

 func (ase *AsyncSortedEdits) mergeSortBatches() { for len(ase.sortedColls) > 1 { pairs := pairCollections(ase.sortedColls) numPairs := len(pairs) numGoRs := ase.sortConcurrency if numGoRs > numPairs { numGoRs = numPairs } wg := &sync.WaitGroup{} wg.Add(numGoRs) sortChan := make(chan [2]*KVPCollection, numPairs) resChan := make(chan *KVPCollection, numPairs) for i := 0; i < numGoRs; i++ { go func() { defer wg.Done() merger(sortChan, resChan) }() } for _, pair := range pairs { sortChan <- pair } close(sortChan) wg.Wait() close(resChan) ase.sortedColls = nil for coll := range resChan { ase.sortedColls = append(ase.sortedColls, col) } }
}

The code presented here omits some error handling for clarity, and does not go into the details of merge sort implementation, but the full source code can be found here.

Visualizing the Difference

Sort Visualization

This video provides a visual representation of the old and new sorting strategies. At the start of the video you can see data being added, and after half a second we begin async sorting our first batch. The process of adding and concurrently sorting batches asynchronously continues for about 5 seconds until all the data is available. It is at this time that the old model begins sorting it's data, and then at 6 seconds all of our batches are sorted and we begin merging. The new model completes in about 8 seconds, and the old is done after 15.

Performance Impact

While the visualization helps us see what's going on, they may not represent real world benchmarks. On small sets of changes (small being less than 1 batch), our new methodology is identical to the old. For change sets where the number of edits is just slightly more than one the additional parallelism has a small negative performance impact. After that the performance impacts are very positive. Benchmarking 10 million edits gave the following results on average:

  • Async Batch Sorting with Merge

    • Time to add the items: 7.52
    • Time to sort once all items are added: 3.16
  • sort.Stable once all items are added:

    • Time to add the items: 8.97
    • Time to sort once all items are added: 32.96

Roughly a 10x performance improvement in the sort time, meant that Dolt users are able to get data into a table more quickly. Another interesting improvement was in the amount of time needed to add the items to the collection of edits. The new method pre-allocates fixed size blocks that it writes to a channel for sorting before creating another fixed size block. The old method would simply append to a slice. The more you append the more time you will need to grow your slice. This involves additional allocations, copying, and garbage collection.

Summary

As Dolt becomes an extremely useful product, it's performance will become a focus of development. We are committed to supporting larger datasets, and more use cases. Stay tuned for future blog articles detailing some of this work.