作业管理#
DataJuicer 提供用于监控和管理处理作业的工具。
处理快照#
从事件日志和 DAG 结构分析作业状态。
# JSON 输出
python -m data_juicer.utils.job.snapshot /path/to/job_dir
# 人类可读输出
python -m data_juicer.utils.job.snapshot /path/to/job_dir --human-readable
输出包括:
作业状态和进度百分比
分区完成计数
操作指标
检查点覆盖率
时间信息
资源感知分区#
系统根据集群资源和数据特征自动优化分区大小。
partition:
mode: "auto"
target_size_mb: 256 # 目标分区大小(可配置)
优化器会:
检测 CPU、内存和 GPU 资源
采样数据以确定模态和内存使用
计算目标为配置大小的分区(默认 256MB)
确定最佳工作节点数量
日志#
日志按作业组织,支持轮转和保留:
{job_dir}/
├── events_{timestamp}.jsonl # 机器可读事件
├── logs/
│ ├── log.txt # 主日志
│ ├── log_DEBUG.txt # 调试日志
│ ├── log_ERROR.txt # 错误日志
│ └── log_WARNING.txt # 警告日志
└── job_summary.json # 摘要(完成时)
配置日志:
from data_juicer.utils.logger_utils import setup_logger
setup_logger(
save_dir="./outputs",
filename="log.txt",
max_log_size_mb=100,
backup_count=5
)
API 参考#
ProcessingSnapshotAnalyzer#
from data_juicer.utils.job.snapshot import ProcessingSnapshotAnalyzer
analyzer = ProcessingSnapshotAnalyzer(job_dir)
snapshot = analyzer.generate_snapshot()
json_data = analyzer.to_json_dict(snapshot)
ResourceDetector#
from data_juicer.core.executor.partition_size_optimizer import ResourceDetector
local = ResourceDetector.detect_local_resources()
cluster = ResourceDetector.detect_ray_cluster()
workers = ResourceDetector.calculate_optimal_worker_count()
PartitionSizeOptimizer#
from data_juicer.core.executor.partition_size_optimizer import PartitionSizeOptimizer
optimizer = PartitionSizeOptimizer(cfg)
recommendations = optimizer.get_partition_recommendations(dataset, pipeline)
故障排除#
检查作业状态:
python -m data_juicer.utils.job.snapshot /path/to/job
分析事件:
cat /path/to/job/events_*.jsonl | head -20
检查资源:
from data_juicer.core.executor.partition_size_optimizer import ResourceDetector
print(ResourceDetector.detect_local_resources())