作业管理#

DataJuicer 提供用于监控和管理处理作业的工具。

处理快照#

从事件日志和 DAG 结构分析作业状态。

# JSON 输出
python -m data_juicer.utils.job.snapshot /path/to/job_dir

# 人类可读输出
python -m data_juicer.utils.job.snapshot /path/to/job_dir --human-readable

输出包括:

  • 作业状态和进度百分比

  • 分区完成计数

  • 操作指标

  • 检查点覆盖率

  • 时间信息

资源感知分区#

系统根据集群资源和数据特征自动优化分区大小。

partition:
  mode: "auto"
  target_size_mb: 256  # 目标分区大小(可配置)

优化器会:

  1. 检测 CPU、内存和 GPU 资源

  2. 采样数据以确定模态和内存使用

  3. 计算目标为配置大小的分区(默认 256MB)

  4. 确定最佳工作节点数量

日志#

日志按作业组织,支持轮转和保留:

{job_dir}/
├── events_{timestamp}.jsonl   # 机器可读事件
├── logs/
│   ├── log.txt                # 主日志
│   ├── log_DEBUG.txt          # 调试日志
│   ├── log_ERROR.txt          # 错误日志
│   └── log_WARNING.txt        # 警告日志
└── job_summary.json           # 摘要(完成时)

配置日志:

from data_juicer.utils.logger_utils import setup_logger

setup_logger(
    save_dir="./outputs",
    filename="log.txt",
    max_log_size_mb=100,
    backup_count=5
)

API 参考#

ProcessingSnapshotAnalyzer#

from data_juicer.utils.job.snapshot import ProcessingSnapshotAnalyzer

analyzer = ProcessingSnapshotAnalyzer(job_dir)
snapshot = analyzer.generate_snapshot()
json_data = analyzer.to_json_dict(snapshot)

ResourceDetector#

from data_juicer.core.executor.partition_size_optimizer import ResourceDetector

local = ResourceDetector.detect_local_resources()
cluster = ResourceDetector.detect_ray_cluster()
workers = ResourceDetector.calculate_optimal_worker_count()

PartitionSizeOptimizer#

from data_juicer.core.executor.partition_size_optimizer import PartitionSizeOptimizer

optimizer = PartitionSizeOptimizer(cfg)
recommendations = optimizer.get_partition_recommendations(dataset, pipeline)

故障排除#

检查作业状态:

python -m data_juicer.utils.job.snapshot /path/to/job

分析事件:

cat /path/to/job/events_*.jsonl | head -20

检查资源:

from data_juicer.core.executor.partition_size_optimizer import ResourceDetector
print(ResourceDetector.detect_local_resources())