data_juicer.utils.job.snapshot module#
Processing Snapshot Utility for DataJuicer
This module analyzes the current state of processing based on events.jsonl and DAG structure to provide a comprehensive snapshot of what’s done, what’s not, and checkpointing status.
- class data_juicer.utils.job.snapshot.ProcessingStatus(value)[source]#
Bases:
Enum
Processing status enumeration.
- NOT_STARTED = 'not_started'#
- IN_PROGRESS = 'in_progress'#
- COMPLETED = 'completed'#
- FAILED = 'failed'#
- CHECKPOINTED = 'checkpointed'#
- class data_juicer.utils.job.snapshot.OperationStatus(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None)[source]#
Bases:
object
Status of a single operation.
- operation_name: str#
- operation_idx: int#
- status: ProcessingStatus#
- start_time: float | None = None#
- end_time: float | None = None#
- duration: float | None = None#
- input_rows: int | None = None#
- output_rows: int | None = None#
- checkpoint_time: float | None = None#
- error_message: str | None = None#
- __init__(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None) None#
- class data_juicer.utils.job.snapshot.PartitionStatus(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None)[source]#
Bases:
object
Status of a single partition.
- partition_id: int#
- status: ProcessingStatus#
- sample_count: int | None = None#
- creation_start_time: float | None = None#
- creation_end_time: float | None = None#
- processing_start_time: float | None = None#
- processing_end_time: float | None = None#
- current_operation: str | None = None#
- completed_operations: List[str] = None#
- failed_operations: List[str] = None#
- checkpointed_operations: List[str] = None#
- error_message: str | None = None#
- __init__(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None) None#
- class data_juicer.utils.job.snapshot.JobSnapshot(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED)[source]#
Bases:
object
Complete snapshot of job processing status.
- job_id: str#
- job_start_time: float | None = None#
- job_end_time: float | None = None#
- total_duration: float | None = None#
- total_partitions: int = 0#
- completed_partitions: int = 0#
- failed_partitions: int = 0#
- in_progress_partitions: int = 0#
- total_operations: int = 0#
- completed_operations: int = 0#
- failed_operations: int = 0#
- checkpointed_operations: int = 0#
- partition_statuses: Dict[int, PartitionStatus] = None#
- operation_statuses: Dict[str, OperationStatus] = None#
- dag_structure: Dict = None#
- checkpoint_strategy: str | None = None#
- checkpoint_frequency: str | None = None#
- last_checkpoint_time: float | None = None#
- resumable: bool = False#
- overall_status: ProcessingStatus = 'not_started'#
- __init__(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED) None#
- class data_juicer.utils.job.snapshot.ProcessingSnapshotAnalyzer(work_dir: str)[source]#
Bases:
object
Analyzer for processing snapshots.
- analyze_events(events: List[Dict]) Tuple[Dict[int, PartitionStatus], Dict[str, OperationStatus]][source]#
Analyze events to determine processing status.
- determine_overall_status(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) ProcessingStatus[source]#
Determine overall job status.
- calculate_statistics(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) Dict[source]#
Calculate processing statistics.
- generate_snapshot() JobSnapshot[source]#
Generate a complete processing snapshot.
- to_json_dict(snapshot: JobSnapshot) Dict[source]#
Convert snapshot to JSON-serializable dictionary with comprehensive progress tracking.
- data_juicer.utils.job.snapshot.create_snapshot(work_dir: str, detailed: bool = False) JobSnapshot[source]#
Create and display a processing snapshot for a work directory.