Ray Data
Scalable Ray Data automation and integration for distributed data processing pipelines
Category: productivity
Source: Orchestra-Research/AI-Research-SKILLs
Ray Data is a community skill for scalable data processing using the Ray Data library, covering dataset creation, transformations, batch processing, data loading for ML training, and distributed execution for large-scale data pipelines.
What Is This?
Overview
Ray Data provides tools for processing large datasets in parallel across distributed clusters using the Ray runtime. It covers five areas: dataset creation, which reads data from files, databases, and in-memory sources into distributed datasets; transformations, which apply map, filter, and flat_map operations across partitions in parallel; batch processing, which groups records for efficient vectorized operations; data loading for ML training, which streams processed data to model training loops with prefetching; and distributed execution, which scales processing across multiple nodes without code changes. The skill enables data engineers to build scalable pipelines that handle production-scale workloads efficiently.
Who Should Use This
This skill serves data engineers building ETL pipelines that exceed single-machine capacity, ML engineers preparing training data at scale for distributed model training, and teams needing scalable data processing without Spark infrastructure. It is particularly valuable for organizations already using Ray for distributed compute who want a unified framework.
Why Use It?
Problems It Solves
Processing large datasets on a single machine is limited by memory and CPU capacity. Pandas DataFrames cannot handle datasets that exceed available RAM without chunking logic. Feeding training data to distributed ML training loops requires a streaming interface that standard data loaders do not provide. Setting up Spark clusters for simple distributed processing adds significant infrastructure complexity and operational overhead.
Core Highlights
Dataset builder reads from diverse sources including CSV, Parquet, JSON, and cloud storage into distributed partitions. Transform engine applies parallel map and filter operations across workers. Batch processor groups records for efficient vectorized computation. ML data loader streams processed data to training loops with prefetching to minimize GPU idle time.
How to Use It?
Basic Usage
import ray

# Read CSV files into a distributed dataset
ds = ray.data.read_csv('data/*.csv')
print(f'Rows: {ds.count()}')
print(f'Schema: {ds.schema()}')

# Row-level filter and map
filtered = ds.filter(lambda row: row['amount'] > 100)
mapped = filtered.map(
    lambda row: {**row, 'total': row['amount'] * row['quantity']})

# Vectorized transformation applied to whole pandas batches
def normalize(batch):
    batch['amount'] = (
        (batch['amount'] - batch['amount'].mean()) / batch['amount'].std())
    return batch

normalized = mapped.map_batches(normalize, batch_format='pandas')
normalized.write_parquet('output/')
Real-World Examples
import ray
from ray.data.preprocessors import StandardScaler

class DataPipeline:
    def __init__(self, path: str):
        self.ds = ray.data.read_parquet(path)

    def preprocess(self, feature_cols: list[str], label_col: str):
        # Keep only the feature and label columns
        keep = feature_cols + [label_col]
        self.ds = self.ds.drop_columns(
            [c for c in self.ds.columns() if c not in keep])
        # Standardize the feature columns to zero mean and unit variance
        scaler = StandardScaler(columns=feature_cols)
        self.ds = scaler.fit_transform(self.ds)
        return self

    def to_torch(self, batch_size: int = 32):
        # Stream batches of torch tensors into a training loop
        return self.ds.iter_torch_batches(batch_size=batch_size)
Advanced Tips
Use map_batches with batch_format='numpy' or 'pandas' for vectorized operations that are faster than row-by-row map functions. Set the concurrency parameter to control the number of parallel workers for resource-intensive transformations. Use repartition to balance data across partitions before computationally expensive operations. When working with GPU-accelerated transformations, pin workers to specific GPU resources using the num_gpus parameter in map_batches to avoid resource contention.
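As a minimal sketch of these tips, the batch function below is plain Python; the column name 'value', the partition count, and the worker settings in the commented wiring are illustrative assumptions, not values from this skill.

```python
import numpy as np

def heavy_transform(batch):
    # Vectorized work on a whole batch at once; in a real GPU
    # transformation this is where an accelerator library would run.
    batch['score'] = batch['value'] * 2.0
    return batch

# Hypothetical wiring, assuming a Ray dataset `ds` with a 'value' column:
# ds = ds.repartition(200)          # balance partitions before the heavy step
# out = ds.map_batches(
#     heavy_transform,
#     batch_format='numpy',         # vectorized, not row-by-row
#     concurrency=4,                # cap the number of parallel workers
#     num_gpus=1,                   # pin each worker to one GPU
# )
```

Keeping the batch function pure like this also makes it easy to unit-test on a plain dict of arrays before wiring it into a cluster job.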
When to Use It?
Use Cases
Process multi-gigabyte datasets with parallel transformations that scale across a Ray cluster. Build a feature engineering pipeline that feeds preprocessed data directly to distributed model training. Convert large CSV datasets to optimized Parquet format with filtering and schema changes.
Related Topics
Ray Data, distributed processing, data pipelines, ETL, ML data loading, batch processing, and parallel computing.
Important Notes
Requirements
The Ray Python package with the data module installed (for example, pip install 'ray[data]'). Sufficient cluster memory to hold working partitions during processing. Source data accessible from all worker nodes in the cluster.
Usage Recommendations
Do: use map_batches for operations that benefit from vectorized execution on pandas or numpy arrays. Read data in columnar formats like Parquet for faster loading with predicate pushdown. Monitor the Ray dashboard to identify data skew across partitions and rebalance using repartition when skew is detected.
Don't: load entire datasets into driver memory with take or to_pandas when the data exceeds available RAM. Don't use row-level map functions for operations that can be vectorized with batch processing. Don't skip data schema validation, since type mismatches cause runtime errors in distributed workers.
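To make the vectorization recommendation concrete, here is a small sketch contrasting a row-level function with its batch-level equivalent; the column name 'amount' and the 1.2 multiplier are illustrative assumptions.

```python
import numpy as np

# Row-level version: called once per record (use with map; slower)
def add_tax_row(row):
    row['with_tax'] = row['amount'] * 1.2
    return row

# Batch-level version: one vectorized call per batch (use with map_batches)
def add_tax_batch(batch):
    batch['with_tax'] = batch['amount'] * 1.2
    return batch

# Hypothetical usage on a Ray dataset `ds`:
# ds.map(add_tax_row)                                  # avoid on hot paths
# ds.map_batches(add_tax_batch, batch_format='numpy')  # prefer this
```

Both functions compute the same values; the batch version simply amortizes Python call overhead across an entire array.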
Limitations
Ray Data requires a running Ray cluster for distributed execution, which adds setup overhead for small datasets. Debugging distributed data pipelines is harder than debugging single-machine processing. Some pandas operations are not directly supported in the Ray Data API and require workarounds.