Partitioned Processing with Checkpointing#

This document describes DataJuicer's fault-tolerant processing system with partitioning, checkpointing, and event logging.

Overview#

The ray_partitioned executor splits datasets into partitions and processes them with configurable checkpointing. Failed jobs can resume from the last checkpoint.

Checkpointing strategies:

  • every_n_ops - checkpoint every N operations (default, balanced)

  • every_op - checkpoint after every operation (max protection, impacts performance)

  • manual - checkpoint only after specified operations (best for known expensive ops)

  • disabled - no checkpointing (best performance)
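
How these four strategies translate into per-operation checkpoint decisions can be sketched as follows. This is illustrative only, not DataJuicer's actual implementation; the function and argument names are hypothetical:

def should_checkpoint(op_index, op_name, strategy, n_ops=5, op_names=()):
    """Decide whether to write a checkpoint after the op at position op_index (0-based)."""
    if strategy == "disabled":
        return False
    if strategy == "every_op":
        return True
    if strategy == "every_n_ops":
        return (op_index + 1) % n_ops == 0  # e.g. after ops 5, 10, 15, ... when n_ops=5
    if strategy == "manual":
        return op_name in op_names          # only after explicitly listed ops
    raise ValueError(f"unknown checkpoint strategy: {strategy}")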

Directory Structure#

{work_dir}/{job_id}/
├── job_summary.json              # Job metadata (created on completion)
├── events_{timestamp}.jsonl      # Machine-readable event log
├── dag_execution_plan.json       # DAG execution plan
├── checkpoints/                  # Checkpoint data
├── partitions/                   # Input partitions
├── logs/                         # Human-readable logs
└── metadata/                     # Job metadata
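
As a quick way to explore this layout, the snippet below lists a job's event logs and checkpoint entries. It is a sketch only; the work_dir and job_id values are placeholders:

from pathlib import Path

job_dir = Path("./outputs/demo_work_dir") / "my_experiment_001"  # placeholder {work_dir}/{job_id}

event_logs = sorted(job_dir.glob("events_*.jsonl"))              # machine-readable event logs
checkpoint_dir = job_dir / "checkpoints"
checkpoints = sorted(checkpoint_dir.iterdir()) if checkpoint_dir.exists() else []

print(f"{len(event_logs)} event log(s), {len(checkpoints)} checkpoint entries")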

Configuration#

Partition Modes#

Auto mode (recommended) - analyzes data and resources to determine optimal partitioning:

executor_type: ray_partitioned

partition:
  mode: "auto"
  target_size_mb: 256    # Target partition size (128, 256, 512, or 1024)
  size: 5000             # Fallback if auto-analysis fails
  max_size_mb: 256       # Fallback max size

Manual mode - specify exact partition count:

partition:
  mode: "manual"
  num_of_partitions: 8

Checkpointing#

checkpoint:
  enabled: true
  strategy: every_n_ops  # every_n_ops (default), every_op, manual, disabled
  n_ops: 5               # Default: checkpoint every 5 operations
  op_names:              # For manual strategy - checkpoint after expensive ops
    - document_deduplicator
    - embedding_mapper

Intermediate Storage#

intermediate_storage:
  format: "parquet"              # parquet, arrow, jsonl
  compression: "snappy"          # snappy, gzip, none
  preserve_intermediate_data: true
  retention_policy: "keep_all"   # keep_all, keep_failed_only, cleanup_all
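
With preserve_intermediate_data enabled and the parquet format, the retained artifacts can be inspected with standard tooling. A minimal sketch; the exact file layout under the job directory is not specified here, so the snippet simply scans for Parquet files:

from pathlib import Path
import pandas as pd  # read_parquet requires pyarrow or fastparquet

job_dir = Path("./outputs/demo_work_dir") / "my_experiment_001"  # placeholder {work_dir}/{job_id}

# Peek at the first few Parquet artifacts kept by the run
for parquet_file in sorted(job_dir.rglob("*.parquet"))[:3]:
    df = pd.read_parquet(parquet_file)
    print(parquet_file.relative_to(job_dir), df.shape)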

Usage#

Running Jobs#

# Auto partition mode
dj-process --config config.yaml --partition.mode auto

# Manual partition mode
dj-process --config config.yaml --partition.mode manual --partition.num_of_partitions 4

# With custom job ID
dj-process --config config.yaml --job_id my_experiment_001

Resuming Jobs#

Re-run with the same job ID to resume a failed or interrupted job from its last checkpoint:

dj-process --config config.yaml --job_id my_experiment_001

Checkpoint Strategies#

# Every operation
dj-process --config config.yaml --checkpoint.strategy every_op

# Every N operations
dj-process --config config.yaml --checkpoint.strategy every_n_ops --checkpoint.n_ops 3

# Manual
dj-process --config config.yaml --checkpoint.strategy manual --checkpoint.op_names op1,op2

Auto-Configuration#

In auto mode, the optimizer:

  1. Samples the dataset to detect modality (text, image, audio, video, multimodal)

  2. Measures memory usage per sample

  3. Analyzes pipeline complexity

  4. Calculates partition size targeting the configured target_size_mb
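
As a rough illustration of step 4, the per-partition sample count can be derived from the measured memory per sample and the configured target_size_mb. This is back-of-the-envelope arithmetic only; the actual optimizer also applies the modality defaults and limits listed below:

def estimate_partition_samples(target_size_mb, avg_sample_bytes, memory_multiplier=1.0,
                               min_samples=100, max_samples=50000):
    """Rough sample count so one partition holds about target_size_mb of working data."""
    budget_bytes = target_size_mb * 1024 * 1024
    per_sample = avg_sample_bytes * memory_multiplier
    samples = int(budget_bytes / max(per_sample, 1))
    return max(min_samples, min(samples, max_samples))

# e.g. 256 MB target with ~25 KB text samples -> roughly 10000 samples per partition
print(estimate_partition_samples(256, 25 * 1024))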

Default partition sizes by modality:

Modality   | Default Size | Max Size | Memory Multiplier
-----------|--------------|----------|-------------------
Text       | 10000        | 50000    | 1.0x
Image      | 2000         | 10000    | 5.0x
Audio      | 1000         | 4000     | 8.0x
Video      | 400          | 2000     | 20.0x
Multimodal | 1600         | 6000     | 10.0x

Job Management Utilities#

Monitor#

# Show progress
python -m data_juicer.utils.job.monitor {job_id}

# Detailed view
python -m data_juicer.utils.job.monitor {job_id} --detailed

# Watch mode
python -m data_juicer.utils.job.monitor {job_id} --watch --interval 10

Python API:

from data_juicer.utils.job.monitor import show_job_progress

data = show_job_progress("job_id", detailed=True)

Stopper#

# Graceful stop
python -m data_juicer.utils.job.stopper {job_id}

# Force stop
python -m data_juicer.utils.job.stopper {job_id} --force

# List running jobs
python -m data_juicer.utils.job.stopper --list

Python API:

from data_juicer.utils.job.stopper import stop_job

stop_job("job_id", force=True, timeout=60)

Common Utilities#

from data_juicer.utils.job.common import JobUtils, list_running_jobs

running_jobs = list_running_jobs()

job_utils = JobUtils("job_id")
summary = job_utils.load_job_summary()
events = job_utils.load_event_logs()

Event Types#

  • job_start, job_complete, job_failed

  • partition_start, partition_complete, partition_failed

  • op_start, op_complete, op_failed

  • checkpoint_save, checkpoint_load
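
These types make the JSONL event log easy to summarize, for example by counting events per type. The sketch below assumes each record carries an event-type field named event_type, which may differ in practice; the paths are placeholders:

import json
from collections import Counter
from pathlib import Path

job_dir = Path("./outputs/demo_work_dir") / "my_experiment_001"  # placeholder {work_dir}/{job_id}

counts = Counter()
for log_path in sorted(job_dir.glob("events_*.jsonl")):
    with open(log_path) as f:
        for line in f:
            event = json.loads(line)
            counts[event.get("event_type", "unknown")] += 1  # field name is an assumption

for event_type, n in counts.most_common():
    print(f"{event_type}: {n}")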

Performance Considerations#

Checkpoint vs Ray Optimization Trade-off#

Key insight: Checkpointing interferes with Ray's automatic optimization.

Ray optimizes execution by fusing operations together and pipelining data. Each checkpoint forces materialization, which breaks the optimization window:

Without checkpoints:     op1 → op2 → op3 → op4 → op5
                         |___________________________|
                              Ray optimizes entire window

With every_op:           op1 | op2 | op3 | op4 | op5
                         materialize at each | (5 barriers)

With every_n_ops(5):     op1 → op2 → op3 → op4 → op5 |
                         |_____________________________|
                              Ray optimizes all 5 ops

Checkpoint Cost Analysis#

Cost Type              | Typical Value
-----------------------|------------------
Checkpoint write       | ~2-5 seconds
Cheap op execution     | ~1-2 seconds
Expensive op execution | minutes to hours

For cheap operations, writing a checkpoint typically costs more than simply re-running them after a failure.

Example pipeline analysis:

filter(1s) → mapper(2s) → deduplicator(300s) → filter(1s)

Strategy         | Overhead  | Protection Value
-----------------|-----------|------------------
every_op         | ~20s      | Save 1-304s on failure
after dedup only | ~5s       | Save 300s on failure
disabled         | 0s        | Re-run everything
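
Whether a given checkpoint pays off can be framed as a break-even check: the write cost must be smaller than the expected saving, i.e. the failure probability times the work that would otherwise be recomputed. A back-of-the-envelope sketch, not a DataJuicer API:

def checkpoint_worth_it(checkpoint_cost_s, recompute_cost_s, failure_prob):
    """Compare the expected re-run time saved against the fixed cost of writing a checkpoint."""
    expected_saving = failure_prob * recompute_cost_s
    return expected_saving > checkpoint_cost_s

# After the 300 s deduplicator, even a 5% failure rate justifies a ~5 s checkpoint:
print(checkpoint_worth_it(5, 300, 0.05))  # True  (15 s expected saving > 5 s cost)

# After a 1 s filter, the same checkpoint is almost never worth it:
print(checkpoint_worth_it(5, 1, 0.05))    # False (0.05 s expected saving < 5 s cost)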

Strategy Recommendations#

Job Duration                | Recommended Strategy | Rationale
----------------------------|----------------------|-------------------------------------
< 10 min                    | disabled             | Re-running is cheap
10-60 min                   | every_n_ops (n=5)    | Balanced protection
> 60 min with expensive ops | manual               | Checkpoint after expensive ops only
Unstable infrastructure     | every_n_ops (n=2-3)  | Accept overhead for reliability

Operation Categories#

Expensive operations (checkpoint after these):

  • *_deduplicator - Global state, expensive computation

  • *_embedding_* - Model inference

  • *_model_* - Model inference

  • *_vision_* - Image/video processing

  • *_audio_* - Audio processing

Cheap operations (skip checkpointing):

  • *_filter - Simple filtering

  • clean_* - Text cleaning

  • remove_* - Field removal
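
When assembling an op_names list for the manual strategy, ops can be bucketed mechanically against these name patterns. A small illustrative sketch; the example op names are placeholders and none of this is a DataJuicer API:

from fnmatch import fnmatch

EXPENSIVE_PATTERNS = ["*_deduplicator", "*_embedding_*", "*_model_*", "*_vision_*", "*_audio_*"]

def expensive_ops(pipeline_op_names):
    """Return the ops worth checkpointing after, based on the name patterns above."""
    return [op for op in pipeline_op_names
            if any(fnmatch(op, pattern) for pattern in EXPENSIVE_PATTERNS)]

print(expensive_ops(["text_length_filter", "clean_links_mapper", "document_deduplicator"]))
# ['document_deduplicator']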

Storage Recommendations#

  • Event logs: fast storage (SSD)

  • Checkpoints: large capacity storage

  • Partitions: local storage

Partition Sizing Trade-offs#

  • Smaller partitions: better fault tolerance, more scheduling overhead

  • Larger partitions: less overhead, coarser recovery granularity

Troubleshooting#

If job resumption fails, check that the job summary and checkpoint data exist:

ls -la ./outputs/{work_dir}/{job_id}/job_summary.json
ls -la ./outputs/{work_dir}/{job_id}/checkpoints/

Check Ray status:

ray status

View logs:

cat ./outputs/{work_dir}/{job_id}/events_*.jsonl
tail -f ./outputs/{work_dir}/{job_id}/logs/*.txt