Ray Data

Scalable dataset creation, transformation, and ML data loading for distributed data processing pipelines with Ray Data

Category: productivity Source: Orchestra-Research/AI-Research-SKILLs

Ray Data is a community skill for scalable data processing using the Ray Data library, covering dataset creation, transformations, batch processing, data loading for ML training, and distributed execution for large-scale data pipelines.

What Is This?

Overview

Ray Data provides tools for processing large datasets in parallel across distributed clusters using the Ray runtime. Dataset creation reads data from files, databases, and in-memory sources into distributed datasets. Transformations apply map, filter, and flat_map operations across partitions in parallel. Batch processing groups records for efficient vectorized operations. Data loading for ML training streams processed data to model training loops with prefetching. Distributed execution scales processing across multiple nodes without code changes. The skill enables data engineers to build scalable data pipelines that handle production-scale workloads efficiently.

Who Should Use This

This skill serves data engineers building ETL pipelines that exceed single-machine capacity, ML engineers preparing training data at scale for distributed model training, and teams needing scalable data processing without Spark infrastructure. It is particularly valuable for organizations already using Ray for distributed compute who want a unified framework.

Why Use It?

Problems It Solves

Processing large datasets on a single machine is limited by memory and CPU capacity. Pandas DataFrames cannot handle datasets that exceed available RAM without chunking logic. Feeding training data to distributed ML training loops requires a streaming interface that standard data loaders do not provide. Setting up Spark clusters for simple distributed processing adds significant infrastructure complexity and operational overhead.
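The chunking logic mentioned above can be written by hand with pandas, which is exactly the boilerplate Ray Data removes. A minimal sketch of manual chunked processing, using a synthetic in-memory CSV so the example is self-contained:

```python
import io

import pandas as pd

# Synthetic CSV standing in for a file too large to load at once.
csv_text = "amount\n" + "\n".join(str(i) for i in range(10))

# Manual chunking: process the file in fixed-size pieces and keep a
# running total, never holding all rows in memory at the same time.
total = 0.0
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=4):
    total += chunk["amount"].sum()

print(total)
```

With Ray Data the same aggregation is a single distributed call, and the chunking, scheduling, and memory management happen inside the library.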

Core Highlights

Dataset builder reads from diverse sources including CSV, Parquet, JSON, and cloud storage into distributed partitions. Transform engine applies parallel map and filter operations across workers. Batch processor groups records for efficient vectorized computation. ML data loader streams processed data to training loops with prefetching to minimize GPU idle time.

How to Use It?

Basic Usage

import ray

ds = ray.data.read_csv('data/*.csv')
print(f'Rows: {ds.count()}')
print(f'Schema: {ds.schema()}')

filtered = ds.filter(lambda row: row['amount'] > 100)

mapped = filtered.map(
    lambda row: {**row, 'total': row['amount'] * row['quantity']})

def normalize(batch):
    # With batch_format='pandas', each batch is a pandas DataFrame.
    batch['amount'] = (
        (batch['amount'] - batch['amount'].mean()) / batch['amount'].std())
    return batch

normalized = mapped.map_batches(normalize, batch_format='pandas')

normalized.write_parquet('output/')
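Because the normalize step receives a whole batch as a pandas DataFrame, the same logic can be checked standalone without a Ray cluster. A quick sanity check with made-up sample values:

```python
import pandas as pd

def normalize(batch):
    # z-score the 'amount' column across the batch
    batch['amount'] = (
        (batch['amount'] - batch['amount'].mean()) / batch['amount'].std())
    return batch

batch = pd.DataFrame({'amount': [100.0, 200.0, 300.0]})
out = normalize(batch)
print(out['amount'].tolist())  # mean 200, sample std 100 -> [-1.0, 0.0, 1.0]
```

Testing batch functions this way, on small DataFrames, is much faster than debugging them inside a distributed job.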

Real-World Examples

import ray
from ray.data.preprocessors import StandardScaler

class DataPipeline:
    def __init__(self, path: str):
        self.ds = ray.data.read_parquet(path)

    def preprocess(self, feature_cols: list[str], label_col: str):
        # Keep only the feature and label columns, then standardize features.
        keep = feature_cols + [label_col]
        self.ds = self.ds.drop_columns(
            [c for c in self.ds.columns() if c not in keep])
        scaler = StandardScaler(columns=feature_cols)
        self.ds = scaler.fit_transform(self.ds)
        return self

    def to_torch(self, batch_size: int = 32):
        # Stream preprocessed batches directly into a PyTorch training loop.
        return self.ds.iter_torch_batches(batch_size=batch_size)

Advanced Tips

Use map_batches with batch_format='numpy' or 'pandas' for vectorized operations that are faster than row-by-row map functions. Set the concurrency parameter to control the number of parallel workers for resource-intensive transformations. Use repartition to balance data across partitions before computationally expensive operations. When working with GPU-accelerated transformations, pin workers to specific GPU resources using the num_gpus parameter in map_batches to avoid resource contention.
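The speedup from map_batches over row-level map comes from doing arithmetic on whole columns at once instead of one Python call per record. A minimal numpy illustration of the same computation both ways, with no Ray required (the data is synthetic):

```python
import numpy as np

rows = [{'amount': float(i), 'quantity': 2.0} for i in range(1000)]

# Row by row, as ds.map(...) would apply it: one Python-level
# multiplication per record.
row_totals = [r['amount'] * r['quantity'] for r in rows]

# Batched, as ds.map_batches(..., batch_format='numpy') would apply it:
# one vectorized multiply over the whole column.
batch = {'amount': np.array([r['amount'] for r in rows]),
         'quantity': np.array([r['quantity'] for r in rows])}
batch_totals = batch['amount'] * batch['quantity']

assert np.allclose(row_totals, batch_totals)
```

Both paths produce identical results; the batched path amortizes Python overhead across the whole array, which is where the vectorization win comes from.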

When to Use It?

Use Cases

Process multi-gigabyte datasets with parallel transformations that scale across a Ray cluster. Build a feature engineering pipeline that feeds preprocessed data directly to distributed model training. Convert large CSV datasets to optimized Parquet format with filtering and schema changes.

Related Topics

Ray Data, distributed processing, data pipelines, ETL, ML data loading, batch processing, and parallel computing.

Important Notes

Requirements

Ray Python package with the data module installed. Sufficient cluster memory to hold working partitions during processing. Source data accessible from all worker nodes in the cluster.

Usage Recommendations

Do: use map_batches for operations that benefit from vectorized execution on pandas DataFrames or numpy arrays. Read data in columnar formats like Parquet for faster loading with predicate pushdown. Monitor the Ray dashboard to identify data skew across partitions, and rebalance with repartition when skew is detected.

Don't: load entire datasets into driver memory with take or to_pandas when the data exceeds available RAM. Don't use row-level map functions for operations that can be vectorized with map_batches. Don't skip data schema validation, since type mismatches cause runtime errors in distributed workers.
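One lightweight way to follow the schema-validation advice is to check column names and dtypes before a batch is processed. A hypothetical validate_batch helper (the expected schema here is an assumption for illustration, not part of the Ray Data API):

```python
import pandas as pd

EXPECTED = {'amount': 'float64', 'quantity': 'int64'}

def validate_batch(batch: pd.DataFrame, expected: dict) -> pd.DataFrame:
    # Fail fast with a clear message instead of deep inside a worker.
    for col, dtype in expected.items():
        if col not in batch.columns:
            raise ValueError(f'missing column: {col}')
        if str(batch[col].dtype) != dtype:
            raise TypeError(
                f'{col}: expected {dtype}, got {batch[col].dtype}')
    return batch

good = pd.DataFrame({'amount': [1.5, 2.5], 'quantity': [1, 2]})
validate_batch(good, EXPECTED)  # passes silently

bad = pd.DataFrame({'amount': ['1.5', '2.5'], 'quantity': [1, 2]})
try:
    validate_batch(bad, EXPECTED)
except TypeError as e:
    print('caught:', e)
```

A check like this can run as the first map_batches step of a pipeline, so bad input surfaces on the first batch rather than mid-job.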

Limitations

Ray Data requires a running Ray cluster for distributed execution, which adds setup overhead for small datasets. Debugging distributed data pipelines is harder than debugging single-machine processing. Some pandas operations are not directly supported by the Ray Data API and require workarounds.