Update 5/11/23: This blog has been updated to reflect that BatchPredictor has been deprecated in favor of always recommending Ray Data streaming.
This blog covers three methods of batch inference (distributed model evaluation) in Ray, from low-level Ray Actors to the high-level Ray Data streaming API. We'll see how the low-level Ray APIs let you control exactly "how" Ray executes computations, while Ray's libraries let you specify just the "what" for a more out-of-the-box scalable experience.
Starting with a local Python class that defines your trained model, Ray makes it easy to parallelize inference over the model using any of these approaches. In the next sections, we'll walk through examples of how to perform parallel model inference on an NYC taxi data model using Ray 2.4.
Batch inference refers to generating model predictions over a set of input observations. The model could be a regression model, neural network, or simply a Python function. When the model is expensive or the data to be evaluated is large, batch inference can benefit from scaling with Ray. Ray is commonly used for parallelizing batch inference on single machines as well as clusters with thousands of GPUs.
To set up distributed batch inference in any kind of system, you need to follow two steps:
Create a number of replicas of your model. In Ray, these replicas are represented as Actors (i.e., stateful processes [1]) that can be assigned to GPUs and hold instantiated model objects (see the short sketch after this list).
Feed data into these model replicas in parallel, and retrieve inference results.
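For intuition, here is a minimal sketch of these two steps, assuming a hypothetical ModelReplica class; the num_gpus value, replica count, and the batches list are illustrative, and load_trained_model() stands in for your own model loader (we define a toy one below):

import ray

@ray.remote(num_gpus=1)  # Reserve one GPU per replica; use num_gpus=0 for CPU-only inference.
class ModelReplica:
    def __init__(self):
        # Load the model once per actor so it stays resident across many batches.
        self.model = load_trained_model()

    def predict(self, batch):
        # Run inference on one batch of input records.
        return self.model(batch)

# Step 1: create several replicas of the model.
replicas = [ModelReplica.remote() for _ in range(4)]

# Step 2: feed data to the replicas in parallel and retrieve the results
# (assumes `batches` is a list of input batches, one per replica).
futures = [r.predict.remote(b) for r, b in zip(replicas, batches)]
results = ray.get(futures)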
Let's dive into some examples: we'll start with the low-level ones to build an understanding of "How" Ray executes computations, before showing the higher level APIs.
Ray's Actor API is a natural primitive for batch inference. We'll see first how to use Actors directly to distribute batch inference. To simplify the task of dispatching work to a large number of Actors of the same kind, Ray also provides an ActorPool utility.
For all these examples, let's assume a large number of records in S3 at "s3://air-example-data/ursa-labs-taxi-data/" that we want to run a custom model against for inference. We can start by defining a simple "pretrained" model and a Ray Actor that can be constructed from a model reference:
import pandas as pd
import pyarrow.parquet as pq
import ray

def load_trained_model():
    # A fake model that predicts whether tips were given based on
    # the number of passengers in the taxi cab.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        # Give a tip if 2 or more passengers.
        predict = batch["passenger_count"] >= 2
        return pd.DataFrame({"score": predict})

    return model

@ray.remote
class NYCBatchInference:
    def __init__(self, model):
        self.model = model

    def predict(self, split_path: str):
        # Read each split and convert to pandas.
        df = pq.read_table(split_path).to_pandas()

        # Do the inference with our model and return the result.
        result = self.model(df)
        return result
To parallelize this with Ray, we put the model into the Ray object store, and then launch a number of our predictor actors as follows:
model = load_trained_model()
model_ref = ray.put(model)
actors = [NYCBatchInference.remote(model_ref) for _ in range(5)]
Ray automatically fetches and dereferences the model_ref argument passed to the actor constructor, so the NYCBatchInference actor sees a materialized model object instead of a reference to it. Next we need to define our input files:
input_splits = [
    f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet"
    for i in range(12)
]
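Before wiring up the full dispatch loop, it can be useful to sanity-check a single actor against a single split (a small optional step, not part of the original walkthrough):

# Blocking call: run one split through one actor and inspect the result.
sample = ray.get(actors[0].predict.remote(input_splits[0]))
print(sample.head())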
We can dispatch and retrieve results from these actors using a ray.wait loop. Basically, we want to keep a certain backlog of tasks "in-flight" to the actors, retrieving ready results and sending new tasks to actors as they finish their previous work:
def process_result(r):
    print(f"Predictions dataframe size: {len(r)} | Total score for tips: {r['score'].sum()}")

idle_actors = actors.copy()
future_to_actor = {}
while input_splits:
    if idle_actors:
        # An actor is free: send it the next file split.
        actor = idle_actors.pop()
        future = actor.predict.remote(input_splits.pop())
        future_to_actor[future] = actor
    else:
        # All actors are busy: wait for one result, then recycle that actor.
        [ready], _ = ray.wait(list(future_to_actor.keys()), num_returns=1)
        actor = future_to_actor.pop(ready)
        idle_actors.append(actor)
        process_result(ray.get(ready))

# Process any leftover results at the end.
for future in future_to_actor:
    process_result(ray.get(future))
Running this produces output like:
Predictions dataframe size: 136999 | Total score for tips: 45142
Predictions dataframe size: 136394 | Total score for tips: 43234
Predictions dataframe size: 141981 | Total score for tips: 45188
Predictions dataframe size: 148108 | Total score for tips: 47713
Predictions dataframe size: 143087 | Total score for tips: 45510
Predictions dataframe size: 144014 | Total score for tips: 45175
Predictions dataframe size: 133932 | Total score for tips: 42175
Predictions dataframe size: 145976 | Total score for tips: 48036
Predictions dataframe size: 142893 | Total score for tips: 46112
Predictions dataframe size: 156198 | Total score for tips: 49909
Predictions dataframe size: 139985 | Total score for tips: 44138
Predictions dataframe size: 141062 | Total score for tips: 46360
The above example implements (1) distributed dispatch of tasks to a large number of actors, and (2) processing results from the actors and dispatching new work in a streaming way. It's not too many lines of code for a distributed program, but we can do better!
Ray provides an ActorPool utility that makes this a lot easier. Suppose we have our list of actors created as before. We can wrap it in an ActorPool as follows:
from ray.util.actor_pool import ActorPool

actor_pool = ActorPool(actors)
Then, to process our data, all we need to do is call actor_pool.map. Actually, we'll use map_unordered for slightly better efficiency, since we don't care about the order of results:
def actor_call(actor, data):
    return actor.predict.remote(data)

for result in actor_pool.map_unordered(actor_call, input_splits):
    process_result(result)
The above snippet implements exactly the same logic as the original loop, just hiding the tedious futures management in a convenient utility class. You can check out the source code of ActorPool to see the familiar ray.wait primitive it uses under the hood.
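For intuition, here is a simplified sketch of the kind of loop ActorPool runs internally. It is not Ray's actual implementation, just the same submit/wait/recycle pattern we wrote by hand above:

class SimpleActorPool:
    # Simplified illustration of the ActorPool pattern (not Ray's real code).
    def __init__(self, actors):
        self.idle = list(actors)
        self.in_flight = {}  # future -> actor

    def map_unordered(self, submit_fn, values):
        values = list(values)
        while values or self.in_flight:
            # Submit work while we have both idle actors and pending inputs.
            while self.idle and values:
                actor = self.idle.pop()
                future = submit_fn(actor, values.pop())
                self.in_flight[future] = actor
            # Wait for any one result, free its actor, and yield the value.
            [ready], _ = ray.wait(list(self.in_flight.keys()), num_returns=1)
            self.idle.append(self.in_flight.pop(ready))
            yield ray.get(ready)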
There are a few unoptimized aspects of the above code to point out. First, we are dispatching file splits one at a time, which may be inefficient if the splits are too small (especially on GPUs), or cause out-of-memory errors if the splits are too large. Furthermore, you may want to pipeline the task submission (have multiple tasks in flight to an actor at once), and parallelize the data fetching and any necessary preprocessing.
While we could implement these optimizations on top of the example code above (for example, using Ray tasks to parallelize data fetching), the Ray Data library has these optimizations built in, along with other features such as dynamic autoscaling of the actor pool used for the computation.
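As a sketch of the first of these ideas, the snippet below parallelizes the data fetch with plain Ray tasks. It assumes a hypothetical predict_df() method on the actor that accepts an already-loaded DataFrame instead of a path; everything else reuses names from the examples above:

@ray.remote
def fetch_split(split_path: str) -> pd.DataFrame:
    # Fetch and decode one parquet split in a separate worker process.
    return pq.read_table(split_path).to_pandas()

# Launch all fetches up front; Ray schedules them in parallel across the cluster.
df_refs = [fetch_split.remote(path) for path in input_splits]

# Feed the object refs to the actor pool; Ray dereferences them automatically, so
# the (hypothetical) predict_df() method receives materialized DataFrames.
for result in actor_pool.map_unordered(
        lambda actor, df_ref: actor.predict_df.remote(df_ref), df_refs):
    process_result(result)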
The takeaways from the above are as follows:
You can build distributed batch inference using Ray Actors and ActorPool.
You can control exactly how Ray is executing your code.
You understand how Ray works under the hood with respect to Ray Core primitives such as tasks, actors, objects, and ray.wait.
However, this approach forces you to implement common performance optimizations yourself: it gives you control, but it also demands knowledge of Ray's internals. There is a better way to tell Ray what you want done, by using the Ray Data library, without deep knowledge of Ray Core's primitives. We discuss this approach next.
It can be tricky and time-consuming to implement and test optimizations for a distributed program. The goal of Ray Data is to handle common optimizations for batch inference automatically, including:
Dynamic autoscaling of the actor pool
Automatic batching and pipelining of data
Parallelizing data fetching and preprocessing
As a first step, we can use ray.data.read_parquet to load the data as a Ray Dataset:
ds = ray.data.read_parquet(input_splits)

# -> Dataset(num_blocks=12, num_rows=1710629, schema={vendor_id: string, pickup_at: timestamp[us], dropoff_at: timestamp[us], passenger_count: int8, trip_distance: float, pickup_longitude: float, pickup_latitude: float, rate_code_id: null, store_and_fwd_flag: string, dropoff_longitude: float, dropoff_latitude: float, payment_type: string, fare_amount: float, extra: float, mta_tax: float, tip_amount: float, tolls_amount: float, total_amount: float})
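To sanity-check what was loaded, you can inspect the Dataset directly; this optional snippet uses standard Dataset introspection calls, and the row count matches the output above:

print(ds.count())   # -> 1710629
print(ds.schema())  # Column names and types, as shown above.
print(ds.take(1))   # A single sample record.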
We can also do preprocessing on our Dataset using simple transformation APIs like map_batches. Here's an example of shifting the passenger_count field by 1.0 via a Dataset transform:
# Define our preprocessing function.
def preprocess(batch):
    batch["passenger_count"] -= 1.0
    return batch

ds = ds.map_batches(preprocess)
To proceed, let's define the model class to use (note that we define the __call__ method to make the class callable; this is the method Ray Data invokes on each batch):
class CallableCls:
    def __init__(self, model):
        self.model = model

    def __call__(self, batch):
        result = self.model(batch)
        return result
Finally, we can use the Dataset map_batches() function to apply our model to our Dataset in parallel. We can specify the batch size to pass to the model, any GPU resources, as well as autoscaling options for the actor pool Ray Data is going to use under the hood.
results = ds.map_batches(
    CallableCls,
    num_gpus=0,
    batch_size=1024,
    compute=ray.data.ActorPoolStrategy(min_size=1, max_size=5),
    fn_constructor_args=(model_ref,),
)
# -> Dataset(num_blocks=12, num_rows=1710629, schema={score: bool})
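As a small, optional example of consuming the predictions (not part of the original walkthrough), we can mirror the per-split summaries printed earlier by iterating over the result in batches:

total_tips = 0
for batch in results.iter_batches(batch_format="pandas", batch_size=4096):
    # Each batch is a pandas DataFrame with the "score" column produced by our model.
    total_tips += batch["score"].sum()
print(f"Total score for tips: {total_tips}")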
Compared to the original version, the Ray Data version has a few advantages. For one, it allows for parallel reading and preprocessing of the source data. Second, the Ray Data library manages the autoscaling of the actor pool used for inference. And lastly, we only declared what we want done, with a set of declarative key-value arguments, rather than how it should be done, avoiding all the cumbersome code we wrote above to instruct Ray how to parallelize and scale.
Learn more about using Datasets for batch inference in the user guides.
We walked through three ways to implement scalable batch inference in Ray, using Ray's low-level primitives and Ray Data's high-level, expressive API. Each approach has its merits. If you want to control and dictate how Ray should execute your batch inference, use Ray tasks and actors or the ActorPool utility. By contrast, if you want Ray to manage the scaling, distribution, and inference for you, use Ray Data.
These APIs are layered on top of each other: they are all Ray tasks, actors, and objects under the hood. Pick and choose depending on your needs.
The takeaway here is that Ray Core gives you control over how to do something, putting the onus on you to implement inference with Ray primitives and to understand how Ray works under the hood, whereas Ray Data offers automatic scaling and expressive, intuitive APIs for batch inference at scale, with noticeably less code.
For data scientists and machine learning practitioners who care more about getting models to scale for batch inference and less about underlying primitives and under-the-hood execution details, Ray Data is a desirable option.
A couple of examples in the Ray documentation illustrate how you can use the Ray Core APIs: tasks, actors and actor pools, and objects. Other resources demonstrate how to use Ray Data.
We will be giving a talk on Ray Data streaming at our upcoming Ray Summit. Early bird registration ends May 31, 2023. Join us for all things Ray.
[1] Using stateful actors improves efficiency, since we don't need to load and initialize model CPU/GPU state for each batch of data predicted.
Access Anyscale today to see how companies using Anyscale and Ray benefit from rapid time-to-market and faster iterations across the entire AI lifecycle.