data_juicer.core.executor.event_logging_mixin module#
Event Logging Mixin for Data-Juicer Executors
This module provides comprehensive event logging capabilities that can be used by any executor (default, ray, partitioned, etc.) to track operations, performance, and errors in real time.
Features:
1. Real-time event logging with configurable levels
2. Event filtering and querying
3. Performance metrics tracking
4. Error tracking with stack traces
5. Status reporting and monitoring
6. Log rotation and cleanup
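A minimal usage sketch: any executor that inherits the mixin gains the log_* methods documented below. The executor class, its run_op method, and the op object's attributes (name, args, run, input_rows) are hypothetical; initialization wiring (job id, work dir) is omitted:

    import time

    from data_juicer.core.executor.event_logging_mixin import EventLoggingMixin

    class MyExecutor(EventLoggingMixin):
        """Hypothetical executor; the mixin supplies the log_* methods."""

        def run_op(self, partition_id, op_idx, op):
            # op.name / op.args / op.run() / op.input_rows are hypothetical
            self.log_op_start(partition_id, op.name, op_idx, op.args)
            start = time.time()
            try:
                result = op.run()
                self.log_op_complete(
                    partition_id, op.name, op_idx,
                    duration=time.time() - start,
                    checkpoint_path=None,
                    input_rows=op.input_rows,
                    output_rows=len(result),
                )
            except Exception as exc:
                self.log_op_failed(partition_id, op.name, op_idx, str(exc), retry_count=0)
                raise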
- class data_juicer.core.executor.event_logging_mixin.EventType(value)[source]#
Bases: Enum
Types of events that can be logged.
- JOB_START = 'job_start'#
- JOB_COMPLETE = 'job_complete'#
- JOB_FAILED = 'job_failed'#
- JOB_RESTART = 'job_restart'#
- PARTITION_START = 'partition_start'#
- PARTITION_COMPLETE = 'partition_complete'#
- PARTITION_FAILED = 'partition_failed'#
- PARTITION_RESUME = 'partition_resume'#
- OP_START = 'op_start'#
- OP_COMPLETE = 'op_complete'#
- OP_FAILED = 'op_failed'#
- CHECKPOINT_SAVE = 'checkpoint_save'#
- CHECKPOINT_LOAD = 'checkpoint_load'#
- PROCESSING_START = 'processing_start'#
- PROCESSING_COMPLETE = 'processing_complete'#
- PROCESSING_ERROR = 'processing_error'#
- DAG_BUILD_START = 'dag_build_start'#
- DAG_BUILD_COMPLETE = 'dag_build_complete'#
- DAG_NODE_READY = 'dag_node_ready'#
- DAG_NODE_START = 'dag_node_start'#
- DAG_NODE_COMPLETE = 'dag_node_complete'#
- DAG_NODE_FAILED = 'dag_node_failed'#
- DAG_PARALLEL_GROUP_START = 'dag_parallel_group_start'#
- DAG_PARALLEL_GROUP_COMPLETE = 'dag_parallel_group_complete'#
- DAG_EXECUTION_PLAN_SAVED = 'dag_execution_plan_saved'#
- DAG_EXECUTION_PLAN_LOADED = 'dag_execution_plan_loaded'#
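Because EventLogger (below) persists events as JSONL, the raw log can be scanned offline with nothing but the enum values above. A sketch, assuming each line is a JSON object whose "event_type" field holds the enum's string value (the exact serialized schema is an assumption):

    import json
    from collections import Counter

    def tally_event_types(events_path):
        """Count events per type in a JSONL event log."""
        counts = Counter()
        with open(events_path) as f:
            for line in f:
                # assumed field name: "event_type"
                counts[json.loads(line).get("event_type")] += 1
        return counts

A mismatch between counts["op_start"] and counts["op_complete"] flags operations that never finished.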
- class data_juicer.core.executor.event_logging_mixin.Event(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None)[source]#
Bases: object
Event data structure.
- event_type: EventType#
- timestamp: float#
- message: str#
- event_id: str | None = None#
- job_id: str | None = None#
- partition_id: int | None = None#
- operation_name: str | None = None#
- operation_idx: int | None = None#
- status: str | None = None#
- duration: float | None = None#
- error_message: str | None = None#
- stack_trace: str | None = None#
- retry_count: int | None = None#
- checkpoint_path: str | None = None#
- op_args: Dict[str, Any] | None = None#
- input_rows: int | None = None#
- output_rows: int | None = None#
- output_path: str | None = None#
- partition_meta: Dict[str, Any] | None = None#
- config: Dict[str, Any] | None = None#
- metadata: Dict[str, Any] | None = None#
- total_partitions: int | None = None#
- successful_partitions: int | None = None#
- failed_partitions: int | None = None#
- job_duration: float | None = None#
- completion_time: float | None = None#
- failure_time: float | None = None#
- error_type: str | None = None#
- process_id: int | None = None#
- thread_id: int | None = None#
- __init__(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None) → None#
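Events are normally created by the logger, but they can also be constructed directly from the documented fields; a sketch with illustrative values (the operator name and row counts are made up):

    import time

    from data_juicer.core.executor.event_logging_mixin import Event, EventType

    event = Event(
        event_type=EventType.OP_COMPLETE,
        timestamp=time.time(),
        message="operator finished",
        partition_id=3,
        operation_name="clean_links_mapper",
        operation_idx=2,
        duration=12.4,
        input_rows=100_000,
        output_rows=98_512,
    )
    print(event.event_type, event.duration)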
- class data_juicer.core.executor.event_logging_mixin.EventLogger(log_dir: str, job_id: str | None = None, work_dir: str | None = None)[source]#
Bases: object
Event logging system with real-time capabilities and a JSONL event log for resumability.
- events: deque#
- find_latest_events_file(work_dir: str) → Path | None[source]#
Find the latest events file in the work directory.
- check_job_completion(events_file: Path) → bool[source]#
Check whether the job has already completed by looking for a job_complete event.
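Together, these two methods support a resume-or-skip decision at startup; a sketch using only the documented signatures (the directory paths are placeholders):

    from data_juicer.core.executor.event_logging_mixin import EventLogger

    logger = EventLogger(log_dir="./outputs/logs", work_dir="./outputs")
    events_file = logger.find_latest_events_file("./outputs")
    if events_file is not None and logger.check_job_completion(events_file):
        print("Job already completed; nothing to resume.")
    else:
        print("Starting or resuming the job.")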
- get_events(event_type: EventType | None = None, partition_id: int | None = None, operation_name: str | None = None, start_time: float | None = None, end_time: float | None = None, limit: int | None = None) → List[Event][source]#
Get events with optional filtering.
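Continuing the sketch above, the filters can be combined; for example, the most recent operation failures for one partition (the filter values are illustrative):

    from data_juicer.core.executor.event_logging_mixin import EventType

    failures = logger.get_events(
        event_type=EventType.OP_FAILED,
        partition_id=3,
        limit=10,
    )
    for ev in failures:
        print(ev.timestamp, ev.operation_name, ev.error_message)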
- class data_juicer.core.executor.event_logging_mixin.EventLoggingMixin(*args, **kwargs)[source]#
Bases: object
Mixin to add event logging capabilities to any executor.
- log_partition_start(partition_id, partition_meta)[source]#
Log partition start with detailed metadata.
- log_partition_complete(partition_id, duration, output_path, success=True, error=None)[source]#
Log partition completion with performance metrics.
- log_partition_failed(partition_id, error_message, retry_count)[source]#
Log partition failure with retry information.
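The three partition methods above pair naturally around a retry loop; a method sketch assumed to live on a mixin-enabled executor (process_partition and the retry budget are hypothetical):

    import time

    def run_partition(self, partition_id, partition_meta, max_retries=3):
        self.log_partition_start(partition_id, partition_meta)
        start = time.time()
        for attempt in range(max_retries):
            try:
                output_path = self.process_partition(partition_id)  # hypothetical
                self.log_partition_complete(
                    partition_id, time.time() - start, output_path, success=True
                )
                return output_path
            except Exception as exc:
                self.log_partition_failed(partition_id, str(exc), retry_count=attempt + 1)
        raise RuntimeError(f"partition {partition_id} failed after {max_retries} retries")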
- log_op_start(partition_id, operation_name, operation_idx, op_args, **kwargs)[source]#
Log operation start with detailed arguments.
- log_op_complete(partition_id, operation_name, operation_idx, duration, checkpoint_path, input_rows, output_rows, **kwargs)[source]#
Log operation completion with detailed performance metrics.
- log_op_failed(partition_id, operation_name, operation_idx, error_message, retry_count, **kwargs)[source]#
Log operation failure with error details.
- log_checkpoint_save(partition_id, operation_name, operation_idx, checkpoint_path)[source]#
Log checkpoint save with file information.
- log_checkpoint_load(partition_id, operation_name, operation_idx, checkpoint_path)[source]#
Log checkpoint load with file information.
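Checkpoint saves and loads are logged in matching pairs so a later restart can locate the newest usable checkpoint; a method sketch assumed to live on a mixin-enabled executor (the parquet path and persistence calls are hypothetical):

    def checkpoint_after_op(self, partition_id, op_name, op_idx, dataset):
        path = f"./outputs/ckpt/p{partition_id}_op{op_idx}.parquet"
        dataset.to_parquet(path)  # hypothetical persistence call
        self.log_checkpoint_save(partition_id, op_name, op_idx, path)
        return path

    def restore_checkpoint(self, partition_id, op_name, op_idx, path):
        self.log_checkpoint_load(partition_id, op_name, op_idx, path)
        return self.load_parquet(path)  # hypothetical loader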
- log_dag_build_complete(dag_info: Dict[str, Any])[source]#
Log DAG build completion with execution plan information.
- log_dag_node_ready(node_id: str, node_info: Dict[str, Any])[source]#
Log when a DAG node becomes ready for execution.
- log_dag_node_start(node_id: str, node_info: Dict[str, Any])[source]#
Log when a DAG node starts execution.
- log_dag_node_complete(node_id: str, node_info: Dict[str, Any], duration: float)[source]#
Log when a DAG node completes execution.
- log_dag_node_failed(node_id: str, node_info: Dict[str, Any], error_message: str, duration: float = 0)[source]#
Log when a DAG node fails execution.
- log_dag_parallel_group_start(group_id: str, group_info: Dict[str, Any])[source]#
Log when a parallel group starts execution.
- log_dag_parallel_group_complete(group_id: str, group_info: Dict[str, Any], duration: float)[source]#
Log when a parallel group completes execution.
- log_dag_execution_plan_saved(plan_path: str, plan_info: Dict[str, Any])[source]#
Log when DAG execution plan is saved.
- log_dag_execution_plan_loaded(plan_path: str, plan_info: Dict[str, Any])[source]#
Log when DAG execution plan is loaded.
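The DAG node methods above trace one node through its lifecycle; a method sketch assumed to live on a mixin-enabled executor (execute_node is hypothetical):

    import time

    def run_dag_node(self, node_id, node_info):
        self.log_dag_node_ready(node_id, node_info)
        self.log_dag_node_start(node_id, node_info)
        start = time.time()
        try:
            self.execute_node(node_id)  # hypothetical execution hook
            self.log_dag_node_complete(node_id, node_info, time.time() - start)
        except Exception as exc:
            self.log_dag_node_failed(node_id, node_info, str(exc), time.time() - start)
            raise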
- log_job_restart(restart_reason: str, original_start_time: float, resume_partitions: List[int], resume_from_operation: int, checkpoint_paths: List[str])[source]#
Log when a job is restarted after interruption.
- log_partition_resume(partition_id: int, resume_operation: int, checkpoint_path: str, resume_reason: str)[source]#
Log when a partition is resumed from a checkpoint.
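On restart, the two methods above record both the job-level resume decision and each partition's resume point; a method sketch assumed to live on a mixin-enabled executor, where checkpoints is a hypothetical mapping of partition_id to (resume_op_idx, checkpoint_path):

    def resume_job(self, checkpoints, original_start_time):
        self.log_job_restart(
            restart_reason="process interrupted",  # illustrative reason
            original_start_time=original_start_time,
            resume_partitions=sorted(checkpoints),
            resume_from_operation=min(op for op, _ in checkpoints.values()),
            checkpoint_paths=[path for _, path in checkpoints.values()],
        )
        for pid, (op_idx, path) in checkpoints.items():
            self.log_partition_resume(pid, op_idx, path, resume_reason="checkpoint found")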