data_juicer.utils.job#

Job utilities for DataJuicer.

This module provides utilities for job management, monitoring, and analysis.

class data_juicer.utils.job.JobUtils(job_id: str, work_dir: str = None, base_dir: str = None)[source]#

Bases: object

Common utilities for DataJuicer job operations.

__init__(job_id: str, work_dir: str = None, base_dir: str = None)[source]#

Initialize job utilities.

Parameters:
  • job_id – The job ID to work with

  • work_dir – Work directory that already includes job_id (preferred)

  • base_dir – Base directory containing job outputs (deprecated, use work_dir instead)

load_job_summary() Dict[str, Any] | None[source]#

Load job summary from the work directory.

load_dataset_mapping() Dict[str, Any][source]#

Load dataset mapping information.

load_event_logs() List[Dict[str, Any]][source]#

Load and parse event logs.

extract_process_thread_ids() Dict[str, Set[int]][source]#

Extract process and thread IDs from event logs. Returns a dict with ‘process_ids’ and ‘thread_ids’ sets.

find_processes_by_ids(process_ids: Set[int]) List[Process][source]#

Find running processes by their PIDs.

find_threads_by_ids(thread_ids: Set[int]) List[Thread][source]#

Find running threads by their IDs, where the runtime permits thread lookup (thread enumeration is only possible within the current process).

get_partition_status() Dict[int, Dict[str, Any]][source]#

Get current status of all partitions.

calculate_overall_progress() Dict[str, Any][source]#

Calculate overall job progress.

get_operation_pipeline() List[Dict[str, Any]][source]#

Get the operation pipeline from config.

data_juicer.utils.job.list_running_jobs(base_dir: str = 'outputs/partition-checkpoint-eventlog') List[Dict[str, Any]][source]#

List all DataJuicer jobs found under base_dir and report their status.

class data_juicer.utils.job.ProcessingSnapshotAnalyzer(work_dir: str)[source]#

Bases: object

Analyzer for processing snapshots.

__init__(work_dir: str)[source]#

Initialize the analyzer with the given work directory.

load_events() List[Dict][source]#

Load events from events.jsonl file.

load_dag_plan() Dict[source]#

Load DAG execution plan.

load_job_summary() Dict[source]#

Load job summary if available.

analyze_events(events: List[Dict]) Tuple[Dict[int, PartitionStatus], Dict[str, OperationStatus]][source]#

Analyze events to determine processing status.

determine_overall_status(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) ProcessingStatus[source]#

Determine overall job status.

calculate_statistics(partition_statuses: Dict[int, PartitionStatus], operation_statuses: Dict[str, OperationStatus]) Dict[source]#

Calculate processing statistics.

generate_snapshot() JobSnapshot[source]#

Generate a complete processing snapshot.

to_json_dict(snapshot: JobSnapshot) Dict[source]#

Convert snapshot to JSON-serializable dictionary with comprehensive progress tracking.

data_juicer.utils.job.create_snapshot(work_dir: str, detailed: bool = False) JobSnapshot[source]#

Create and display a processing snapshot for a work directory.

class data_juicer.utils.job.JobSnapshot(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED)[source]#

Bases: object

Complete snapshot of job processing status.

job_id: str#
job_start_time: float | None = None#
job_end_time: float | None = None#
total_duration: float | None = None#
total_partitions: int = 0#
completed_partitions: int = 0#
failed_partitions: int = 0#
in_progress_partitions: int = 0#
total_operations: int = 0#
completed_operations: int = 0#
failed_operations: int = 0#
checkpointed_operations: int = 0#
partition_statuses: Dict[int, PartitionStatus] = None#
operation_statuses: Dict[str, OperationStatus] = None#
dag_structure: Dict = None#
checkpoint_strategy: str | None = None#
checkpoint_frequency: str | None = None#
last_checkpoint_time: float | None = None#
resumable: bool = False#
overall_status: ProcessingStatus = 'not_started'#
__init__(job_id: str, job_start_time: float | None = None, job_end_time: float | None = None, total_duration: float | None = None, total_partitions: int = 0, completed_partitions: int = 0, failed_partitions: int = 0, in_progress_partitions: int = 0, total_operations: int = 0, completed_operations: int = 0, failed_operations: int = 0, checkpointed_operations: int = 0, partition_statuses: Dict[int, PartitionStatus] = None, operation_statuses: Dict[str, OperationStatus] = None, dag_structure: Dict = None, checkpoint_strategy: str | None = None, checkpoint_frequency: str | None = None, last_checkpoint_time: float | None = None, resumable: bool = False, overall_status: ProcessingStatus = ProcessingStatus.NOT_STARTED) None#
class data_juicer.utils.job.ProcessingStatus(value)[source]#

Bases: Enum

Processing status enumeration.

NOT_STARTED = 'not_started'#
IN_PROGRESS = 'in_progress'#
COMPLETED = 'completed'#
FAILED = 'failed'#
CHECKPOINTED = 'checkpointed'#
class data_juicer.utils.job.OperationStatus(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None)[source]#

Bases: object

Status of a single operation.

operation_name: str#
operation_idx: int#
status: ProcessingStatus#
start_time: float | None = None#
end_time: float | None = None#
duration: float | None = None#
input_rows: int | None = None#
output_rows: int | None = None#
checkpoint_time: float | None = None#
error_message: str | None = None#
__init__(operation_name: str, operation_idx: int, status: ProcessingStatus, start_time: float | None = None, end_time: float | None = None, duration: float | None = None, input_rows: int | None = None, output_rows: int | None = None, checkpoint_time: float | None = None, error_message: str | None = None) None#
class data_juicer.utils.job.PartitionStatus(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None)[source]#

Bases: object

Status of a single partition.

partition_id: int#
status: ProcessingStatus#
sample_count: int | None = None#
creation_start_time: float | None = None#
creation_end_time: float | None = None#
processing_start_time: float | None = None#
processing_end_time: float | None = None#
current_operation: str | None = None#
completed_operations: List[str] = None#
failed_operations: List[str] = None#
checkpointed_operations: List[str] = None#
error_message: str | None = None#
__init__(partition_id: int, status: ProcessingStatus, sample_count: int | None = None, creation_start_time: float | None = None, creation_end_time: float | None = None, processing_start_time: float | None = None, processing_end_time: float | None = None, current_operation: str | None = None, completed_operations: List[str] = None, failed_operations: List[str] = None, checkpointed_operations: List[str] = None, error_message: str | None = None) None#