data_juicer.utils.job.snapshot module#

Processing Snapshot Utility for DataJuicer

This module analyzes the current state of processing based on events.jsonl and the DAG structure, providing a comprehensive snapshot of which partitions and operations are complete, which are not, and the current checkpointing status.
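
A minimal usage sketch of the classes documented below (the work directory path is illustrative; it is expected to contain the job's events.jsonl):

    from data_juicer.utils.job.snapshot import ProcessingSnapshotAnalyzer

    # "./outputs/demo-job" is an illustrative work directory.
    analyzer = ProcessingSnapshotAnalyzer("./outputs/demo-job")
    snapshot = analyzer.generate_snapshot()

    print(snapshot.overall_status)                       # a ProcessingStatus value
    print(snapshot.completed_partitions, "/", snapshot.total_partitions)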

class data_juicer.utils.job.snapshot.ProcessingStatus(value)[source]#

Bases: Enum

Processing status enumeration.

NOT_STARTED = 'not_started'#
IN_PROGRESS = 'in_progress'#
COMPLETED = 'completed'#
FAILED = 'failed'#
CHECKPOINTED = 'checkpointed'#
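
For example, a status can be checked against specific members and serialized by its string value (a minimal sketch):

    from data_juicer.utils.job.snapshot import ProcessingStatus

    status = ProcessingStatus.COMPLETED
    assert status.value == "completed"
    # Example check; treating COMPLETED and CHECKPOINTED as finished states is an assumption.
    if status in (ProcessingStatus.COMPLETED, ProcessingStatus.CHECKPOINTED):
        print("stage finished:", status.value)
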
class data_juicer.utils.job.snapshot.OperationStatus(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None)[source]#

Bases: object

Status of a single operation.

operation_name: str#
operation_idx: int#
status: ProcessingStatus#
start_time: float | None = None#
end_time: float | None = None#
duration: float | None = None#
input_rows: int | None = None#
output_rows: int | None = None#
checkpoint_time: float | None = None#
error_message: str | None = None#
__init__(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None) → None#
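
A construction sketch using only the documented fields; the operator name and numbers are illustrative:

    from data_juicer.utils.job.snapshot import OperationStatus, ProcessingStatus

    op = OperationStatus(
        operation_name="language_id_score_filter",  # illustrative operator name
        operation_idx=0,
        status=ProcessingStatus.COMPLETED,
        start_time=1710000000.0,
        end_time=1710000012.5,
        duration=12.5,
        input_rows=10000,
        output_rows=9500,
    )
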
class data_juicer.utils.job.snapshot.PartitionStatus(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None)[source]#

Bases: object

Status of a single partition.

partition_id: int#
status: ProcessingStatus#
sample_count: int | None = None#
creation_start_time: float | None = None#
creation_end_time: float | None = None#
processing_start_time: float | None = None#
processing_end_time: float | None = None#
current_operation: str | None = None#
completed_operations: List[str] = None#
failed_operations: List[str] = None#
checkpointed_operations: List[str] = None#
error_message: str | None = None#
__init__(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None) → None#
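
A construction sketch with illustrative values:

    from data_juicer.utils.job.snapshot import PartitionStatus, ProcessingStatus

    part = PartitionStatus(
        partition_id=0,
        status=ProcessingStatus.IN_PROGRESS,
        sample_count=50000,                             # illustrative
        current_operation="language_id_score_filter",   # illustrative operator name
        completed_operations=["clean_email_mapper"],    # illustrative operator name
    )
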
class data_juicer.utils.job.snapshot.JobSnapshot(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED)[source]#

Bases: object

Complete snapshot of job processing status.

job_id: str#
job_start_time: float | None = None#
job_end_time: float | None = None#
total_duration: float | None = None#
total_partitions: int = 0#
completed_partitions: int = 0#
failed_partitions: int = 0#
in_progress_partitions: int = 0#
total_operations: int = 0#
completed_operations: int = 0#
failed_operations: int = 0#
checkpointed_operations: int = 0#
partition_statuses: Dict[int, PartitionStatus] = None#
operation_statuses: Dict[str, OperationStatus] = None#
dag_structure: Dict = None#
checkpoint_strategy: str | None = None#
checkpoint_frequency: str | None = None#
last_checkpoint_time: float | None = None#
resumable: bool = False#
overall_status: ProcessingStatus = 'not_started'#
__init__(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED) → None#
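
The counters can be combined into simple progress figures; a sketch with illustrative values:

    from data_juicer.utils.job.snapshot import JobSnapshot, ProcessingStatus

    snap = JobSnapshot(job_id="job-0001")            # illustrative job id
    snap.total_partitions = 4
    snap.completed_partitions = 3
    snap.overall_status = ProcessingStatus.IN_PROGRESS

    progress = snap.completed_partitions / max(snap.total_partitions, 1)
    print(f"{progress:.0%} of partitions completed, resumable={snap.resumable}")
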
class data_juicer.utils.job.snapshot.ProcessingSnapshotAnalyzer(work_dir: str)[source]#

Bases: object

Analyzer for processing snapshots.

__init__(work_dir: str)[source]#

Initialize the analyzer with the work directory.

load_events() → List[Dict][source]#

Load events from events.jsonl file.

load_dag_plan() → Dict[source]#

Load DAG execution plan.

load_job_summary() → Dict[source]#

Load job summary if available.

analyze_events(events: List[Dict]) → Tuple[Dict[int, PartitionStatus], Dict[str, OperationStatus]][source]#

Analyze events to determine processing status.

determine_overall_status(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) → ProcessingStatus[source]#

Determine overall job status.

calculate_statistics(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) → Dict[source]#

Calculate processing statistics.

generate_snapshot() → JobSnapshot[source]#

Generate a complete processing snapshot.

to_json_dict(snapshot: JobSnapshot) → Dict[source]#

Convert snapshot to JSON-serializable dictionary with comprehensive progress tracking.
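
The methods above can be chained as in the following sketch; the work directory path and output file name are illustrative:

    import json

    from data_juicer.utils.job.snapshot import ProcessingSnapshotAnalyzer

    analyzer = ProcessingSnapshotAnalyzer("./outputs/demo-job")

    events = analyzer.load_events()                  # records from events.jsonl
    partitions, operations = analyzer.analyze_events(events)
    overall = analyzer.determine_overall_status(partitions, operations)
    stats = analyzer.calculate_statistics(partitions, operations)   # a plain dict of statistics
    print("overall status:", overall)

    snapshot = analyzer.generate_snapshot()          # the same analysis packaged as a JobSnapshot
    with open("snapshot.json", "w") as f:
        json.dump(analyzer.to_json_dict(snapshot), f, indent=2)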

data_juicer.utils.job.snapshot.create_snapshot(work_dir: str, detailed: bool = False) → JobSnapshot[source]#

Create and display a processing snapshot for a work directory.
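
A one-call sketch; that detailed=True expands the displayed per-partition and per-operation output is an assumption based on the parameter name:

    from data_juicer.utils.job.snapshot import create_snapshot

    # Prints a summary of the job state and returns the JobSnapshot.
    snapshot = create_snapshot("./outputs/demo-job", detailed=True)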

data_juicer.utils.job.snapshot.main()[source]#

Main function for command-line usage.