Younes Abouelnagah is a Principal ML Engineer (NLP / LLM) at Roblox. In this post he shares how his team at Roblox scaled their online NLP ML model inference on CPU machines and reduced latency. He can be found on LinkedIn.
This blog post is a technical update on how the Roblox T&S team is scaling our online NLP ML model inference, to handle an increase in the volume of communication as well as the complexity of the models. We show in detail how to make use of Ray, a distributed computing framework for Python, to serve several models on CPU machines. We share several tips to scale up and out, reducing latency and CPU usage.
Using Machine Learning to maintain civility on Roblox involves running user-generated content through multiple models. This includes online filtering of text that goes into the platform, mostly from chat but also from many other surfaces where players and developers can input text. We make sure that each piece of text passes through a filter as soon as a user or developer writes the text and before any other user sees it, to ensure that the text is suitable for the viewer and doesn’t violate any policies.
We have been using transformer models since BERT came out a few years ago. We keep up with advances in the field, where much better, and much larger, foundation models are now available. We use models of various sizes and architectures in different parts of our pipelines, and ultimately users interact with our generalizable, scalable online serving system. It is crucial for our final models to handle billions of inferences per day without making hard assumptions about the model architecture or the serving hardware, all while meeting stringent latency requirements.
Two years ago we published a blog post and conference talk about serving the first generation of models, based on distilBERT. Many optimizations were in place to achieve a latency suitable for online inference on chat text while in transit between chat participants, and several of those optimizations were tied to the specifics of the models (e.g. the BERT architecture and tokenizer, quantization, and even a specific input length of 64 tokens). The challenge was to relax these assumptions and serve more powerful models, while matching the latency of serving the distilBERT models with all of those optimizations. Note that we use CPUs for inference, a design choice we discussed in the previous blog post.
We not only matched the blazing performance of a quantized distilBERT model with a full float model, but also improved the latency of all models by 10-45% and reduced the CPU usage of the whole system by 50%. We did that by using Ray instead of Python Multiprocessing, which allowed us to increase threading within the PyTorch models by using a higher `OMP_NUM_THREADS` and removing the call to `torch.set_num_threads(1)`. In the rest of this post we detail how we used Ray, in a non-conventional setup, to achieve this lower latency and CPU usage.
A drop in latency for both distilBERT and the new full float models after deploying the inference service implemented with Ray Actors instead of Python Multiprocessing:
The following charts compare various latency percentiles, before and after the switch to Ray. The latency is compared in two ways: 1) Using the existing model and serving infrastructure as a reference, since we aim to match the production performance with the new model and serving infrastructure. 2) As a percentage improvement of each model with the new infrastructure.
When looking at the P999 percentile, keep in mind that the P999 latencies of the distilBERT model and the distilled T5 model are not computed over exactly the same input distribution. In addition to the efficiency improvements, the ability to use concurrency within a single inference reduced timeouts, which used to happen with exceptionally long user inputs. Therefore, we increased the maximum input length we allow from users by 4x. This only affects the P999, because long inputs are very rare in production traffic.
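Both comparisons reduce to simple arithmetic over per-request latency samples. The sketch below assumes raw latencies in milliseconds are available for the old and new services; the helper names and sample data are hypothetical, and the nearest-rank method is only one of several percentile conventions.

```python
import math
from typing import Dict, Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile, e.g. pct=99.9 for P999."""
    if not samples:
        raise ValueError("need at least one latency sample")
    ordered = sorted(samples)
    # Smallest sample such that at least pct% of all samples are <= it.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def improvement_pct(old_ms: float, new_ms: float) -> float:
    """Percentage latency reduction; positive means the new service is faster."""
    return (old_ms - new_ms) * 100 / old_ms


def compare(old: Sequence[float], new: Sequence[float]) -> Dict[str, float]:
    """Improvement of the new service over the reference at each percentile."""
    return {
        name: improvement_pct(percentile(old, pct), percentile(new, pct))
        for name, pct in [("P50", 50), ("P99", 99), ("P999", 99.9)]
    }
```

For example, if every sample in `new` is 30% faster than the matching sample in `old`, `compare(old, new)` reports roughly 30% at every percentile.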
Serving ML models involves more than just the inference; the input needs to be presented to the model in the same way it is presented during training, and the output of the model must be converted from logits to predictions according to model-specific characteristics. Such preprocessing and postprocessing steps are normally written in Python for use in training and offline evaluation, so using a Python service allows the reuse of the same code. This can be a challenge due to the poor scalability characteristics of Python, and this is where Ray comes in handy. This blog post is not going to introduce Ray, but will only highlight some of the key learnings in using Ray Core to scale the serving of ML models with very low latency requirements.
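To make the postprocessing step concrete, here is a minimal sketch that converts one input's logits into a prediction with a softmax and per-label thresholds. The label names, threshold values and fallback label are hypothetical; a multi-label model would typically apply a sigmoid per label instead.

```python
import math
from typing import Dict, List, Mapping, Sequence


def softmax(logits: Sequence[float]) -> List[float]:
    # Subtract the largest logit first for numerical stability.
    top = max(logits)
    exps = [math.exp(x - top) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def postprocess(
    logits: Sequence[float],
    labels: Sequence[str],
    thresholds: Mapping[str, float],
    fallback_label: str,
) -> Dict[str, object]:
    """Turn one input's logits into a label plus the top probability."""
    probs = softmax(logits)
    best = max(range(len(probs)), key=lambda i: probs[i])
    label = labels[best]
    # Only emit the label if it clears its (hypothetical) decision threshold.
    if probs[best] < thresholds.get(label, 0.0):
        label = fallback_label
    # "score" is the probability of the top-scoring label.
    return {"label": label, "score": probs[best]}
```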
Very briefly, Ray is a distributed concurrent programming framework, written in C++ with bindings in Python and Java. It allows application developers to parallelize their code without writing any framework-level code, by just adding an annotation to the code. The code then runs across multiple nodes in the cluster, and within each node it can run on several CPU cores, GPUs and accelerators. Ray Core has two types of runnable primitives, Ray Tasks and Ray Actors, and we will focus on the latter. A Ray Actor is a normal Python class annotated with `@ray.remote`, which changes it at runtime such that it can be scheduled to run on a Ray worker process in the Ray cluster. Ray Actors are well suited for ML model serving because they can hold state such as the ML model. In the code, the Actor appears as a class that you call almost normally (through `.remote()` on its methods), while each actor instance lives in its own Ray worker process, which can be on any node in the cluster, and keeps its state in memory between calls. Ray routes every invocation to the worker holding that instance and, by default, executes the calls on an instance one at a time, so the application code needs no locks or explicit synchronization when mutating the actor's state. To use more cores or nodes for the same model, you create more instances of the actor, and large arguments and return values that cross process boundaries are passed through Ray's distributed shared-memory object store.
The conventional Ray setup is a long-lived cluster that runs many Actors and Tasks, coming from a variety of shorter-lived jobs or interactive notebooks connected to the cluster. This multi-tenant, long-lived setup has to prioritize fault tolerance and manage the object store in a way that allows flexibly adding and removing the state of Actors as their owner jobs start and finish. In our case, we know that the inference service for ML model serving is going to run indefinitely, and that its volume and latency requirements warrant a dedicated Ray cluster, which avoids a noisy tenant suddenly hogging the cluster resources. With a dedicated Ray cluster, we can change some of the configurations to prioritize latency over fault tolerance:
```dockerfile
# Setting up the Ray memory usage env vars such that the node would die if there is an OOM instead of becoming a zombie.
# Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html.
ENV RAY_memory_monitor_refresh_ms=0
# Also, there is no need to cap the number of processes to start per actor
ENV RAY_worker_cap_enabled=false
# The configs are from here (not all of them are documented). You just need to prefix the env var with `RAY_`:
# https://github.com/ray-project/ray/blob/ray-2.6.3/src/ray/common/ray_config_def.h
ENV RAY_preallocate_plasma_memory=1
ENV RAY_task_failure_entry_ttl_ms=100
ENV RAY_task_oom_retries=0
ENV RAY_kill_idle_workers_interval_ms=0
# Finally, keep the amount of memory used by the `gcs_server` process smaller
# (since we don't really need to go and look at history):
ENV RAY_task_events_max_num_task_in_gcs=100
```
Since we have a dedicated cluster with a handful of predetermined Actors running on it, we can preload all the Actors on all the nodes. This allows us to use normal web load balancing, where any node can serve a request, so we don't need to route traffic to a particular node to run the inference. This saves the cost of cross-node communication, including serialization. The cost of loading all actors on each node is that every node holds all models in memory, which is not a large cost because models meant for online inference are relatively small. Ray's default is more resource efficient and autoscales according to the resources requested by the actors, but it requires more complex routing that can increase latency.¹ Since we don't use autoscaling, we also don't need a single Head Node. Instead, each server node starts its own local Head Node in code at server startup. This makes the inference service as easy to deploy as any other web service. We still reap the benefits of Ray in managing the efficient execution of the code on all the processors available in the node, without writing any complex framework-level code using Python multiprocessing or a similar library.
There are already a lot of resources available for learning how to write Ray Actors, but writing the most efficient implementation can feel overwhelming at times because there are a lot of options to consider. All alternatives would work, but they can have fairly different runtime performance. Even doing inference in a Task that takes the model in as a parameter works, but this is obviously very slow because the model needs to be serialized and deserialized with every invocation. Here we present our top 5 tips that led to the biggest efficiency gains in our experience.
Any class that has a coroutine (`async def this_is_a_coroutine`) becomes an Async Actor when it is annotated with `@ray.remote`. Like every actor, an Async Actor runs in its own Python process, so it does not compete with the caller for the GIL, and in our experience Async Actors run faster. However, you need to `await` the results from an Async Actor rather than blocking on `ray.get`, and Python only allows `await` in specific contexts, such as inside a coroutine. So you may not be able to use Async Actors everywhere, and may have to resort to a Threaded Actor to make part of the code run in parallel even though it is not called within a coroutine. You can speed up the concurrent execution of a Threaded Actor by using the following snippet to poll for results out of order, instead of just blocking on `ray.get`. If you need the results in order, then you have to use `ray.get`, which accepts a timeout but raises a `ray.exceptions.GetTimeoutError` when it times out. The trick here is to use `timeout=0` in `ray.wait` and then use `time.sleep` to yield control to other threads.
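```python
import time
from typing import Any, List

import ray
from ray import ObjectRef


def ray_wait_non_blocking(
    ref_list: List[ObjectRef],
    check_interval_sec: float = 0.001,
) -> List[Any]:
    """Collect the results of ref_list as they finish, in completion order."""
    remaining_refs = ref_list
    results = []
    while len(remaining_refs) > 0:
        # timeout=0 makes ray.wait return immediately with whatever is ready.
        done_refs, remaining_refs = ray.wait(
            remaining_refs, num_returns=len(remaining_refs), timeout=0
        )
        if len(done_refs) > 0:
            results.extend(ray.get(done_refs))
        if len(remaining_refs) > 0:
            # Sleep briefly to yield control to other threads in the actor.
            time.sleep(check_interval_sec)
    return results
```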
Specifying resources and `max_concurrency` in the `ray.remote` annotation can have a variety of effects, and understanding them can get very confusing, depending on whether the annotation is applied to an Async Actor, a Threaded Actor, or a Task. For example, when writing an Async Actor that is meant to use 4 CPUs in each invocation and scale up as much as possible, do you set `num_cpus` to 4 or to all the available CPUs in the node? The latter is wrong because `num_cpus` tells the Ray scheduler how many resources the actor needs, so Ray would create only 1 copy of the actor on each node. Setting `num_cpus=4` is not great either, because Ray will not create more copies of the actor than the available CPUs allow (for example, 4 copies on a 16-core node). For online model serving with several model actors that are not always busy, it is better to over-promise logical resources like CPUs.
For online model serving using Async Actors, we found that it is best to leave the resources out of the `ray.remote` annotation. This uses Ray's default for Actors, which is 1 logical CPU for scheduling and 0 logical CPUs for running. As a result, the scheduler allows many more invocations of the actor to run concurrently, which may lead to running out of memory (OOM). There is a pattern in the docs that uses resources to limit the number of concurrently running tasks, but we found it much more intuitive to explicitly set `max_concurrency`, which also leads to the best performance.
If you really want to use more CPUs for the model inference, consider manually setting the `OMP_NUM_THREADS` environment variable when creating the actor, by passing `runtime_env` in the options. This works for PyTorch models and many other libraries that use OpenMP (OMP) for concurrency. It is also important to know that Ray sets `OMP_NUM_THREADS` to the value of `num_cpus` when creating an actor.
The serialization of values when crossing process boundaries is a headache when writing any concurrent program, using Ray or any other framework. That said, dealing with serialization problems is a lot easier when writing Ray programs than with other frameworks, because `ray.util.inspect_serializability` is a very handy tool, and you can use it in unit tests, too. However, the serialization overhead remains a runtime efficiency problem, especially for a low-latency model server.
The overhead can be reduced by using Value Objects that can be converted to tuples or dictionaries, for example with `dataclasses.asdict` or `attrs.asdict`. Within the Actor process the Value Objects can be passed around freely, keeping the code clean and allowing some static checks. Then, when crossing a process boundary, specifically when returning from an Async Actor method called with `.remote` from another process, call the `asdict` function and return the dict. This assumes that the Value Object doesn't need to be reconstructed or passed around much further, for example when the HTTP server process calls the Async Actor method and then wraps the results it returns in a proto or JSON response.
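A minimal sketch of the Value Object pattern using only the standard library. `Prediction`, `_infer` and `predict_for_caller` are hypothetical; in a Ray deployment, `predict_for_caller` would be an Async Actor method called with `.remote`, and the HTTP server process would turn the returned dict into its JSON or proto response.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class Prediction:
    """Hypothetical Value Object passed around inside the actor process."""

    label: str
    score: float


def _infer(text: str) -> Prediction:
    # Placeholder inference that flags a hypothetical blocked word.
    flagged = "badword" in text.lower()
    return Prediction(label="flagged" if flagged else "ok", score=1.0 if flagged else 0.0)


def predict_for_caller(text: str) -> Dict[str, Any]:
    # Typed objects inside the process; a plain dict across the boundary.
    return asdict(_infer(text))


# In the HTTP server process, the dict goes straight into the response body.
response_body = json.dumps(predict_for_caller("a BadWord here"))
```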
With any large-scale deployment, errors eventually happen. We have seen a couple of cases where an actor kept accumulating tasks without processing any, until we saw “Warning: More than 40000 tasks are pending submission to actor” in the logs. We also saw cases where the actor process was killed by the OS, probably because the physical node was overloaded. Such situations are very difficult to reproduce or debug, and they are very rare, happening only a handful of times over months in a deployment of several hundred nodes, each running several actors. What we have learned is that the situation becomes much worse if the node turns into a zombie by repeatedly trying to restart the actor and retry its tasks. Therefore, we did two things to avoid prolonging the life of a pathological node:
1. Set the `max_restarts` and `max_task_retries` options to 0 when creating the actor. This prevents Ray from trying to revive the actor when it is killed by the OS or crashes for any other environmental reason. This assumes that the code already works and is well tested.
2. Created a monitor actor that keeps track of the successes and failures of other actors, and made it kill the virtual node if the failure proportion grows at an alarming rate. The virtual node is then simply replaced by the container orchestrator, normally on a healthy physical node.
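The monitor's core logic can be sketched without Ray. This version uses a simple threshold on the failure ratio over a sliding window, rather than the growth rate mentioned above, and all names and parameters are hypothetical; in a deployment, the logic could live in a monitor actor, with `on_unhealthy` terminating the container so the orchestrator replaces it.

```python
from collections import deque
from typing import Callable, Deque


class FailureMonitor:
    """Hypothetical sliding-window failure tracker for other actors' calls."""

    def __init__(
        self,
        window: int,
        max_failure_ratio: float,
        min_samples: int,
        on_unhealthy: Callable[[float], None],
    ):
        self.outcomes: Deque[bool] = deque(maxlen=window)  # True = success
        self.max_failure_ratio = max_failure_ratio
        self.min_samples = min_samples
        self.on_unhealthy = on_unhealthy
        self.tripped = False

    def failure_ratio(self) -> float:
        if not self.outcomes:
            return 0.0
        return sum(1 for ok in self.outcomes if not ok) / len(self.outcomes)

    def record(self, success: bool) -> None:
        self.outcomes.append(success)
        # Ignore noise until enough samples arrive; fire the callback only once.
        if self.tripped or len(self.outcomes) < self.min_samples:
            return
        ratio = self.failure_ratio()
        if ratio > self.max_failure_ratio:
            self.tripped = True
            self.on_unhealthy(ratio)
```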
With these two interventions we don't see such problematic cases anymore, but they were rare to begin with, so you may want to implement these measures only if you see such cases in your own deployment.
One final tip is to look at the performance characteristics of the majority of inputs, and try to find optimizations that apply to them. For example, most NLP models can only do inference on sequences of up to 512 or 1024 tokens, so to handle longer inputs we use multithreading to run inference concurrently on chunks shorter than the model's max input length. While it is intuitive to always do a scatter-gather over the input chunks using a Threaded Actor, we found that the vast majority of inputs are already shorter than the model's max input length. The scatter-gather overhead, albeit small, on the order of single-digit milliseconds, adds up when serving billions of inputs per day. We therefore added a separate code path for the majority of inputs that skips the multithreaded scatter-gather and runs the inference directly inline. This adds complexity to the code, so it is only worth it if the volume is high enough for such small optimizations to matter.
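The fast path can be sketched as below. This version swaps in a standard-library `ThreadPoolExecutor` for the Threaded Actor, and `infer`, `chunk` and the model callable are hypothetical; how per-chunk outputs are combined (for example, taking the maximum score) is model-specific and left to the caller.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence

# Model max input length in tokens (512 or 1024 for many NLP models).
MAX_TOKENS = 512


def chunk(tokens: Sequence[int], size: int) -> List[Sequence[int]]:
    """Split tokens into consecutive chunks of at most `size` tokens."""
    return [tokens[i:i + size] for i in range(0, len(tokens), size)]


def infer(
    tokens: Sequence[int],
    model_fn: Callable[[Sequence[int]], float],
    pool: ThreadPoolExecutor,
    max_tokens: int = MAX_TOKENS,
) -> List[float]:
    """Return one model output per chunk, in chunk order."""
    if len(tokens) <= max_tokens:
        # Fast path for the common case: run inline, no scatter-gather overhead.
        return [model_fn(tokens)]
    # Slow path for rare long inputs: scatter chunks to threads, then gather.
    return list(pool.map(model_fn, chunk(tokens, max_tokens)))
```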
¹ Consider the case when a web request needs to do inference with a specific model and is routed by the load balancer to a node that doesn't have the actor for that model. The raylet will find the closest node that has this model's actor and run the inference on it. This happens fairly quickly, but having all actors on all nodes is faster, and we prioritize low latency over memory consumption.