Scaling Embedding Generation Pipelines From Pandas to Ray Data

By Marwan Sarieddine

Introduction

In this blog post, we explore scaling a pipeline that generates text embeddings using Ray Data. We focus on the ease of migrating from a pandas-based approach to a scalable Ray Data solution that offers significant performance improvements.

Embedding generation is a common task in natural language processing, where we convert text data into a numerical format for various downstream applications, such as text classification, clustering, or information retrieval. Embeddings gained popularity with the release of numerous large language models (LLMs) and the growing use of Retrieval-Augmented Generation (RAG) architectures.

When working locally, pandas is often the go-to library for data manipulation in Python. However, as data volumes grow, pandas-based solutions can quickly become a bottleneck, limiting the scalability and performance of the embedding generation process.

This is where Ray Data comes in.

Ray Data is a distributed data processing framework that scales your data pipelines across a heterogeneous cluster of machines, streaming data from disk to CPUs and GPUs with ease. Ray Data provides a high-level API for building scalable data processing pipelines, facilitating the transition from a single-node pandas-based solution to a distributed, scalable solution with minimal code changes.

Throughout this post, we walk through transitioning from a pandas-based implementation to a scalable Ray Data solution. We start with a simple pandas pipeline, migrate it to a naive Ray Data version, and finally optimize it using Ray Actors and accelerators. By the end, you’ll see how easy it is to scale up your embedding generation pipelines using Ray Data, without getting bogged down in the underlying implementation details.

Let’s get started!

Initial Setup

Before diving into the code, let’s set up the environment by installing and importing the required modules. We’ll use the following libraries:

import time
from typing import Optional, Union
from pathlib import Path

import numpy as np
import pandas as pd
import ray
import torch
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

# Root directory for the input and output data used throughout this post.
# The exact location is an assumption; point it at wherever your data lives.
DATA_DIR = Path("data")

An overview of the embeddings pipeline

Before implementing the pipeline, let’s review the steps involved in generating embeddings from text data. The pipeline consists of the following stages:

  • Loading the data: Read the text data from disk.

  • Processing the documents into chunks: Split the text into smaller chunks for embedding generation.

  • Generating embeddings from the chunks: Encode the text chunks into numerical embeddings using a pre-trained model.

  • Saving the embeddings: Store the embeddings and associated text data to disk.

Here’s is a diagram of the pipeline:

[Diagram of the embedding pipeline: load data → chunk documents → generate embeddings → save embeddings]

Starting out with a pandas pipeline

Let’s examine the initial pipeline for generating embeddings from text data using pandas. This will help us understand the limitations of this approach and set the stage for migration to a more scalable solution using Ray Data.

The key components of the pandas-based pipeline are:

1. chunk_text: This function takes a block of text and splits it into smaller chunks, based on a specified chunk size. This is a common preprocessing step before generating embeddings.

def chunk_text(
    text: str, section_url: str, page_url: str, chunk_size_in_words: int
) -> list[dict[str, str]]:
    """Chunk the text into smaller pieces given a chunk size using a recursive splitter."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size_in_words,
        length_function=lambda x: len(x.split()),
        chunk_overlap=0,
    )

    chunks = []
    for chunk in splitter.split_text(text):
        chunks.append(
            {
                "text": chunk,
                "section_url": section_url,
                "page_url": page_url,
            }
        )
    return chunks
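For illustration only (the passage, URLs, and chunk size below are made-up values, not from the original dataset), a call to chunk_text looks like this:

sample_chunks = chunk_text(
    text="Ray Data streams data from disk to CPUs and GPUs across a cluster of machines.",
    section_url="https://example.com/docs#section",
    page_url="https://example.com/docs",
    chunk_size_in_words=8,
)
# sample_chunks is a list of dicts, each holding a chunk of at most 8 words
# plus the section_url and page_url it came from.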

The chunk_text function is then run on each row in the DataFrame to produce a list of text chunks. Note that the chunk size is a “hyperparameter” that can be tuned for the specific use case. As a guideline, chunks should be small enough to fit within the embedding model’s context window and to capture a semantically meaningful unit of text, but large enough to avoid excessive chunk generation and loss of context.

def process_documents_into_chunks(df: pd.DataFrame) -> pd.DataFrame:
    """Process the documents into chunks."""
    chunk_size = 128
    words_to_tokens = 1.2
    chunk_size_in_words = int(chunk_size // words_to_tokens)
    chunks = []
    for _, row in df.iterrows():
        chunks.extend(
            chunk_text(
                text=row["text"],
                section_url=row["section_url"],
                page_url=row["page_url"],
                chunk_size_in_words=chunk_size_in_words,
            )
        )

    return pd.DataFrame(chunks)

2. generate_embeddings_from_chunks: This function takes a pandas DataFrame containing text chunks, loads the SentenceTransformer model, and generates embeddings. Note that we are running on our local machine, using the CPU as the device.

def generate_embeddings_from_chunks(df: pd.DataFrame) -> pd.DataFrame:
    """Generate embeddings from chunks using a SentenceTransformer model."""
    model_name = "thenlper/gte-large"
    device = "cpu"
    model = load_model(model_name=model_name, device=device)
    embeddings = model.encode(df["text"].tolist())
    df["embeddings"] = embeddings.tolist()
    return df

Here, load_model loads a SentenceTransformer model. It takes a model name and a device (“cpu” or “cuda”) as input.

def load_model(model_name: str, device: str) -> SentenceTransformer:
    """Load the SentenceTransformer model."""
    return SentenceTransformer(model_name, device=device)
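As a quick local sanity check (not part of the original pipeline; the texts below are made up), we can embed a tiny DataFrame and inspect the output. The gte-large model produces 1024-dimensional vectors, and the first call will download the model weights:

sample_df = pd.DataFrame({"text": ["Ray Data scales data pipelines.", "pandas shines on a single machine."]})
sample_df = generate_embeddings_from_chunks(sample_df)
print(len(sample_df["embeddings"].iloc[0]))  # embedding dimensionality (1024 for gte-large)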

3. sample_pipeline: This function integrates all steps, reading in a sample of the data, processing the documents into chunks, generating the embeddings, and saving the output to a Parquet file.

def sample_pipeline():
    """Run a pipeline on a sample/subset of the data using pandas."""
    input_path = str(DATA_DIR / "small_sample" / "sample-input.json")
    df = pd.read_json(input_path, lines=True)
    df = process_documents_into_chunks(df)
    df = generate_embeddings_from_chunks(df)
    output_path = str(DATA_DIR / "small_sample" / "sample-output.parquet")
    df.to_parquet(output_path)

Here is our sample pandas pipeline visualized:

[Diagram of the pandas sample pipeline]

While this pandas-based pipeline works for a small sample of data, it suffers from significant limitations:

  1. Lack of scalability: Pandas is not designed for large-scale, distributed data processing. As the data grows, the pipeline will become increasingly slow and memory-intensive.

  2. Inability to leverage accelerators efficiently: The pipeline runs on the CPU, which can be slow for large-scale embedding generation. It would be better to efficiently leverage GPUs or other accelerators to speed up the process, ideally keeping the GPUs fully utilized by constantly feeding them data, which is not possible with pandas given its sequential execution.

To address these limitations, we’ll migrate the pipeline to use Ray Data, which will allow us to leverage distributed processing to significantly improve the performance and scalability of our embedding generation pipeline.

A Naive Migration from pandas to Ray Data

Let’s examine the end-to-end pipeline for generating embeddings using a naive Ray Data implementation. This will help us understand the key components of the Ray Data pipeline and how they compare to the pandas-based approach.

def naive_scalable_pipeline(limit: Optional[int]) -> ray.data.Dataset:
    """Run the scalable pipeline."""
    input_dirs = str(DATA_DIR / "full_scale" / "02_sections")
    ds = ray.data.read_json(input_dirs)
    ds = scalable_chunk(ds)
    if limit:
        ds = ds.limit(limit)
    ds = naive_scalable_embed(ds)
    output_path = str(DATA_DIR / "full_scale" / "03_embeddings")
    ds.write_parquet(output_path)
    return ds

Here is the visual representation of the naive Ray Data pipeline:

[Diagram of the naive Ray Data pipeline]

First, instead of pd.read_json, we use ray.data.read_json, which has a similar syntax, except that Ray Data executes lazily and will not read any data into memory until it is needed.
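To make the lazy behavior concrete, here is a small sketch (reusing the DATA_DIR layout from the pipeline above):

# This only records a read operation in the Dataset's plan; no files are loaded yet.
ds = ray.data.read_json(str(DATA_DIR / "full_scale" / "02_sections"))

# Execution happens when results are actually needed, for example:
ds.show(3)          # streams just enough data to print a few rows
# ds.materialize()  # or run the whole plan and keep the result in the object store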

Looking at the last step of the pipeline, write_parquet is analogous to df.to_parquet, except that the I/O is distributed across the entire cluster and data is streamed to disk as it becomes available rather than waiting for the full dataset.
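One practical caveat (our note, not from the original post): on a multi-node cluster each worker writes its own files, so the output path should live on storage every node can reach, such as a shared filesystem or cloud object storage. For example, with a placeholder bucket name:

ds.write_parquet("s3://your-bucket/embeddings/")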

scalable_chunk relies on the same chunk_text function, but we apply it to our Ray Dataset as a transformation using flat_map, much like we would use df.apply or df.map, except that pandas has no built-in row-wise flat_map operation. Ray Data parallelizes the computation in a fully distributed fashion, whereas in pandas a custom function applied element- or row-wise runs serially.
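As a toy illustration of flat_map semantics (made-up data, unrelated to the pipeline), each input row can expand into zero or more output rows:

toy = ray.data.from_items([{"text": "a b c d"}])
toy = toy.flat_map(lambda row: [{"token": token} for token in row["text"].split()])
print(toy.take_all())  # [{'token': 'a'}, {'token': 'b'}, {'token': 'c'}, {'token': 'd'}]

With that in mind, here is scalable_chunk: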

def scalable_chunk(ds: ray.data.Dataset) -> ray.data.Dataset:
    """Process the documents into chunks using Ray Data."""

    def chunk_row(row: dict[str, str]) -> list[dict[str, str]]:
        chunk_size = 128
        words_to_tokens = 1.2
        num_tokens = int(chunk_size // words_to_tokens)
        return chunk_text(
            text=row["text"],
            section_url=row["section_url"],
            page_url=row["page_url"],
            chunk_size_in_words=num_tokens,
        )

    ds = ds.flat_map(chunk_row)
    return ds

Finally, we have the naive_scalable_embed function. It wraps the same embedding logic in an embed_batch function and applies it using map_batches: since model.encode is a vectorizable operation, we want to pass the data in batches. We run this pipeline directly on our local machine, which doesn’t have any accelerators, so we set device="cpu".

def naive_scalable_embed(ds: ray.data.Dataset) -> ray.data.Dataset:
    """Generate embeddings from the chunks using a SentenceTransformer model."""

    def embed_batch(batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        model_name = "thenlper/gte-large"
        device = "cpu"
        text = batch["text"].tolist()
        model = load_model(model_name=model_name, device=device)
        embeddings = model.encode(text, batch_size=len(text))
        batch["embeddings"] = embeddings.tolist()
        return batch

    ds = ds.map_batches(embed_batch)
    return ds

We then execute the pipeline to achieve two main goals: 

  1. Ensure we replicate the same output we got from pandas.

  2. Establish a baseline performance of running Ray locally fully on a CPU.

start_time = time.time()
ds = naive_scalable_pipeline(limit=100)
end_time = time.time()
print(f"Naive Pipeline Time taken: {end_time - start_time}s")

The output shows the time taken for the pipeline (approximately 71 seconds) and provides performance metrics for each operator.

While this naive Ray Data implementation is a step in the right direction, there is still work to be done: we’re not leveraging any accelerators (GPUs) to speed up the embedding generation.

In the next section, we’ll explore how to further optimize the pipeline.

Scaling our Embeddings on a Heterogeneous Cluster

The key change we want to make is to run our embed_batch transformation on GPUs. We also want a pool of worker processes that keep the embedding model loaded, avoiding the overhead of re-initializing the model (even from a local cache) for every batch.

Here’s how we can implement this improved and scalable embed_batch function:

def improved_scalable_embed(
    ds: ray.data.Dataset, batch_size: int, num_gpus: int, num_cpus: int
) -> ray.data.Dataset:

    class EmbedBatch:
        def __init__(self, model_name: str):
            """Initialize the state of the worker process."""
            self.model = SentenceTransformer(
                model_name, device="cuda" if num_gpus > 0 else "cpu"
            )

        def __call__(self, batch):
            """Generate embeddings from the chunks using a SentenceTransformer model."""
            text = batch["text"].tolist()
            embeddings = self.model.encode(text, batch_size=len(text))
            batch["embeddings"] = embeddings.tolist()
            return batch

    ds = ds.map_batches(
        EmbedBatch,
        fn_constructor_kwargs={
            "model_name": "thenlper/gte-large",
        },
        concurrency=max(num_gpus, num_cpus),
        batch_size=batch_size,
        num_gpus=1 if num_gpus > 0 else None,
        num_cpus=1 if num_cpus > 0 else None,
    )

    return ds

Here’s our scalable pipeline visualized:

[Diagram of the improved Ray Data pipeline running on a heterogeneous cluster]

Note that we ran this pipeline on Anyscale Workspaces, which provides an IDE for developing and running Ray applications along with a fully managed, autoscaling Ray cluster. This made it easy to scale the pipeline to a cluster of machines with both GPUs and CPUs.

In this improved pipeline, we defined an EmbedBatch class as a stateful transform. This class is responsible for loading the model once per worker and generating embeddings for each batch of data. Note that it’s just a plain Python class, nothing special.

The improved_scalable_embed function then applies this stateful EmbedBatch transformation to the Ray Dataset, leveraging the map_batches operation. With map_batches, we can specify:

  • num_gpus: the number of GPUs for each process, allowing us to take advantage of available accelerators and further improve performance. 

  • batch_size: the number of rows passed to each EmbedBatch call, controlling how much data each worker processes at a time.

  • concurrency: the number of processes to launch, which can be adjusted based on the available resources.
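To tie everything together, here is a sketch of how the full improved pipeline could be wired up. improved_scalable_pipeline is a hypothetical helper (the original post doesn’t show it), and the batch size and resource counts are assumptions for a cluster with, say, four GPU workers:

def improved_scalable_pipeline(limit: Optional[int] = None) -> ray.data.Dataset:
    """Sketch: same shape as naive_scalable_pipeline, but with the stateful GPU embedder."""
    ds = ray.data.read_json(str(DATA_DIR / "full_scale" / "02_sections"))
    ds = scalable_chunk(ds)
    if limit:
        ds = ds.limit(limit)
    # Assumed resources: four workers with one GPU each; use num_cpus instead if no GPUs are available.
    ds = improved_scalable_embed(ds, batch_size=64, num_gpus=4, num_cpus=0)
    ds.write_parquet(str(DATA_DIR / "full_scale" / "03_embeddings"))
    return ds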

Assessing the Performance

Now that we’ve implemented the improved Ray Data pipeline, let’s take a closer look at the performance improvements we’ve achieved.

To analyze the performance of each operator in the pipeline, we can use the ds.stats() method provided by Ray Data. This gives us detailed insights into the execution time, resource usage, and throughput of each step in the pipeline.
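For example, after running the pipeline (here via the hypothetical improved_scalable_pipeline sketched above), the stats can be printed directly:

ds = improved_scalable_pipeline(limit=100)
print(ds.stats())  # per-operator wall time, resource usage, and throughput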

The output from ds.stats() provides very detailed information for each operator in the pipeline. We focus on the difference in throughput for the embed_batch step:

Pipeline Step           Throughput (rows/s)
Naive (embed_batch)     1.4
Improved (EmbedBatch)   14.3

We also show the overall time taken to process the 100 rows of data:

Pipeline    Time (s)
Naive       71
Improved    7

Our improved, scalable pipeline delivered a 10x performance improvement with minimal code changes.

Conclusion

In this blog post, we’ve explored scaling up a pipeline that generates text embeddings using Ray Data and Sentence Transformers. The key takeaways are:

  1. Ease of migration from Pandas to Ray Data: We demonstrated that transitioning from a pandas-based pipeline to a Ray Data-based pipeline is straightforward. The similarities in the code make the migration intuitive, allowing you to leverage the power of Ray Data without a significant rewrite.

  2. Significant performance improvements: The improved Ray Data pipeline delivered a 10x performance improvement over the naive Ray Data implementation, and unlocked the potential to distribute the workload across a cluster of machines with GPUs and CPUs compared to running pandas on a single machine.

If you’re interested in a seamless experience with Ray Data, consider using Managed Ray on Anyscale. The Anyscale platform provides a fully managed Ray service, allowing you to focus on your application rather than the underlying infrastructure.

Ready to try Anyscale?

Access Anyscale today to see how companies using Anyscale and Ray benefit from rapid time-to-market and faster iterations across the entire AI lifecycle.