Rangwala: Today, my co-speaker, Felix GV, and I are going to talk about one of the core foundational machine learning services that we have at LinkedIn, called People You May Know. Over the course of this talk, we are going to go over the various architectures that we built in order to support the service, and the lessons that we have learned from building them. Before I start, I just want to say that we are giving a summary of more than 10 years of work in this area. This is the collective work of a lot of people at LinkedIn; we are just the ones presenting it.
I would like to begin with an example. A while ago, a part of my professional network looked something like the figure that you see on your left, where there were a bunch of people that I knew in real life, and these people in turn also knew some other people, or even the same people that I knew. When I looked at the same professional network on LinkedIn, I was connected to some of the people that I knew in real life, but not to a few others that I knew in the real world. For LinkedIn, predicting these missing connections and telling people who they could connect to was of vital importance.
That's why, more than a decade ago, LinkedIn as a company realized that this would be a very important machine learning product to have. I presume everyone has the LinkedIn app on their phone. If you open it and click on the second icon at the very bottom, it shows you a list of people that you can connect to. That's the product that we call People You May Know. This product is very important to LinkedIn because as a company, it's our mission to allow our members to build their professional network, and by predicting who are the people that they can connect to and assisting them in connecting to those people, we are helping them grow their professional network.
The implications of building a richer professional network go beyond just being connected to certain people. There are many products at LinkedIn that are impacted by the professional network that you have. If you go to linkedin.com and you see a bunch of articles being shown there, many of those articles are coming from people that you are connected to, who have either liked those articles or commented on them. The richer your professional network is, the richer the pool of articles out of which we can generate a curated list to recommend to you. Not only does building a network help build the social graph that LinkedIn has, but it also improves many other products at LinkedIn.
Having described to you what the product People You May Know is, we will then look a little bit deeper into, what do we do? What it takes for us to generate those recommendations? Then we'll look at the various architectures that we built over the course of more than a decade, and then where PYMK is right now, from a system architecture perspective. Then finally, we will conclude with the lessons that we have learned and where we see the product going with the scale that it is at right now.
Like many machine learning products, we mine data in order to generate the recommendations as to who people can connect to. In our particular use case, we are looking at two categories of data. The first is the LinkedIn economic graph. Now the question comes, what is the LinkedIn economic graph? If you look at all of our members and how they are connected to each other, this is what we call the connection graph. We take this graph and augment it with the companies where people have worked at the same time, or the schools that they have attended at the same time. This hybrid graph, which consists of companies, schools, and people, all connected to each other through relationships, is what we call the economic graph. In order to generate the recommendations for PYMK, we are mining this graph to get a list of people that you could actually connect to. In addition, we are also looking at the activities of the members and their profiles, the industry that they work in, and the area where they work, in order to figure out what recommendations we should give to our users.
A canonical recommendation system can be thought of as consisting of three broad modules. The first is candidate generation, which tries to come up with the initial set of people that can be recommended to our users. Then the recommendation system does feature generation and scoring in order to rank how these recommendations are shown to the users. People with a machine learning background will recognize that the first part takes care of recall, while feature generation and scoring address the precision of the recommendations. Now, let's look at how these modules are addressed in PYMK.
For PYMK, most of our candidate generation comes from looking at the economic graph. What we are trying to do here is look for commonalities between the person to whom we want to recommend and the person that we want to recommend. These commonalities are looked for in the economic graph. One of the very well-known techniques, which many other companies also use, is based on the notion that a friend of your friend is very likely to also be your friend. If you have a connection, then their connections are more likely to be people that you already know. In some literature, this is also referred to as triangle closing.
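As an illustration of the friend-of-a-friend idea described above, here is a minimal sketch of triangle-closing candidate generation on a tiny in-memory graph. The function names and the sample members are hypothetical and are not taken from LinkedIn's implementation.

```python
from collections import defaultdict

def build_graph(edges):
    """Build an undirected adjacency map from (member, member) pairs."""
    graph = defaultdict(set)
    for a, b in edges:
        graph[a].add(b)
        graph[b].add(a)
    return graph

def friend_of_friend_candidates(graph, member):
    """Return second-hop members who are not already direct connections."""
    direct = graph.get(member, set())
    candidates = set()
    for friend in direct:
        candidates.update(graph.get(friend, set()))
    # Exclude the member themselves and anyone they are already connected to.
    return candidates - direct - {member}

# Hypothetical sample data.
edges = [
    ("sumit", "ana"), ("sumit", "bo"),
    ("ana", "felix"), ("bo", "felix"), ("bo", "chen"),
]
graph = build_graph(edges)
candidates = friend_of_friend_candidates(graph, "sumit")
print(sorted(candidates))
```

Every candidate returned this way would close at least one triangle if the connection were made, which is where the name comes from.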
In addition to that, we also look at co-workers, which essentially means walking the same economic graph, but in this case going through companies rather than your connections to find the second-hop nodes in the graph. In some specific cases, we also use slightly more specialized algorithms, such as personalized PageRank, which involves doing random walks on this graph.
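To make the random-walk idea concrete, the sketch below estimates personalized PageRank with a standard Monte Carlo approach: run many short walks from the source, end each walk with a fixed probability at every step, and count where the walks end. The graph mixes member and company nodes to mirror the economic graph. The parameter values and node names are assumptions for illustration only; the talk does not describe how PYMK configures this.

```python
import random
from collections import Counter

def personalized_pagerank(graph, source, walks=2000, stop_prob=0.3,
                          max_steps=50, seed=0):
    """Estimate personalized PageRank scores from walk endpoints."""
    rng = random.Random(seed)
    endpoints = Counter()
    for _ in range(walks):
        node = source
        for _ in range(max_steps):
            # With probability stop_prob the walk ends at the current node.
            if rng.random() < stop_prob:
                break
            neighbors = graph.get(node)
            if not neighbors:
                break
            node = rng.choice(neighbors)
        endpoints[node] += 1
    total = sum(endpoints.values())
    return {node: count / total for node, count in endpoints.items()}

# Hypothetical graph: "acme" is a company node, the rest are members.
graph = {
    "me": ["ana", "acme"],
    "ana": ["me", "felix"],
    "acme": ["me", "dana"],
    "felix": ["ana"],
    "dana": ["acme"],
}
scores = personalized_pagerank(graph, "me")
for node, score in sorted(scores.items(), key=lambda kv: -kv[1]):
    print(node, round(score, 3))
```

Nodes that are reachable from the source along many short paths end up with higher scores, so the ranking is specific to the member the walks start from.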
For feature generation we are using various sources of data, one of them is that now that we have figured out possible candidates that can be suggested to you, we are trying to have a measure of commonalities so to speak. If we have a person that we would probably recommend to you, we are also going to look at how many common friends are there. If I have more common friends with someone and less common friends with somebody else, then I'm more likely to know the first person as compared to the second person. We are also counting for every person that we recommend, how many common friends are there between, let's say in this case, me and Felix, which in this example happened to be one. In addition, we are also looking at the activities of the members, whether they work in the same area, whether they belong to the same industry to generate some of the features.
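The common-connection count described above can be sketched as a set intersection over each member's connections. The data below is made up so that the speaker and Felix share exactly one connection, as in the example from the talk; the helper name is hypothetical.

```python
def common_connection_count(connections, member, candidate):
    """Number of connections that the member and the candidate share."""
    return len(connections.get(member, set()) & connections.get(candidate, set()))

# Hypothetical connection sets.
connections = {
    "sumit": {"ana", "bo"},
    "felix": {"ana"},
    "chen": {"ana", "bo"},
    "ana": {"sumit", "felix", "chen"},
    "bo": {"sumit", "chen"},
}

candidates = ["felix", "chen"]
features = {c: common_connection_count(connections, "sumit", c) for c in candidates}
# More shared connections suggests the member is more likely to know the candidate.
ranked = sorted(candidates, key=lambda c: features[c], reverse=True)
print(features, ranked)
```

In a real system this count would be one feature among many fed to the scoring model, not the ranking itself.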
If you look at PYMK’s recommendation system, this recommendation system still has these three components that I talked about: candidate generation, feature generation, and scoring. For this talk we will concentrate on the candidate generation and the feature generation part, because in our experience for this particular product, we have realized that these are the two challenging components of our system, compared to the scoring. We have until now found scoring to be much simpler to handle.
If you look at the candidate generation and feature generation for PYMK, we realize that what we essentially need in order to do all these recommendations, the kind of computation that we need, is graph processing and lots of data processing. Having described what it is that we need in order to enable the system, we'll now look at the multiple architectures that we built over a very long period of time to address this problem.
In retrospect, it looks like if you take a step back and see our architecture, the approach that we were following was this concept of precomputing all the computations. What we were doing, which usually is most of the time the first step whenever a machine learning product has been built, is that we will generate recommendation for all of our members ahead of time, and we will store it somewhere. Then the users would come to the website, I don't know if at that time, the app was still there, but if they will visit the app, we would just go and look up whatever we have precomputed ahead of time and just show those recommendations to those users.
This choice of precomputing the recommendation led to two implications. One is that we had to generate recommendation for all our users, and so, our system became heavily focused over a high throughput. For us, a system which actually can do all this computation in a much shorter period of time, or the total throughput that we got from the system became critical for all of our design choices.
The very first architecture started with the economic graph, or the connection graph, in Oracle. We would run SQL scripts on top of it to generate the recommendations, store them again in the SQL database, and then whenever a user came online, we would just look them up from the database. This system actually performed fairly well for a while, when we had members in the tens of millions. As the number of people on LinkedIn started growing, we realized that it was taking longer and longer to generate the recommendations for all of our members. At that time, we moved to a Hadoop system.
An anecdote about LinkedIn is that the first Hadoop cluster at LinkedIn was built to serve PYMK. That was the reason why a Hadoop cluster was built at all at LinkedIn, so we moved the precomputation to a Hadoop cluster. The same computation that was done using SQL scripts was now done using a Hadoop MapReduce job. Then, the precomputed recommendations were stored in a key-value store, which at that time was what is called Voldemort. When the user came online, we would serve the recommendations from there. This brought down the time it took to generate the recommendations to a couple of days, and this system performed really well for some hundreds of millions of users.
As our members grew on LinkedIn, we were able to stretch this system to the next level in order to take our growth, until we reached somewhere in the middle of hundreds of millions of users. At that time, we migrated some parts of our MapReduce job to Spark. Essentially what we did here- and for those of you who are interested, there is a link which really describes what exactly we did- but the key takeaway was, what we realize is that there are certain characteristics of our data that we can exploit in order to do certain specialized joins, which can bring down the time it took for us to do the computation.
It was not a generalized solution, but a specialized solution for the problem that we were trying to solve, exploiting the characteristics of the data. This brought the time it took to generate the recommendations down to a few days; this was at around 300 to 400 million users. At the same time, while we were doing this, the machine learning engineers in our team were looking at a slightly different problem. For them, the metrics mattered: how much they were able to make people connect to each other. They were looking at a different aspect of the problem, trying to figure out how much the recency of the data on which the recommendations are generated matters.
What they did is they did a small POC where we were still generating the recommendation the way we were using Spark and putting it into a key-value store. At the time when the users would visit the website, we would try to gather some of the data that has changed from the time we computed the recommendation to the time that the user has visited. Let's say some profile has changed or the users had made some new connection in the meantime, from the time that the last recommendation was generated, and exploit that information to actually improve the recommendation that we were generating.
When we did that, we figured out that it actually gave us one of the biggest lifts in metrics in at least the last three or four years. This approach of generating recommendations offline and then augmenting them online when the user comes leads to what we call a split-brain design, where the logic that calculates which recommendations are shown to the user is now performed in two different places. When the recommendations were not what we expected them to be, it was a very hairy job to figure out which part we needed to tune, or what was going wrong in the whole system, because we had two models, one offline, one online.
When we looked back at all of these architectures, and once we had reached this point, we had two very important realizations. First, we realized that freshness matters, and it matters a lot; we cannot ignore the fact that generating the recommendations over fresh data is important. Second, we realized that the precomputation that we were doing for the recommendations was becoming extremely costly for us. That is for two reasons. One is that the amount of computation that we have to do grows super-linearly with our member growth. The reason is that when a member makes a single connection, there are a whole bunch of new edges that we now have to look at. The second is that when you have only a million users on your website, if you are doing precomputation for all of them and, let's say, only half of them visit the website the next day, you're only throwing away the computation for half a million users. When you have 500 million users, you're doing calculation for 250 million users that you're not using the next day. In absolute terms, the additional cost that we were paying for precomputing recommendations that were not getting used, because the computation is regenerated every day in an offline process, was becoming very expensive.
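The wasted-work argument above is simple arithmetic, sketched here with the figures from the talk. The one-million small-site figure and the 50% daily visit rate are illustrative assumptions chosen to match the half-million and 250 million numbers mentioned.

```python
def unused_precomputations(total_members, daily_visit_fraction):
    """Members whose precomputed recommendations are never looked at that day."""
    return round(total_members * (1 - daily_visit_fraction))

small_site = unused_precomputations(1_000_000, 0.5)
large_site = unused_precomputations(500_000_000, 0.5)
print(small_site, large_site)
```

The wasted fraction is the same in both cases; what changes is the absolute amount of discarded computation, which is what drives the cost.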
It was at that time we realized that it is time for us to take a step back, and instead of trying to build an infrastructure that is trying to optimize for the approach that we have taken, should we even rethink the approach by which we are actually generating the recommendations? We said that probably we have reached a certain point in our growth where we should start generating recommendation on demand rather than precomputing it ahead of time.
The moment we had that realization, we also had to shift how we went about designing our system, because earlier when we were doing precomputation, our system designs were focused on throughput, but the moment you start doing recommendation on demand, you have to generate the recommendation only for the people who visit the website. We don't have too many recommendations to generate, but now we have to generate recommendation really fast because the user is not going to wait. We are in the millisecond range now instead of hours or minutes.
If you go back again for the candidate generation and feature generation, we realized that now if you have to solve this problem by generating recommendation on demand, we need an online graph traversal system that can walk on the graph in real time. In addition, we also need a data access store that can access a lot of data very quickly for us.
That's when we started building what we call Gaia. Gaia is a very generic online service that we have built which can run complex graph algorithms in real time on really massive graphs. What we are trying to do in this system is push the boundaries of technology to be able to process as many edges or nodes as possible in a very short period of time. The kind of graphs that we were looking at are graphs with tens of billions of edges, and Gaia is the system that allows us to do it. With Gaia, what you do is take a graph and give a snapshot of that graph to Gaia. From the very beginning, we designed this system to be generic in the sense that, in this graph, you can have different kinds of nodes. These nodes might not just represent members; they can also represent companies or schools, and the edges can represent any kind of relationship between those nodes.
In addition to that, Gaia can also ingest any changes that are happening to the graph via a data stream, so as long as the changes are coming in the data stream, we will be able to ingest it and keep the graph up to date. Finally, we actually designed a compute API. Using that compute API, one can specify the kind of graph algorithms that they want to run. In our case, triangle closing, which is finding the two hop neighbors who are not directly connected to you, or doing random graph walks on the graph. Gaia would then be able to run these graph algorithms whenever a call has been made to Gaia, will be able to run these graph algorithms in real time onto the graph that we have given to Gaia.
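The three capabilities just described (load a snapshot, ingest a stream of changes, run a registered algorithm on demand) can be sketched with a toy in-memory service. This is not Gaia's API, which the talk does not show; every class, method, and event name below is hypothetical.

```python
from collections import defaultdict

class InMemoryGraphService:
    """Toy stand-in for a Gaia-like service; all names are hypothetical."""

    def __init__(self):
        self.adj = defaultdict(set)
        self.algorithms = {}

    def load_snapshot(self, edges):
        """Replace the whole graph with a fresh snapshot."""
        self.adj = defaultdict(set)
        for a, b in edges:
            self._add_edge(a, b)

    def _add_edge(self, a, b):
        self.adj[a].add(b)
        self.adj[b].add(a)

    def ingest(self, event):
        """Apply one change event from a stream: ("add" | "remove", a, b)."""
        op, a, b = event
        if op == "add":
            self._add_edge(a, b)
        elif op == "remove":
            self.adj[a].discard(b)
            self.adj[b].discard(a)

    def register(self, name, fn):
        """Register a graph algorithm under a name (the 'compute API')."""
        self.algorithms[name] = fn

    def run(self, name, node):
        """Run a registered algorithm against the current graph."""
        return self.algorithms[name](self.adj, node)

def two_hop(adj, node):
    """Two-hop neighbors that are not directly connected to the node."""
    direct = adj.get(node, set())
    reached = set()
    for neighbor in direct:
        reached |= adj.get(neighbor, set())
    return reached - direct - {node}

service = InMemoryGraphService()
service.load_snapshot([("m:1", "m:2"), ("m:2", "m:3")])
service.register("two_hop", two_hop)
before = service.run("two_hop", "m:1")
service.ingest(("add", "m:2", "m:4"))  # a new connection arrives on the stream
after = service.run("two_hop", "m:1")
print(sorted(before), sorted(after))
```

The point of the example is that the second call sees the new edge immediately, without regenerating anything offline.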
When it came to designing Gaia, we started with a single server architecture, we made a choice of designing Gaia with a single server architecture, which means that a single Gaia machine is capable of doing all the computations. The way we did it is that we actually load the whole graph into the memory and we do have really powerful machines to be able to do it. Then, the focus of our design was then shifted to building data structures which are efficient, so that we don't use a lot of memory. In addition, the system was designed for very fast memory access, so that we can work on lot of edges in a very short period of time.
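The talk does not say which data structures Gaia uses. As one common example of a memory-efficient graph layout, the sketch below packs an adjacency list into two flat integer arrays (compressed sparse row, or CSR), so that a node's neighbors sit contiguously in memory. Treat it as an illustration of the design goal, not as a description of Gaia's internals.

```python
from array import array

def build_csr(num_nodes, edges):
    """Pack an undirected edge list into CSR form: (offsets, neighbors)."""
    degree = [0] * num_nodes
    for a, b in edges:
        degree[a] += 1
        degree[b] += 1
    # offsets[i] is where node i's neighbor list starts in `neighbors`.
    offsets = array("q", [0] * (num_nodes + 1))
    for i in range(num_nodes):
        offsets[i + 1] = offsets[i] + degree[i]
    neighbors = array("i", [0] * offsets[num_nodes])
    cursor = list(offsets[:num_nodes])  # next free slot for each node
    for a, b in edges:
        neighbors[cursor[a]] = b
        cursor[a] += 1
        neighbors[cursor[b]] = a
        cursor[b] += 1
    return offsets, neighbors

def neighbors_of(offsets, neighbors, node):
    """Return the contiguous slice holding the node's neighbors."""
    return neighbors[offsets[node]:offsets[node + 1]]

offsets, neighbors = build_csr(4, [(0, 1), (0, 2), (1, 2), (2, 3)])
print(list(neighbors_of(offsets, neighbors, 2)))
```

Compared with a dictionary of sets, this layout stores each edge endpoint as a single small integer and keeps neighbor scans sequential, which are the two properties the paragraph above asks for.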
The architecture looks something like this: you have a bunch of servers in the Gaia cluster, and we deploy the graph algorithm that we want to run on all the machines. When the graph snapshot is given to Gaia, it is loaded on all the machines, so all the machines are almost exact replicas of each other. Any changes to the graph are ingested by all the machines to keep the graph up to date, and now Gaia is ready to execute the algorithm on demand. We are able to use Gaia to generate candidates for PYMK and also to generate the graph features that I talked about, such as common connections, which is how many common friends there are between two users. Our system runs completely in memory, and we are able to do this in tens of milliseconds.
At this point, I'll hand it over to my co-presenter who will talk about the fast data store that we have built.
GV: I'm Felix from data infrastructure at LinkedIn, and I primarily work on Venice, which is a key-value store developed at LinkedIn, which I'm going to tell you more about. Venice is tailor-made for serving the output of machine learning jobs. What does it take to do that? First of all, machine learning jobs output a lot of stuff, and therefore, it's important for a system tailored for them to be very good at ingesting a lot of data. Traditional databases usually are not very write heavy and they tend to be read heavy, in the case of Venice, it's the opposite, it is very write heavy and not necessarily read heavy.
Fast lookups are extremely important, and we'll talk in depth about that. The last point I want to make in terms of catering to the needs of machine learning engineers is that self-service onboarding is extremely important. We have this objective at LinkedIn of making machine learning engineers more productive, and one of the aspects of that is we'd like every single machine learning engineer to be able to test 30 different new features every quarter. In order to achieve that, it's important that there is as little friction as possible in deploying new models, tweaking them, etc.
Let's dig a little bit into how we allow loading data into Venice. The most traditional use case, which is the one that Sumit has been talking about in the first half of the talk, is batch data computed on Hadoop and loaded via a push job into Venice. We have hundreds of flows that are doing this every day, refreshing the entire data set every time. A new emerging category of use cases that we have is stream processing jobs. At LinkedIn, we developed Apache Samza as our stream processor, and so there is a tight interconnection between Samza and its output being loadable into Venice in near real time.
These are the two most obvious candidates for data sources into Venice, but the other cells of that matrix are also supported. It is possible to compute only a partial data set on Hadoop and push that as a diff into Venice. Likewise, it is also possible to change a stream processing job's code and have it rerun on the entire historical stream of events as a reprocessing job. Sometimes this is referred to online as the Kappa architecture, and this is one of the supported ingestion modes in Venice. Finally, there is also another mode which is hybrid, where any type of batch data source can be combined with a streaming job's output, and this allows us to cater to the applications that want to build Lambda architectures where they have both of these types of data sources coming into the system at once.
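To illustrate how a full batch push, a partial push, and streaming writes might coexist in one store, here is a toy key-value store. It is a sketch under stated assumptions: the class, its methods, and the rule of re-applying streaming writes newer than the batch cutoff are all invented for this example, and the talk does not describe how Venice reconciles the two sources.

```python
class HybridStore:
    """Toy key-value store mixing batch and streaming writes (hypothetical)."""

    def __init__(self):
        self._data = {}
        self._stream_log = []  # (timestamp, key, value) in arrival order

    def push_full(self, dataset, as_of):
        """Swap in a full data set computed from inputs up to time `as_of`."""
        self._data = dict(dataset)
        # Re-apply streaming writes newer than the batch cutoff so that the
        # batch refresh does not overwrite fresher values.
        for ts, key, value in self._stream_log:
            if ts > as_of:
                self._data[key] = value

    def push_partial(self, diff):
        """Batch push of a diff: only the listed keys change."""
        self._data.update(diff)

    def stream_write(self, ts, key, value):
        """Near-real-time write coming from a stream processing job."""
        self._stream_log.append((ts, key, value))
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

store = HybridStore()
store.push_full({"m:1": 0.2, "m:2": 0.5}, as_of=100)
store.stream_write(110, "m:1", 0.9)
store.push_full({"m:1": 0.3, "m:2": 0.6}, as_of=105)  # daily refresh
store.push_partial({"m:3": 0.1})
print(store.get("m:1"), store.get("m:2"), store.get("m:3"))
```

After the refresh, `m:1` keeps its streamed value because that write is newer than the batch cutoff, while `m:2` takes the refreshed batch value.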
Let's zoom in on PYMK, and the work that we did in data infrastructure to enable the rearchitecture that Sumit [Rangwala] has been talking about. The first use case that we onboarded is online feature retrieval. We started work on this about nine months ago and we went to production with it roughly six months ago. The characteristics of that workload are roughly as follows. The PYMK online service needs to do thousands of queries per second. Each of those queries needs to retrieve features relating to a thousand members, so if you multiply the two together, you get to a requirement of needing to support millions of lookups per second, and each of those thousand-key queries needs to return really fast. Thankfully, at this stage, the value associated with each of those keys was fairly small, a collection of features totaling maybe 80 bytes or so, but it was still challenging to get that working fast enough for PYMK's needs.
When we first tried this on what Venice was like nine months ago, it was not performing well at all. It took about four seconds to execute just one of these thousand keys queries, and so that wasn't going to cut it. We did a bunch of tuning work and eventually what led us to move the needle the most in terms of performance was swapping out our storage engine for RocksDB, which is a great piece of software. That has allowed us to tremendously reduce both our average and long tail latency. In the case of the p99 latency, we went down from 4 seconds to 60 milliseconds. Oh, and by the way, the 4 seconds wasn't even at full load, whereas the 60 millisecond is at that millions of lookups per second throughput. That was great, we onboarded that use case from PYMK allowing them to move their computation more and more online and on demand.
Then, the next thing that we wanted to tackle was beefing up the size of those features quite a bit by storing embeddings inside Venice. Now, instead of having just a small collection of features, we had these massive vectors of floating point data, these multidimensional vectors, and we wanted to retrieve those very fast. The characteristics of this use case are similar in terms of throughput, but the value sizes were 10 times larger, so you can imagine that shuttling all this data across is going to be an extra challenge. When we tried to deploy this use case onto what Venice had become at the time, six months ago, the performance was not so great. We were at 275 millisecond p99, and we tried a bunch of things to get it faster but it was hard.
We also had, meanwhile, another project that we were working on closely with the People You May Know team, and we decided to swing for the fences and accelerate the timeline of that other project and use that to serve the embeddings use case. This other project was adding the ability inside Venice to perform server-side computations. By leveraging server-side computations, we have been able to onboard this much larger embeddings use case at a latency that's similar to the much lighter online feature retrieval use case, so going back down to about 60 millisecond p99.
Let's zoom in a little bit on this computation capability. What we support here in Venice are very simple vector operations, and the characteristic that helps us make the workload faster is that the input to these functions is very large, like a large vector of floats, but the output is small, like a scalar. Therefore, the response size can be brought back in line with roughly what it was previously. Response size turns out to be the most significant bottleneck, it looks like, in this particular system.
In contrast with what Sumit [Rangwala] presented for Gaia, where Gaia is a framework that allows arbitrary code to be run alongside the data inside the Gaia nodes, in this case for Venice we took a different approach, where the API is declarative. Instead of having the user prescribe how to execute the computation, the user is only describing what needs to be done. It's more like a query language than an arbitrary code execution system. This clear separation of concern between what to do and how to do it has allowed the Venice team to iterate extremely quickly on various profiling experiments, tuning, and keep that loop going to bring the latencies that I've shown.
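The effect of server-side computation on response size can be sketched with a toy store that accepts a declarative request (what to compute) and returns one scalar per key instead of the full vector. The request format, operation name, and the 200-dimensional, 4-byte-per-float vectors are assumptions for illustration; they are not Venice's actual API or PYMK's actual embedding size.

```python
def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

class ToyStore:
    """Toy key-value store with a declarative compute call (hypothetical)."""

    OPERATIONS = {"dot_product": dot}

    def __init__(self, embeddings):
        self._embeddings = embeddings

    def batch_get(self, keys):
        """Plain lookup: ships every full vector back to the client."""
        return {k: self._embeddings[k] for k in keys}

    def compute(self, request):
        """Declarative request: the caller says what to compute, not how."""
        op = self.OPERATIONS[request["operation"]]
        query = request["query_vector"]
        return {k: op(self._embeddings[k], query) for k in request["keys"]}

DIM = 200          # assumed embedding dimension
FLOAT_BYTES = 4    # assumed 32-bit floats on the wire
keys = [f"m:{i}" for i in range(1000)]
store = ToyStore({f"m:{i}": [float(i)] * DIM for i in range(1000)})

raw = store.batch_get(keys)
scores = store.compute({
    "operation": "dot_product",
    "query_vector": [1.0] * DIM,
    "keys": keys,
})

raw_bytes = sum(len(v) for v in raw.values()) * FLOAT_BYTES
score_bytes = len(scores) * FLOAT_BYTES
print(raw_bytes, score_bytes)
```

Under these assumptions a thousand-key query shrinks from roughly 800 KB of vectors to about 4 KB of scalars, and because the request only names the operation, the store stays free to change how it executes it.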
There's one more thing that we did after that, which is that we looked in open source for a way to do faster Avro decoding. Avro, for those that don't know, is a serialization format that's very popular in Hadoop and which other systems use as well. Venice is an Avro-based system, so it's important for us to be extremely quick in the Avro decoding phase. We found this open-source project from a company called RTB House, and they have a great piece of software that has allowed us to further reduce the latencies that I've shown previously. The first use case is now running at 40 millisecond p99 and the embeddings with computation use case is now running at 35 millisecond p99.
The RTB House software is absolutely great and I recommend you check it out, but as part of our certification process for it, we discovered some incompatibilities with the regular behavior of Avro, and we also needed to support multiple versions of Avro, because LinkedIn is a company with more than a decade of applications running, some new, some old. We have this requirement internally to run on multiple versions of Avro. We did a fork of this project, and we have recently open-sourced it just yesterday. If you have any system that uses Avro, you can plug that in and it should be pretty much 100% semantically compatible with Avro, that's a goodie that you can potentially make use of right now.
Putting It All Together: PYMK Today
Let’s put it all together, what Sumit [Rangwala] and I talked about, and let's review what PYMK looks like today. If we go back to the three phases of a recommendation system- candidate gen, feature gen, and scoring- now, we can see that Gaia that Sumit [Rangwala] talked about, takes care of the candidate generation phase in an on-demand manner by walking the graph, and also provides some graph related features such as, for example, how many connections are in common between two people.
After that, the PYMK service turns around and asks Venice for additional features relating to those candidates that came out of Gaia, and the most expensive features to retrieve have some early scoring done on them within Venice. Finally, the PYMK service does the final scoring on all of that. If we look at a box diagram of what it looks like, on the left side, we can see the two next-generation systems we talked about, Gaia and Venice. Both of those get loaded in batch and incremental fashion, then, when it comes time to serve the recommendations online, the PYMK service first interrogates Gaia, retrieves the candidates and graph related features, then turns around interrogates Venice for the rest of the features and partial scoring on the most expensive ones to retrieve, and then the PYMK service does the final scoring.
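The request flow just described can be sketched as a small orchestration function: ask the graph service for candidates and graph features, ask the feature store for the remaining features, then do the final scoring. The two stub services, the feature names, and the additive score are all hypothetical stand-ins for Gaia, Venice, and the real model.

```python
def recommend(member, graph_service, feature_store, score, top_k=2):
    """Candidate generation, then feature retrieval, then final scoring."""
    # Step 1: candidates plus graph-related features (Gaia's role).
    candidates = graph_service(member)
    # Step 2: additional features, some already partially scored (Venice's role).
    store_features = feature_store(list(candidates))
    # Step 3: final scoring in the PYMK service.
    scored = []
    for candidate, graph_features in candidates.items():
        features = {**graph_features, **store_features.get(candidate, {})}
        scored.append((score(features), candidate))
    scored.sort(reverse=True)
    return [candidate for _, candidate in scored[:top_k]]

def fake_graph_service(member):
    """Stub returning candidates with a common-connections feature."""
    return {
        "felix": {"common_connections": 1},
        "chen": {"common_connections": 2},
        "dana": {"common_connections": 0},
    }

def fake_feature_store(keys):
    """Stub returning a partial score computed next to the data."""
    partial = {"felix": 0.9, "chen": 0.1, "dana": 0.2}
    return {k: {"partial_score": partial[k]} for k in keys}

def toy_score(features):
    """Made-up scoring function: just adds the two features."""
    return features["common_connections"] + features["partial_score"]

result = recommend("sumit", fake_graph_service, fake_feature_store, toy_score)
print(result)
```

Because each step runs at request time, a connection made seconds earlier can already change the candidate set returned in step 1.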
With this architecture, the key gain is that staleness is reduced dramatically. The previous precomputation based architecture was giving us a staleness of one to two days, and with this one, the staleness is reduced to seconds, to minutes, because Gaia can take advantage of ingesting changes to the economic graph in real time. Therefore, the candidate generation and everything else that flows from that afterwards is affected in real time.
Going to a higher level, the key learnings that we would like you to walk away with are the following. We tried to paint this as a journey, and we actually think every step of the journey is important. The takeaway is not that precomputation is bad and you should do everything online. Actually, we think precomputation is great for many products, and the ecosystem of tools, like Spark and other systems, is very robust and easy to use. It is important, however, to be cognizant of when the cost of those solutions becomes prohibitive, maybe because the problem space is too big, or because all the low-hanging fruit of relevance techniques has been exhausted and now we're looking at models that are more expensive to evaluate. When either or both of these circumstances arise, then it's important to be flexible enough to think, "Well, maybe the next step is to move away from precomputation," even though that's more complex and more difficult.
If you are at that stage where it's worth moving away from precomputation, it's important to have this shift of mindset that Sumit [Rangwala] talked about, which is that, the problem is not so much about throughput anymore, it's more about low latency, and that changes the system design that you need to address.
The last point here is that for a long time, we've been developing the machine learning for PYMK more or less in a vacuum, in isolation, and it was the job of the infrastructure to catch up with the needs of the machine learning. We believe that we have reached a scale where that is no longer feasible, and now, the machine learning itself needs to be infra aware. It needs to become aware of what capabilities the infra can provide in order to scale better.
Looking ahead, besides infra aware machine learning, we also want to keep investing in machine learning aware infra, so that means further scaling Gaia and Venice to retrieve more candidates, more features, bigger features, and also support more and more complex computation in an online fashion. We also have this goal of productive machine learning, which I briefly touched upon, and we want to enable that by making it even easier than it is today to onboard onto Venice and Gaia. In particular for Venice, the new computation capabilities which are used today in, let's say, early adopter fashion by PYMK, we would like to make those capabilities multitenant like the rest of Venice, so that any use case at LinkedIn can onboard.
We also have a bunch of other machine learning frameworks that we use at LinkedIn, and it's important to tightly integrate with them. It takes a village to raise this system and a lot of people contributed, so their work is invaluable.