data_juicer.core.executor.event_logging_mixin module#

Event Logging Mixin for Data-Juicer Executors

This module provides comprehensive event logging capabilities that can be used by any executor (default, ray, partitioned, etc.) to track operations, performance, and errors in real time.

Features:

1. Real-time event logging with configurable levels
2. Event filtering and querying
3. Performance metrics tracking
4. Error tracking with stack traces
5. Status reporting and monitoring
6. Log rotation and cleanup
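
A minimal sketch of how an executor might adopt the mixin. The executor class, its run method, and the config dictionary below are illustrative, not part of this module; only the EventLoggingMixin methods are real:

    import time
    from data_juicer.core.executor.event_logging_mixin import EventLoggingMixin

    class MyExecutor(EventLoggingMixin):
        """Hypothetical executor that mixes in event logging."""

        def run(self):
            start = time.time()
            self.log_job_start(config={"project_name": "demo"}, total_partitions=4)
            # ... process the dataset ...
            self.log_job_complete(duration=time.time() - start)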

class data_juicer.core.executor.event_logging_mixin.EventType(value)[source]#

Bases: Enum

Types of events that can be logged.

JOB_START = 'job_start'#
JOB_COMPLETE = 'job_complete'#
JOB_FAILED = 'job_failed'#
JOB_RESTART = 'job_restart'#
PARTITION_START = 'partition_start'#
PARTITION_COMPLETE = 'partition_complete'#
PARTITION_FAILED = 'partition_failed'#
PARTITION_RESUME = 'partition_resume'#
OP_START = 'op_start'#
OP_COMPLETE = 'op_complete'#
OP_FAILED = 'op_failed'#
CHECKPOINT_SAVE = 'checkpoint_save'#
CHECKPOINT_LOAD = 'checkpoint_load'#
PROCESSING_START = 'processing_start'#
PROCESSING_COMPLETE = 'processing_complete'#
PROCESSING_ERROR = 'processing_error'#
DAG_BUILD_START = 'dag_build_start'#
DAG_BUILD_COMPLETE = 'dag_build_complete'#
DAG_NODE_READY = 'dag_node_ready'#
DAG_NODE_START = 'dag_node_start'#
DAG_NODE_COMPLETE = 'dag_node_complete'#
DAG_NODE_FAILED = 'dag_node_failed'#
DAG_PARALLEL_GROUP_START = 'dag_parallel_group_start'#
DAG_PARALLEL_GROUP_COMPLETE = 'dag_parallel_group_complete'#
DAG_EXECUTION_PLAN_SAVED = 'dag_execution_plan_saved'#
DAG_EXECUTION_PLAN_LOADED = 'dag_execution_plan_loaded'#
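
Members are plain string-valued Enum entries, so they compare and serialize directly:

    from data_juicer.core.executor.event_logging_mixin import EventType

    assert EventType.OP_COMPLETE.value == "op_complete"
    assert EventType("job_failed") is EventType.JOB_FAILED  # lookup by value
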
class data_juicer.core.executor.event_logging_mixin.Event(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None)[source]#

Bases: object

Event data structure.

event_type: EventType#
timestamp: float#
message: str#
event_id: str | None = None#
job_id: str | None = None#
partition_id: int | None = None#
operation_name: str | None = None#
operation_idx: int | None = None#
status: str | None = None#
duration: float | None = None#
error_message: str | None = None#
stack_trace: str | None = None#
retry_count: int | None = None#
checkpoint_path: str | None = None#
op_args: Dict[str, Any] | None = None#
input_rows: int | None = None#
output_rows: int | None = None#
output_path: str | None = None#
partition_meta: Dict[str, Any] | None = None#
config: Dict[str, Any] | None = None#
metadata: Dict[str, Any] | None = None#
total_partitions: int | None = None#
successful_partitions: int | None = None#
failed_partitions: int | None = None#
job_duration: float | None = None#
completion_time: float | None = None#
failure_time: float | None = None#
error_type: str | None = None#
process_id: int | None = None#
thread_id: int | None = None#
__init__(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None) None#
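
Only event_type, timestamp, and message are required; every other field defaults to None. A small construction example (the operation name and row counts are illustrative):

    import time
    from data_juicer.core.executor.event_logging_mixin import Event, EventType

    evt = Event(
        event_type=EventType.OP_COMPLETE,
        timestamp=time.time(),
        message="text_length_filter finished on partition 0",
        partition_id=0,
        operation_name="text_length_filter",
        operation_idx=0,
        duration=4.2,
        input_rows=10_000,
        output_rows=9_512,
    )
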
class data_juicer.core.executor.event_logging_mixin.EventLogger(log_dir: str, job_id: str | None = None, work_dir: str | None = None)[source]#

Bases: object

Event logging system with real-time capabilities and a JSONL event log for resumability.

__init__(log_dir: str, job_id: str | None = None, work_dir: str | None = None)[source]#
events: deque#
log_event(event: Event)[source]#

Log an event (to memory, loguru, and JSONL for resumability).
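
Logging an Event (such as the one constructed above) through an EventLogger; the log directory and job id are illustrative:

    from data_juicer.core.executor.event_logging_mixin import EventLogger

    logger = EventLogger(log_dir="./outputs/log", job_id="job-001")
    logger.log_event(evt)  # recorded in memory, loguru, and the JSONL file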

find_latest_events_file(work_dir: str) Path | None[source]#

Find the latest events file in the work directory.

check_job_completion(events_file: Path) bool[source]#

Check whether the job has already completed by looking for a job_complete event.
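
A sketch of a resumability check combining the two methods above (the work directory path is illustrative):

    events_file = logger.find_latest_events_file("./outputs")
    if events_file is not None and logger.check_job_completion(events_file):
        print("job already completed; nothing to resume")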

get_events(event_type: EventType | None = None, partition_id: int | None = None, operation_name: str | None = None, start_time: float | None = None, end_time: float | None = None, limit: int | None = None) List[Event][source]#

Get events with optional filtering.
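
All arguments are optional; a couple of filtering examples, assuming the logger above:

    import time

    failures = logger.get_events(event_type=EventType.OP_FAILED, limit=10)
    last_hour = logger.get_events(start_time=time.time() - 3600)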

generate_status_report() str[source]#

Generate a comprehensive status report.

monitor_events(event_type: EventType | None = None) Generator[Event, None, None][source]#

Monitor events in real time.
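
Since this returns a generator, monitoring is a simple loop; presumably it blocks while waiting for new events:

    for event in logger.monitor_events(event_type=EventType.PARTITION_COMPLETE):
        print(f"{event.timestamp}: {event.message}")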

classmethod list_available_jobs(work_dir: str) List[Dict[str, Any]][source]#

List available jobs for resumption from a work directory.
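
Being a classmethod, it needs no logger instance; the keys of each returned dictionary depend on the implementation:

    jobs = EventLogger.list_available_jobs("./outputs")
    for job in jobs:
        print(job)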

class data_juicer.core.executor.event_logging_mixin.EventLoggingMixin(*args, **kwargs)[source]#

Bases: object

Mixin to add event logging capabilities to any executor.

__init__(*args, **kwargs)[source]#

Initialize the mixin.

log_job_start(config, total_partitions)[source]#

Log job start with detailed configuration.

log_job_complete(duration, output_path=None)[source]#

Log job completion with performance metrics.

log_job_failed(error_message, duration)[source]#

Log job failure with error details.
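
A sketch of the typical job-lifecycle calls, assuming self is an executor that mixes in EventLoggingMixin (as in the sketch near the top of this page); the config and output path are illustrative:

    import time

    start = time.time()
    try:
        self.log_job_start(config={"project_name": "demo"}, total_partitions=4)
        # ... run all partitions ...
        self.log_job_complete(duration=time.time() - start,
                              output_path="outputs/result.jsonl")
    except Exception as exc:
        self.log_job_failed(error_message=str(exc), duration=time.time() - start)
        raise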

log_partition_start(partition_id, partition_meta)[source]#

Log partition start with detailed metadata.

log_partition_complete(partition_id, duration, output_path, success=True, error=None)[source]#

Log partition completion with performance metrics.

log_partition_failed(partition_id, error_message, retry_count)[source]#

Log partition failure with retry information.
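
Per-partition calls follow the same pattern; the metadata keys and output path are illustrative:

    self.log_partition_start(partition_id=0,
                             partition_meta={"num_rows": 10_000})
    # ... process the partition ...
    self.log_partition_complete(partition_id=0, duration=12.3,
                                output_path="outputs/part-0.parquet")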

log_op_start(partition_id, operation_name, operation_idx, op_args, **kwargs)[source]#

Log operation start with detailed arguments.

log_op_complete(partition_id, operation_name, operation_idx, duration, checkpoint_path, input_rows, output_rows, **kwargs)[source]#

Log operation completion with detailed performance metrics.

log_op_failed(partition_id, operation_name, operation_idx, error_message, retry_count, **kwargs)[source]#

Log operation failure with error details.
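
Per-operation calls, with a failure path that records the retry count; the operation name, arguments, and paths are illustrative:

    self.log_op_start(partition_id=0, operation_name="text_length_filter",
                      operation_idx=2, op_args={"min_len": 10})
    try:
        # ... apply the operator ...
        self.log_op_complete(partition_id=0, operation_name="text_length_filter",
                             operation_idx=2, duration=4.2,
                             checkpoint_path="ckpt/p0_op2.parquet",
                             input_rows=10_000, output_rows=9_512)
    except Exception as exc:
        self.log_op_failed(partition_id=0, operation_name="text_length_filter",
                           operation_idx=2, error_message=str(exc), retry_count=1)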

log_checkpoint_save(partition_id, operation_name, operation_idx, checkpoint_path)[source]#

Log checkpoint save with file information.

log_checkpoint_load(partition_id, operation_name, operation_idx, checkpoint_path)[source]#

Log checkpoint load with file information.
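
The two checkpoint events mirror each other; the path is illustrative:

    self.log_checkpoint_save(partition_id=0, operation_name="text_length_filter",
                             operation_idx=2,
                             checkpoint_path="ckpt/p0_op2.parquet")
    # ... later, on resume ...
    self.log_checkpoint_load(partition_id=0, operation_name="text_length_filter",
                             operation_idx=2,
                             checkpoint_path="ckpt/p0_op2.parquet")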

log_dag_build_start(ast_info: Dict[str, Any])[source]#

Log DAG build start with AST information.

log_dag_build_complete(dag_info: Dict[str, Any])[source]#

Log DAG build completion with execution plan information.

log_dag_node_ready(node_id: str, node_info: Dict[str, Any])[source]#

Log when a DAG node becomes ready for execution.

log_dag_node_start(node_id: str, node_info: Dict[str, Any])[source]#

Log when a DAG node starts execution.

log_dag_node_complete(node_id: str, node_info: Dict[str, Any], duration: float)[source]#

Log when a DAG node completes execution.

log_dag_node_failed(node_id: str, node_info: Dict[str, Any], error_message: str, duration: float = 0)[source]#

Log when a DAG node fails execution.

log_dag_parallel_group_start(group_id: str, group_info: Dict[str, Any])[source]#

Log when a parallel group starts execution.

log_dag_parallel_group_complete(group_id: str, group_info: Dict[str, Any], duration: float)[source]#

Log when a parallel group completes execution.

log_dag_execution_plan_saved(plan_path: str, plan_info: Dict[str, Any])[source]#

Log when DAG execution plan is saved.

log_dag_execution_plan_loaded(plan_path: str, plan_info: Dict[str, Any])[source]#

Log when DAG execution plan is loaded.
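
A sketch of the DAG-event calls around a single node; the node id and the keys of the info dictionaries are illustrative, since this page does not fix their schema:

    self.log_dag_node_ready("node_3", node_info={"op": "language_id_filter"})
    self.log_dag_node_start("node_3", node_info={"op": "language_id_filter"})
    # ... execute the node ...
    self.log_dag_node_complete("node_3",
                               node_info={"op": "language_id_filter"},
                               duration=1.7)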

log_job_restart(restart_reason: str, original_start_time: float, resume_partitions: List[int], resume_from_operation: int, checkpoint_paths: List[str])[source]#

Log when a job is restarted after interruption.

log_partition_resume(partition_id: int, resume_operation: int, checkpoint_path: str, resume_reason: str)[source]#

Log when a partition is resumed from a checkpoint.
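
A sketch of restart and resume logging; the reason strings, timestamps, partition ids, and checkpoint paths are illustrative:

    import time

    self.log_job_restart(restart_reason="worker preempted",
                         original_start_time=time.time() - 3600,  # illustrative
                         resume_partitions=[2, 3],
                         resume_from_operation=4,
                         checkpoint_paths=["ckpt/p2_op3.parquet",
                                           "ckpt/p3_op3.parquet"])
    self.log_partition_resume(partition_id=2, resume_operation=4,
                              checkpoint_path="ckpt/p2_op3.parquet",
                              resume_reason="checkpoint found")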

get_events(**kwargs) List[Event][source]#

Get events with optional filtering.

generate_status_report() str[source]#

Generate a status report.

monitor_events(event_type: EventType | None = None) Generator[Event, None, None][source]#

Monitor events in real time.

analyze_resumption_state(job_id: str) Dict[str, Any][source]#

Analyze the event history to determine the resumption state and generate a resumption plan.

Parameters:

job_id – The job ID to analyze

Returns:

Dictionary containing resumption analysis and plan
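
Usage sketch; the exact keys of the returned dictionary depend on the implementation:

    plan = self.analyze_resumption_state(job_id="job-001")
    print(plan)  # e.g. which partitions/operations still need to run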