分区处理与检查点#

本文档描述 DataJuicer 的容错处理系统,包括分区、检查点和事件日志。

概述#

ray_partitioned 执行器将数据集分割成分区,并使用可配置的检查点进行处理。失败的作业可以从最后一个检查点恢复。

检查点策略:

  • every_n_ops - 每 N 个操作检查点(默认,平衡方案)

  • every_op - 每个操作后检查点(最高容错性,影响性能)

  • manual - 仅在指定操作后检查点(适合已知的耗时操作)

  • disabled - 不检查点(最佳性能)

目录结构#

{work_dir}/{job_id}/
├── job_summary.json              # 作业元数据(完成时创建)
├── events_{timestamp}.jsonl      # 机器可读事件日志
├── dag_execution_plan.json       # DAG 执行计划
├── checkpoints/                  # 检查点数据
├── partitions/                   # 输入分区
├── logs/                         # 人类可读日志
└── metadata/                     # 作业元数据

配置#

分区模式#

自动模式(推荐)- 分析数据和资源以确定最佳分区:

executor_type: ray_partitioned

partition:
  mode: "auto"
  target_size_mb: 256    # 目标分区大小(128、256、512 或 1024)
  size: 5000             # 自动分析失败时的回退值
  max_size_mb: 256       # 回退最大大小

手动模式 - 指定确切的分区数量:

partition:
  mode: "manual"
  num_of_partitions: 8

检查点#

checkpoint:
  enabled: true
  strategy: every_n_ops  # every_n_ops(默认), every_op, manual, disabled
  n_ops: 5               # 默认:每 5 个操作检查点
  op_names:              # 用于 manual 策略 - 在耗时操作后检查点
    - document_deduplicator
    - embedding_mapper

中间存储#

intermediate_storage:
  format: "parquet"              # parquet, arrow, jsonl
  compression: "snappy"          # snappy, gzip, none
  preserve_intermediate_data: true
  retention_policy: "keep_all"   # keep_all, keep_failed_only, cleanup_all

使用方法#

运行作业#

# 自动分区模式
dj-process --config config.yaml --partition.mode auto

# 手动分区模式
dj-process --config config.yaml --partition.mode manual --partition.num_of_partitions 4

# 自定义作业 ID
dj-process --config config.yaml --job_id my_experiment_001

恢复作业#

dj-process --config config.yaml --job_id my_experiment_001

检查点策略#

# 每个操作
dj-process --config config.yaml --checkpoint.strategy every_op

# 每 N 个操作
dj-process --config config.yaml --checkpoint.strategy every_n_ops --checkpoint.n_ops 3

# 手动
dj-process --config config.yaml --checkpoint.strategy manual --checkpoint.op_names op1,op2

自动配置#

在自动模式下,优化器会:

  1. 采样数据集以检测模态(文本、图像、音频、视频、多模态)

  2. 测量每个样本的内存使用

  3. 分析管道复杂性

  4. 计算目标为配置的 target_size_mb 的分区大小

按模态的默认分区大小:

模态

默认大小

最大大小

内存倍数

文本

10000

50000

1.0x

图像

2000

10000

5.0x

音频

1000

4000

8.0x

视频

400

2000

20.0x

多模态

1600

6000

10.0x

作业管理工具#

监控器#

# 显示进度
python -m data_juicer.utils.job.monitor {job_id}

# 详细视图
python -m data_juicer.utils.job.monitor {job_id} --detailed

# 监视模式
python -m data_juicer.utils.job.monitor {job_id} --watch --interval 10
from data_juicer.utils.job.monitor import show_job_progress

data = show_job_progress("job_id", detailed=True)

停止器#

# 优雅停止
python -m data_juicer.utils.job.stopper {job_id}

# 强制停止
python -m data_juicer.utils.job.stopper {job_id} --force

# 列出运行中的作业
python -m data_juicer.utils.job.stopper --list
from data_juicer.utils.job.stopper import stop_job

stop_job("job_id", force=True, timeout=60)

通用工具#

from data_juicer.utils.job.common import JobUtils, list_running_jobs

running_jobs = list_running_jobs()

job_utils = JobUtils("job_id")
summary = job_utils.load_job_summary()
events = job_utils.load_event_logs()

事件类型#

  • job_start, job_complete, job_failed

  • partition_start, partition_complete, partition_failed

  • op_start, op_complete, op_failed

  • checkpoint_save, checkpoint_load

性能考虑#

检查点与 Ray 优化的权衡#

关键洞察:检查点会干扰 Ray 的自动优化。

Ray 通过融合操作和流水线处理数据来优化执行。每个检查点都会强制物化,从而打破优化窗口:

无检查点:          op1 → op2 → op3 → op4 → op5
                    |___________________________|
                         Ray 优化整个窗口

every_op:          op1 | op2 | op3 | op4 | op5
                    每个 | 处物化(5 个屏障)

every_n_ops(5):    op1 → op2 → op3 → op4 → op5 |
                    |_____________________________|
                         Ray 优化全部 5 个操作

检查点成本分析#

成本类型

典型值

检查点写入

~2-5 秒

轻量操作执行

~1-2 秒

耗时操作执行

分钟到小时

对于轻量操作,检查点的成本比失败后重新执行更高。

管道分析示例:

filter(1秒) → mapper(2秒) → deduplicator(300秒) → filter(1秒)

策略              | 开销    | 保护价值
------------------|---------|------------------
every_op          | ~20秒   | 失败时节省 1-304秒
仅在 dedup 后     | ~5秒    | 失败时节省 300秒
disabled          | 0秒     | 重新执行全部

策略建议#

作业时长

建议策略

理由

< 10 分钟

disabled

重新执行成本低

10-60 分钟

every_n_ops (n=5)

平衡保护

> 60 分钟且有耗时操作

manual

仅在耗时操作后检查点

不稳定的基础设施

every_n_ops (n=2-3)

接受开销换取可靠性

操作分类#

耗时操作(建议在这些操作后检查点):

  • *_deduplicator - 全局状态,计算耗时

  • *_embedding_* - 模型推理

  • *_model_* - 模型推理

  • *_vision_* - 图像/视频处理

  • *_audio_* - 音频处理

轻量操作(可跳过检查点):

  • *_filter - 简单过滤

  • clean_* - 文本清理

  • remove_* - 字段移除

存储建议#

  • 事件日志:快速存储(SSD)

  • 检查点:大容量存储

  • 分区:本地存储

分区大小权衡#

  • 较小分区:更好的容错性,更多调度开销

  • 较大分区:更少开销,更粗粒度的恢复

故障排除#

作业恢复失败:

ls -la ./outputs/{work_dir}/{job_id}/job_summary.json
ls -la ./outputs/{work_dir}/{job_id}/checkpoints/

检查 Ray 状态:

ray status

查看日志:

cat ./outputs/{work_dir}/{job_id}/events_*.jsonl
tail -f ./outputs/{work_dir}/{job_id}/logs/*.txt