data_juicer.utils.ckpt_utils module

class data_juicer.utils.ckpt_utils.CheckpointManagerBase(ckpt_dir: str)

Bases: ABC

Base class for checkpoint managers.

Provides common functionality for managing checkpoint directories and defines the interface that checkpoint managers should implement.

__init__(ckpt_dir: str)

Initialize base checkpoint manager.

Parameters:

ckpt_dir – Directory to save and load checkpoints

abstractmethod save_checkpoint(dataset: Any, **kwargs) -> str

Save a dataset checkpoint.

Parameters:
  • dataset – Dataset to save

  • kwargs – Additional arguments specific to the implementation

Returns:

Path to saved checkpoint

abstractmethod load_checkpoint(**kwargs) -> Any | None

Load a dataset checkpoint.

Parameters:

kwargs – Arguments specific to the implementation (e.g., op_idx, partition_id)

Returns:

Loaded dataset or None if checkpoint doesn't exist

checkpoint_exists(checkpoint_path: str) -> bool

Check if a checkpoint file/directory exists.

Parameters:

checkpoint_path – Path to checkpoint

Returns:

True if checkpoint exists, False otherwise
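The interface above can be illustrated with a minimal, self-contained sketch. It does not import data_juicer: `CheckpointManagerBaseSketch` is a stand-in that mirrors the documented signatures, and `PickleCheckpointManager`, the `dataset.pkl` file name, and the eager directory creation are all assumptions made for the example, not the library's behavior.

```python
import os
import pickle
import tempfile
from abc import ABC, abstractmethod
from typing import Any, Optional


class CheckpointManagerBaseSketch(ABC):
    """Stand-in mirroring the documented interface; not the library class."""

    def __init__(self, ckpt_dir: str):
        self.ckpt_dir = ckpt_dir
        # Assumption: the directory is created eagerly.
        os.makedirs(ckpt_dir, exist_ok=True)

    @abstractmethod
    def save_checkpoint(self, dataset: Any, **kwargs) -> str:
        """Save the dataset and return the checkpoint path."""

    @abstractmethod
    def load_checkpoint(self, **kwargs) -> Optional[Any]:
        """Return the dataset, or None if no checkpoint exists."""

    def checkpoint_exists(self, checkpoint_path: str) -> bool:
        return os.path.exists(checkpoint_path)


class PickleCheckpointManager(CheckpointManagerBaseSketch):
    """Hypothetical manager that pickles the dataset into a single file."""

    def _path(self) -> str:
        return os.path.join(self.ckpt_dir, "dataset.pkl")

    def save_checkpoint(self, dataset: Any, **kwargs) -> str:
        path = self._path()
        with open(path, "wb") as f:
            pickle.dump(dataset, f)
        return path

    def load_checkpoint(self, **kwargs) -> Optional[Any]:
        path = self._path()
        if not self.checkpoint_exists(path):
            return None
        with open(path, "rb") as f:
            return pickle.load(f)


with tempfile.TemporaryDirectory() as tmp:
    manager = PickleCheckpointManager(os.path.join(tmp, "ckpt"))
    missing = manager.load_checkpoint()            # nothing saved yet
    saved_path = manager.save_checkpoint([{"text": "hello"}])
    restored = manager.load_checkpoint()           # round-trips the data
```

The sketch returns None rather than raising when no checkpoint exists, matching the documented return contract of load_checkpoint.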

class data_juicer.utils.ckpt_utils.CheckpointManager(ckpt_dir, original_process_list, num_proc=1)

Bases: CheckpointManagerBase

Saves the latest version of a dataset to the checkpoint directory, or loads it from there, somewhat like cache management. Rerunning the same config reloads the checkpoint and skips the ops that precede it.

If the arguments of any operator in the process list change, all ops are rerun from the beginning.

__init__(ckpt_dir, original_process_list, num_proc=1)

Initialization method.

Parameters:
  • ckpt_dir – path to save and load checkpoint

  • original_process_list – process list in config

  • num_proc – number of process workers when saving dataset

get_left_process_list()

Get the list of ops that remain to be run on the dataset. When a checkpoint is available, the ops it already covers are removed from the process list; otherwise the list is returned unchanged.

Returns:

process list of the remaining ops

check_ckpt()

Check if checkpoint is available.

Returns:

True when checkpoint is available, else False

record(op_cfg: dict)

Save op name and args to op record, which is used to compare with the process list from config to decide if a checkpoint is available.

check_ops_to_skip()

Check which ops need to be skipped in the process list.

If the op record list from the checkpoint matches the prefix of the process list, those ops are skipped and processing starts from the checkpoint. Otherwise, the original dataset is processed from scratch.

Returns:

whether to skip some ops or not
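The prefix comparison described above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `split_process_list` is a hypothetical helper, each op config is assumed to be a single-key dict of the form `{op_name: args}`, and the op names are made up.

```python
from typing import Dict, List, Tuple

OpCfg = Dict[str, dict]  # assumed shape: {"op_name": {"arg": value}}


def split_process_list(
    recorded_ops: List[OpCfg], process_list: List[OpCfg]
) -> Tuple[bool, List[OpCfg]]:
    """Return (skip, remaining_ops) by comparing the recorded ops to the prefix."""
    n = len(recorded_ops)
    # No record, or more recorded ops than configured ops: start from scratch.
    if n == 0 or n > len(process_list):
        return False, process_list
    # Any difference in op name or args invalidates the checkpoint.
    if process_list[:n] != recorded_ops:
        return False, process_list
    return True, process_list[n:]


process_list = [
    {"clean_op": {"min_len": 10}},
    {"dedup_op": {"lowercase": True}},
    {"filter_op": {"max_ratio": 0.5}},
]

# Two ops were recorded with identical args: resume from the third op.
skip, remaining = split_process_list(process_list[:2], process_list)

# The first recorded op used a different argument: rerun everything.
changed = [{"clean_op": {"min_len": 20}}, process_list[1]]
skip_changed, remaining_changed = split_process_list(changed, process_list)
```

Comparing whole op configs, rather than names only, is what makes a changed argument force a full rerun.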

save_ckpt(ds)

Save dataset to checkpoint directory and dump processed ops list. Alias for save_checkpoint for backward compatibility.

Parameters:

ds – input dataset to save

save_checkpoint(ds, **kwargs)

Save dataset to checkpoint directory and dump processed ops list.

Parameters:
  • ds – input dataset to save

  • kwargs – Additional arguments (not used, kept for interface compatibility)

Returns:

Path to checkpoint directory

load_ckpt()

Load dataset from a checkpoint file. Alias for load_checkpoint for backward compatibility.

Returns:

a dataset stored in checkpoint file.

load_checkpoint(**kwargs)

Load dataset from a checkpoint file.

Parameters:

kwargs – Additional arguments (not used, kept for interface compatibility)

Returns:

a dataset stored in checkpoint file.

class data_juicer.utils.ckpt_utils.CheckpointStrategy(value)

Bases: Enum

Checkpoint strategies for controlling when to create checkpoints.

EVERY_OP = 'every_op'
EVERY_N_OPS = 'every_n_ops'
MANUAL = 'manual'
DISABLED = 'disabled'

class data_juicer.utils.ckpt_utils.RayCheckpointManager(ckpt_dir: str, checkpoint_enabled: bool = True, checkpoint_strategy: CheckpointStrategy = CheckpointStrategy.EVERY_OP, checkpoint_n_ops: int = 1, checkpoint_op_names: List[str] | None = None, event_logger=None)

Bases: CheckpointManagerBase

Checkpoint manager for Ray Data with per-partition checkpointing support.

This class manages checkpoints for Ray Data datasets using Parquet format, supporting per-partition checkpointing and various checkpoint strategies.

__init__(ckpt_dir: str, checkpoint_enabled: bool = True, checkpoint_strategy: CheckpointStrategy = CheckpointStrategy.EVERY_OP, checkpoint_n_ops: int = 1, checkpoint_op_names: List[str] | None = None, event_logger=None)

Initialize Ray checkpoint manager.

Parameters:
  • ckpt_dir – Directory to save and load checkpoints

  • checkpoint_enabled – Whether checkpointing is enabled

  • checkpoint_strategy – Strategy for when to create checkpoints

  • checkpoint_n_ops – Number of operations between checkpoints (for EVERY_N_OPS strategy)

  • checkpoint_op_names – List of operation names to checkpoint (for MANUAL strategy)

  • event_logger – Optional event logger for checkpoint events

resolve_checkpoint_filename(op_idx: int, partition_id: int) -> str

Resolve checkpoint filename using consistent format.

should_checkpoint(op_idx: int, op_name: str) -> bool

Determine if checkpoint should be created based on configuration strategy.
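One plausible reading of how the four strategies map to a decision is sketched below. The enum is redefined locally so the example runs without data_juicer, and `should_checkpoint_sketch` is a hypothetical function. The exact rules are assumptions: in particular, that EVERY_N_OPS fires after every n-th op counted from 1, and that MANUAL matches on the op name.

```python
from enum import Enum
from typing import List, Optional


class Strategy(Enum):
    """Local copy of the documented enum values, for a self-contained example."""

    EVERY_OP = "every_op"
    EVERY_N_OPS = "every_n_ops"
    MANUAL = "manual"
    DISABLED = "disabled"


def should_checkpoint_sketch(
    strategy: Strategy,
    op_idx: int,
    op_name: str,
    enabled: bool = True,
    n_ops: int = 1,
    op_names: Optional[List[str]] = None,
) -> bool:
    """Decide whether to checkpoint after the op at op_idx (assumed semantics)."""
    if not enabled or strategy is Strategy.DISABLED:
        return False
    if strategy is Strategy.EVERY_OP:
        return True
    if strategy is Strategy.EVERY_N_OPS:
        # Assumption: op_idx is 0-based and n_ops >= 1.
        return (op_idx + 1) % n_ops == 0
    if strategy is Strategy.MANUAL:
        return op_name in (op_names or [])
    return False


# With n_ops=2, a checkpoint follows the 2nd and 4th ops under this reading.
every_two = [
    should_checkpoint_sketch(Strategy.EVERY_N_OPS, i, f"op_{i}", n_ops=2)
    for i in range(4)
]
manual_hit = should_checkpoint_sketch(
    Strategy.MANUAL, 0, "dedup_op", op_names=["dedup_op"]
)
```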

save_checkpoint(dataset: Any, op_idx: int, op_name: str | None = None, partition_id: int = 0, cfg: Any | None = None) -> str

Save dataset checkpoint to parquet format.

Parameters:
  • dataset – RayDataset or ray.data.Dataset to save

  • op_idx – Operation index

  • op_name – Operation name (optional)

  • partition_id – Partition ID

  • cfg – Optional config for RayDataset wrapper

Returns:

Path to saved checkpoint

load_checkpoint(op_idx: int, op_name: str | None = None, partition_id: int = 0, cfg: Any | None = None) -> Any | None

Load dataset checkpoint from parquet format.

Parameters:
  • op_idx – Operation index

  • op_name – Operation name (optional)

  • partition_id – Partition ID

  • cfg – Optional config for RayDataset wrapper

Returns:

RayDataset or None if checkpoint doesn't exist

find_latest_checkpoint(partition_id: int = 0) -> Tuple[int, str, str] | None

Find the latest checkpoint for a partition.

Parameters:

partition_id – Partition ID

Returns:

Tuple of (op_idx, op_name, checkpoint_path) or None if no checkpoint found
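The lookup can be pictured as a scan over checkpoint names that keeps the highest op index for the requested partition. The sketch below is only an illustration: the file name pattern `op_<idx>_<name>_partition_<id>.parquet` is invented for the example (this page does not state the real format), and `find_latest_sketch` is a hypothetical helper that works on a plain list of names rather than on the checkpoint directory.

```python
import re
from typing import Iterable, Optional, Tuple

# Hypothetical naming scheme; the library's actual format is not documented here.
_PATTERN = re.compile(r"^op_(\d+)_(.+)_partition_(\d+)\.parquet$")


def find_latest_sketch(
    filenames: Iterable[str], partition_id: int = 0
) -> Optional[Tuple[int, str, str]]:
    """Return (op_idx, op_name, filename) with the highest op_idx, or None."""
    latest = None
    for name in filenames:
        match = _PATTERN.match(name)
        # Skip unrelated files and checkpoints of other partitions.
        if match is None or int(match.group(3)) != partition_id:
            continue
        candidate = (int(match.group(1)), match.group(2), name)
        if latest is None or candidate[0] > latest[0]:
            latest = candidate
    return latest


names = [
    "op_0000_clean_op_partition_0000.parquet",
    "op_0002_dedup_op_partition_0000.parquet",
    "op_0005_filter_op_partition_0001.parquet",
    "notes.txt",
]
latest_p0 = find_latest_sketch(names, partition_id=0)
latest_p7 = find_latest_sketch(names, partition_id=7)  # no such partition
```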

group_operations_for_checkpointing(ops: List[Any]) -> List[Tuple[int, int, List[Any]]]

Group operations based on checkpoint strategy.

Parameters:

ops – List of operations

Returns:

List of (start_idx, end_idx, group_ops) tuples
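A grouping of this shape can be sketched by closing a group at every op after which a checkpoint is due. `group_ops_sketch` is a hypothetical helper, the checkpoint decision is passed in as a callable, and treating end_idx as exclusive is an assumption; the library may use a different convention.

```python
from typing import Any, Callable, List, Tuple


def group_ops_sketch(
    ops: List[Any], should_checkpoint: Callable[[int, Any], bool]
) -> List[Tuple[int, int, List[Any]]]:
    """Split ops into (start_idx, end_idx, group_ops) runs ending at checkpoints."""
    groups: List[Tuple[int, int, List[Any]]] = []
    start = 0
    for idx, op in enumerate(ops):
        if should_checkpoint(idx, op):
            # Assumption: end_idx is exclusive, like a Python slice bound.
            groups.append((start, idx + 1, ops[start:idx + 1]))
            start = idx + 1
    # Trailing ops with no checkpoint after them form a final group.
    if start < len(ops):
        groups.append((start, len(ops), ops[start:]))
    return groups


ops = ["a", "b", "c", "d", "e"]
# Checkpoint after every second op (0-based indices 1 and 3).
groups = group_ops_sketch(ops, lambda idx, op: (idx + 1) % 2 == 0)
```

Each group can then be run as one unit, with a checkpoint written once its last op finishes.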