data_juicer.utils.ckpt_utils module
- class data_juicer.utils.ckpt_utils.CheckpointManagerBase(ckpt_dir: str)
Bases: ABC
Base class for checkpoint managers.
Provides common functionality for managing checkpoint directories and defines the interface that checkpoint managers should implement.
- __init__(ckpt_dir: str)
Initialize base checkpoint manager.
- Parameters:
ckpt_dir -- Directory to save and load checkpoints
- abstractmethod save_checkpoint(dataset: Any, **kwargs) -> str
Save a dataset checkpoint.
- Parameters:
dataset -- Dataset to save
kwargs -- Additional arguments specific to the implementation
- Returns:
Path to saved checkpoint
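The interface above can be illustrated with a minimal, self-contained sketch. It does not import data_juicer: `SketchCheckpointManagerBase` only mirrors the documented signatures, and `JsonCheckpointManager`, the `name` keyword, and the eager directory creation are hypothetical choices made for the example.

```python
import json
import os
import tempfile
from abc import ABC, abstractmethod
from typing import Any


class SketchCheckpointManagerBase(ABC):
    """Illustrative stand-in that mirrors the documented interface."""

    def __init__(self, ckpt_dir: str):
        self.ckpt_dir = ckpt_dir
        # Assumption: the directory is created eagerly; the real class may differ.
        os.makedirs(self.ckpt_dir, exist_ok=True)

    @abstractmethod
    def save_checkpoint(self, dataset: Any, **kwargs) -> str:
        """Save a dataset checkpoint and return the path it was written to."""


class JsonCheckpointManager(SketchCheckpointManagerBase):
    """Hypothetical subclass that stores a list of records as one JSON file."""

    def save_checkpoint(self, dataset: Any, **kwargs) -> str:
        # "name" is an example of an implementation-specific keyword argument.
        path = os.path.join(self.ckpt_dir, kwargs.get("name", "latest.json"))
        with open(path, "w", encoding="utf-8") as f:
            json.dump(dataset, f)
        return path


with tempfile.TemporaryDirectory() as tmp:
    manager = JsonCheckpointManager(tmp)
    saved_path = manager.save_checkpoint([{"text": "hello"}])
    with open(saved_path, encoding="utf-8") as f:
        restored = json.load(f)
```

Because `save_checkpoint` is abstract, instantiating the base class directly raises `TypeError`; only subclasses that implement it can be created.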
- class data_juicer.utils.ckpt_utils.CheckpointManager(ckpt_dir, original_process_list, num_proc=1)
Saves the latest version of a dataset to the checkpoint directory, or loads it from there, somewhat like cache management. Rerunning the same config reloads the checkpoint and skips the ops that precede it.
If any argument of an operator in the process list changes, all ops are rerun from the beginning.
- __init__(ckpt_dir, original_process_list, num_proc=1)
Initialization method.
- Parameters:
ckpt_dir -- path to save and load checkpoint
original_process_list -- process list in config
num_proc -- number of worker processes used when saving the dataset
- get_left_process_list()
Get the remaining ops to run on the dataset. When a checkpoint is available, the ops it already covers are removed from the process list; otherwise the list is returned unchanged.
- Returns:
Process list of the remaining ops
- check_ckpt()
Check if checkpoint is available.
- Returns:
True when checkpoint is available, else False
- record(op_cfg: dict)
Save the op name and args to the op record, which is compared with the process list from the config to decide whether a checkpoint is available.
- check_ops_to_skip()
Check which ops need to be skipped in the process list.
If the op record list from the checkpoint matches the prefix of the process list, skip those ops and start processing from the checkpoint. Otherwise, process the original dataset from scratch.
- Returns:
Whether to skip some ops
- save_ckpt(ds)
Save the dataset to the checkpoint directory and dump the list of processed ops. Alias for save_checkpoint, kept for backward compatibility.
- Parameters:
ds -- input dataset to save
- save_checkpoint(ds, **kwargs)
Save the dataset to the checkpoint directory and dump the list of processed ops.
- Parameters:
ds -- input dataset to save
kwargs -- Additional arguments (not used, kept for interface compatibility)
- Returns:
Path to checkpoint directory
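The prefix comparison described for `check_ops_to_skip` and `get_left_process_list` can be sketched as follows. This is a standalone illustration, not the library's code: the function `split_process_list`, the op names, and the assumption that each op config is a single-key dict of the form `{op_name: {arg: value}}` are all hypothetical.

```python
from typing import Any, Dict, List, Tuple

# Assumed shape of one process-list entry: {"op_name": {"arg": value, ...}}
OpCfg = Dict[str, Dict[str, Any]]


def split_process_list(
    recorded_ops: List[OpCfg], process_list: List[OpCfg]
) -> Tuple[bool, List[OpCfg]]:
    """Return (skip, remaining_ops) using a prefix comparison."""
    n = len(recorded_ops)
    # Assumption: an empty record, or one longer than the config, means no skip.
    if n == 0 or n > len(process_list):
        return False, list(process_list)
    # Any difference in op name or args within the prefix invalidates the checkpoint.
    if process_list[:n] != recorded_ops:
        return False, list(process_list)
    return True, list(process_list[n:])


# Hypothetical op names, used only for illustration.
config_ops = [
    {"clean_op": {"min_len": 10}},
    {"filter_op": {"lang": "en"}},
    {"dedup_op": {}},
]
recorded = config_ops[:2]  # ops already applied when the checkpoint was saved

skip, remaining = split_process_list(recorded, config_ops)

# Changing one argument of a recorded op forces a rerun from scratch.
changed = [{"clean_op": {"min_len": 20}}] + config_ops[1:]
skip_changed, remaining_changed = split_process_list(recorded, changed)
```

With an unchanged config only the last op remains to be run; once an argument of a recorded op differs, the full list is returned and processing restarts from the original dataset.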
- class data_juicer.utils.ckpt_utils.CheckpointStrategy(value)
Bases: Enum
Checkpoint strategies for controlling when to create checkpoints.
- EVERY_OP = 'every_op'
- EVERY_N_OPS = 'every_n_ops'
- MANUAL = 'manual'
- DISABLED = 'disabled'
- class data_juicer.utils.ckpt_utils.RayCheckpointManager(ckpt_dir: str, checkpoint_enabled: bool = True, checkpoint_strategy: CheckpointStrategy = CheckpointStrategy.EVERY_OP, checkpoint_n_ops: int = 1, checkpoint_op_names: List[str] | None = None, event_logger=None)
Checkpoint manager for Ray Data with per-partition checkpointing support.
This class manages checkpoints for Ray Data datasets using Parquet format, supporting per-partition checkpointing and various checkpoint strategies.
- __init__(ckpt_dir: str, checkpoint_enabled: bool = True, checkpoint_strategy: CheckpointStrategy = CheckpointStrategy.EVERY_OP, checkpoint_n_ops: int = 1, checkpoint_op_names: List[str] | None = None, event_logger=None)
Initialize Ray checkpoint manager.
- Parameters:
ckpt_dir -- Directory to save and load checkpoints
checkpoint_enabled -- Whether checkpointing is enabled
checkpoint_strategy -- Strategy for when to create checkpoints
checkpoint_n_ops -- Number of operations between checkpoints (for EVERY_N_OPS strategy)
checkpoint_op_names -- List of operation names to checkpoint (for MANUAL strategy)
event_logger -- Optional event logger for checkpoint events
- resolve_checkpoint_filename(op_idx: int, partition_id: int) -> str
Resolve the checkpoint filename using a consistent format.
- should_checkpoint(op_idx: int, op_name: str) -> bool
Determine whether a checkpoint should be created, based on the configured strategy.
- save_checkpoint(dataset: Any, op_idx: int, op_name: str | None = None, partition_id: int = 0, cfg: Any | None = None) -> str
Save a dataset checkpoint in Parquet format.
- Parameters:
dataset -- RayDataset or ray.data.Dataset to save
op_idx -- Operation index
op_name -- Operation name (optional)
partition_id -- Partition ID
cfg -- Optional config for RayDataset wrapper
- Returns:
Path to saved checkpoint
- load_checkpoint(op_idx: int, op_name: str | None = None, partition_id: int = 0, cfg: Any | None = None) -> Any | None
Load a dataset checkpoint from Parquet format.
- Parameters:
op_idx -- Operation index
op_name -- Operation name (optional)
partition_id -- Partition ID
cfg -- Optional config for RayDataset wrapper
- Returns:
RayDataset or None if checkpoint doesn't exist
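How `checkpoint_strategy`, `checkpoint_n_ops`, and `checkpoint_op_names` might combine into a `should_checkpoint` decision can be sketched without Ray. The enum below copies the documented member values; the function `should_checkpoint_sketch` and its rules are assumptions, in particular the zero-based `op_idx` and the "every n-th op" arithmetic, and may not match the library's actual behavior.

```python
from enum import Enum
from typing import List, Optional


class CheckpointStrategy(Enum):
    # Member values copied from the documented enum.
    EVERY_OP = "every_op"
    EVERY_N_OPS = "every_n_ops"
    MANUAL = "manual"
    DISABLED = "disabled"


def should_checkpoint_sketch(
    strategy: CheckpointStrategy,
    op_idx: int,
    op_name: str,
    enabled: bool = True,
    n_ops: int = 1,
    op_names: Optional[List[str]] = None,
) -> bool:
    """Hypothetical decision rule, one branch per strategy."""
    if not enabled or strategy is CheckpointStrategy.DISABLED:
        return False
    if strategy is CheckpointStrategy.EVERY_OP:
        return True
    if strategy is CheckpointStrategy.EVERY_N_OPS:
        # Assumption: op_idx is zero-based, so checkpoint after every n-th op.
        return n_ops > 0 and (op_idx + 1) % n_ops == 0
    if strategy is CheckpointStrategy.MANUAL:
        # Only ops explicitly listed by name are checkpointed.
        return op_name in (op_names or [])
    return False


# Enum members can be looked up from their string value, e.g. a config entry.
strategy = CheckpointStrategy("every_n_ops")
decisions = [
    should_checkpoint_sketch(strategy, i, f"op_{i}", n_ops=2) for i in range(4)
]
manual = should_checkpoint_sketch(
    CheckpointStrategy.MANUAL, 0, "dedup_op", op_names=["dedup_op"]
)
```

Under these assumptions, `EVERY_N_OPS` with `n_ops=2` checkpoints after the second and fourth ops, and `MANUAL` checkpoints only the named ops.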