data_juicer.utils.job#
Job utilities for DataJuicer.
This module provides utilities for job management, monitoring, and analysis.
- class data_juicer.utils.job.JobUtils(job_id: str, work_dir: str = None, base_dir: str = None)[source]#
Bases: object
Common utilities for DataJuicer job operations.
- __init__(job_id: str, work_dir: str = None, base_dir: str = None)[source]#
Initialize job utilities.
- Parameters:
job_id – The job ID to work with
work_dir – Work directory that already includes job_id (preferred)
base_dir – Base directory containing job outputs (deprecated, use work_dir instead)
- extract_process_thread_ids() → Dict[str, Set[int]] [source]#
Extract process and thread IDs from event logs. Returns a dict with ‘process_ids’ and ‘thread_ids’ sets.
- find_processes_by_ids(process_ids: Set[int]) → List[Process] [source]#
Find running processes by their PIDs.
- data_juicer.utils.job.list_running_jobs(base_dir: str = 'outputs/partition-checkpoint-eventlog') → List[Dict[str, Any]] [source]#
List all DataJuicer jobs and their status.
- class data_juicer.utils.job.ProcessingSnapshotAnalyzer(work_dir: str)[source]#
Bases: object
Analyzer for processing snapshots.
- analyze_events(events: List[Dict]) → Tuple[Dict[int, PartitionStatus], Dict[str, OperationStatus]] [source]#
Analyze events to determine processing status.
- determine_overall_status(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) → ProcessingStatus [source]#
Determine overall job status.
- calculate_statistics(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) → Dict [source]#
Calculate processing statistics.
- generate_snapshot() → JobSnapshot [source]#
Generate a complete processing snapshot.
- to_json_dict(snapshot: JobSnapshot) → Dict [source]#
Convert snapshot to JSON-serializable dictionary with comprehensive progress tracking.
- data_juicer.utils.job.create_snapshot(work_dir: str, detailed: bool = False) → JobSnapshot [source]#
Create and display a processing snapshot for a work directory.
- class data_juicer.utils.job.JobSnapshot(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED)[source]#
Bases: object
Complete snapshot of job processing status.
- job_id: str#
- job_start_time: float | None = None#
- job_end_time: float | None = None#
- total_duration: float | None = None#
- total_partitions: int = 0#
- completed_partitions: int = 0#
- failed_partitions: int = 0#
- in_progress_partitions: int = 0#
- total_operations: int = 0#
- completed_operations: int = 0#
- failed_operations: int = 0#
- checkpointed_operations: int = 0#
- partition_statuses: Dict[int, PartitionStatus] = None#
- operation_statuses: Dict[str, OperationStatus] = None#
- dag_structure: Dict = None#
- checkpoint_strategy: str | None = None#
- checkpoint_frequency: str | None = None#
- last_checkpoint_time: float | None = None#
- resumable: bool = False#
- overall_status: ProcessingStatus = 'not_started'#
- __init__(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED) None#
- class data_juicer.utils.job.ProcessingStatus(value)[source]#
Bases: Enum
Processing status enumeration.
- NOT_STARTED = 'not_started'#
- IN_PROGRESS = 'in_progress'#
- COMPLETED = 'completed'#
- FAILED = 'failed'#
- CHECKPOINTED = 'checkpointed'#
- class data_juicer.utils.job.OperationStatus(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None)[source]#
Bases: object
Status of a single operation.
- operation_name: str#
- operation_idx: int#
- status: ProcessingStatus#
- start_time: float | None = None#
- end_time: float | None = None#
- duration: float | None = None#
- input_rows: int | None = None#
- output_rows: int | None = None#
- checkpoint_time: float | None = None#
- error_message: str | None = None#
- __init__(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None) None#
- class data_juicer.utils.job.PartitionStatus(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None)[source]#
Bases: object
Status of a single partition.
- partition_id: int#
- status: ProcessingStatus#
- sample_count: int | None = None#
- creation_start_time: float | None = None#
- creation_end_time: float | None = None#
- processing_start_time: float | None = None#
- processing_end_time: float | None = None#
- current_operation: str | None = None#
- completed_operations: List[str] = None#
- failed_operations: List[str] = None#
- checkpointed_operations: List[str] = None#
- error_message: str | None = None#
- __init__(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None) None#