data_juicer.core.executor.event_logging_mixin module#
Event Logging Mixin for Data-Juicer Executors
This module provides comprehensive event logging capabilities that can be used by any executor (default, ray, partitioned, etc.) to track operations, performance, and errors in real time.
Features:
1. Real-time event logging with configurable levels
2. Event filtering and querying
3. Performance metrics tracking
4. Error tracking with stack traces
5. Status reporting and monitoring
6. Log rotation and cleanup
- class data_juicer.core.executor.event_logging_mixin.EventType(value)[source]#
Bases: Enum
Types of events that can be logged.
- JOB_START = 'job_start'#
- JOB_COMPLETE = 'job_complete'#
- JOB_FAILED = 'job_failed'#
- JOB_RESTART = 'job_restart'#
- PARTITION_START = 'partition_start'#
- PARTITION_COMPLETE = 'partition_complete'#
- PARTITION_FAILED = 'partition_failed'#
- PARTITION_RESUME = 'partition_resume'#
- OP_START = 'op_start'#
- OP_COMPLETE = 'op_complete'#
- OP_FAILED = 'op_failed'#
- CHECKPOINT_SAVE = 'checkpoint_save'#
- CHECKPOINT_LOAD = 'checkpoint_load'#
- PROCESSING_START = 'processing_start'#
- PROCESSING_COMPLETE = 'processing_complete'#
- PROCESSING_ERROR = 'processing_error'#
- DAG_BUILD_START = 'dag_build_start'#
- DAG_BUILD_COMPLETE = 'dag_build_complete'#
- DAG_NODE_READY = 'dag_node_ready'#
- DAG_NODE_START = 'dag_node_start'#
- DAG_NODE_COMPLETE = 'dag_node_complete'#
- DAG_NODE_FAILED = 'dag_node_failed'#
- DAG_PARALLEL_GROUP_START = 'dag_parallel_group_start'#
- DAG_PARALLEL_GROUP_COMPLETE = 'dag_parallel_group_complete'#
- DAG_EXECUTION_PLAN_SAVED = 'dag_execution_plan_saved'#
- DAG_EXECUTION_PLAN_LOADED = 'dag_execution_plan_loaded'#
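Because each member's value is a plain string, event types round-trip cleanly through the JSONL event log. The snippet below is a minimal stand-in that reproduces only three of the members listed above for illustration, not the real class:

```python
from enum import Enum


# Minimal stand-in mirroring a few documented EventType members
# (the real class lives in data_juicer.core.executor.event_logging_mixin).
class EventType(Enum):
    JOB_START = "job_start"
    OP_COMPLETE = "op_complete"
    JOB_COMPLETE = "job_complete"


# The string value is what gets written to the JSONL event log,
# so a logged value can be mapped back to its enum member:
assert EventType("op_complete") is EventType.OP_COMPLETE
assert EventType.JOB_START.value == "job_start"
```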
- class data_juicer.core.executor.event_logging_mixin.Event(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None)[source]#
Bases: object
Event data structure.
- event_type: EventType#
- timestamp: float#
- message: str#
- event_id: str | None = None#
- job_id: str | None = None#
- partition_id: int | None = None#
- operation_name: str | None = None#
- operation_idx: int | None = None#
- status: str | None = None#
- duration: float | None = None#
- error_message: str | None = None#
- stack_trace: str | None = None#
- retry_count: int | None = None#
- checkpoint_path: str | None = None#
- op_args: Dict[str, Any] | None = None#
- input_rows: int | None = None#
- output_rows: int | None = None#
- output_path: str | None = None#
- partition_meta: Dict[str, Any] | None = None#
- config: Dict[str, Any] | None = None#
- metadata: Dict[str, Any] | None = None#
- total_partitions: int | None = None#
- successful_partitions: int | None = None#
- failed_partitions: int | None = None#
- job_duration: float | None = None#
- completion_time: float | None = None#
- failure_time: float | None = None#
- error_type: str | None = None#
- process_id: int | None = None#
- thread_id: int | None = None#
- __init__(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None) → None#
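Only event_type, timestamp, and message are required; every other field defaults to None. The sketch below reproduces just a handful of the documented fields (not the full dataclass) to show how such an event could be built and turned into a JSON-serializable record:

```python
import time
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Optional


class EventType(Enum):
    OP_COMPLETE = "op_complete"


# Minimal sketch of the documented Event dataclass: only event_type,
# timestamp, and message are required; the rest default to None.
@dataclass
class Event:
    event_type: EventType
    timestamp: float
    message: str
    partition_id: Optional[int] = None
    operation_name: Optional[str] = None
    duration: Optional[float] = None
    input_rows: Optional[int] = None
    output_rows: Optional[int] = None


evt = Event(
    event_type=EventType.OP_COMPLETE,
    timestamp=time.time(),
    message="language_id_score_filter finished",  # illustrative message
    partition_id=3,
    operation_name="language_id_score_filter",
    duration=12.4,
    input_rows=10_000,
    output_rows=9_480,
)

# asdict() yields a plain dict; mapping the enum to its string value
# makes the record ready for a JSONL event log.
record = {**asdict(evt), "event_type": evt.event_type.value}
```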
- class data_juicer.core.executor.event_logging_mixin.EventLogger(log_dir: str, job_id: str | None = None, work_dir: str | None = None)[source]#
Bases: object
Event logging system with real-time capabilities and a JSONL event log for resumability.
- events: deque#
- find_latest_events_file(work_dir: str) → Path | None[source]#
Find the latest events file in the work directory.
- check_job_completion(events_file: Path) → bool[source]#
Check whether the job has already completed by looking for a job_complete event.
- get_events(event_type: EventType | None = None, partition_id: int | None = None, operation_name: str | None = None, start_time: float | None = None, end_time: float | None = None, limit: int | None = None) → List[Event][source]#
Get events with optional filtering.
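The filtering semantics can be sketched as follows. This is a hypothetical re-implementation over plain dicts for illustration only, assuming each criterion narrows the result set when supplied and that limit keeps the most recent matches; the real method operates on Event objects held in the events deque:

```python
from collections import deque


# Hypothetical sketch of get_events()-style filtering: every criterion
# is optional, and `limit` truncates to the most recent matches.
def get_events(events, event_type=None, partition_id=None,
               operation_name=None, start_time=None, end_time=None,
               limit=None):
    out = []
    for evt in events:
        if event_type is not None and evt["event_type"] != event_type:
            continue
        if partition_id is not None and evt.get("partition_id") != partition_id:
            continue
        if operation_name is not None and evt.get("operation_name") != operation_name:
            continue
        if start_time is not None and evt["timestamp"] < start_time:
            continue
        if end_time is not None and evt["timestamp"] > end_time:
            continue
        out.append(evt)
    return out[-limit:] if limit else out


log = deque([
    {"event_type": "op_start", "timestamp": 1.0, "partition_id": 0},
    {"event_type": "op_complete", "timestamp": 2.0, "partition_id": 0},
    {"event_type": "op_complete", "timestamp": 3.0, "partition_id": 1},
])
assert len(get_events(log, event_type="op_complete")) == 2
assert get_events(log, partition_id=1)[0]["timestamp"] == 3.0
```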
- class data_juicer.core.executor.event_logging_mixin.EventLoggingMixin(*args, **kwargs)[source]#
Bases: object
Mixin that adds event logging capabilities to any executor.
- log_partition_complete(partition_id, duration, output_path, success=True, error=None)[source]#
Log partition completion with performance metrics.
- log_partition_failed(partition_id, error_message, retry_count)[source]#
Log partition failure with retry information.
- log_op_start(partition_id, operation_name, operation_idx, op_args, **kwargs)[source]#
Log operation start with detailed arguments.
- log_op_complete(partition_id, operation_name, operation_idx, duration, checkpoint_path, input_rows, output_rows, **kwargs)[source]#
Log operation completion with detailed performance metrics.
- log_op_failed(partition_id, operation_name, operation_idx, error_message, retry_count, **kwargs)[source]#
Log operation failure with error details.
- log_checkpoint_save(partition_id, operation_name, operation_idx, checkpoint_path)[source]#
Log checkpoint save with file information.
- log_checkpoint_load(partition_id, operation_name, operation_idx, checkpoint_path)[source]#
Log checkpoint load with file information.
- log_dag_build_complete(dag_info: Dict[str, Any])[source]#
Log DAG build completion with execution plan information.
- log_dag_node_ready(node_id: str, node_info: Dict[str, Any])[source]#
Log when a DAG node becomes ready for execution.
- log_dag_node_start(node_id: str, node_info: Dict[str, Any])[source]#
Log when a DAG node starts execution.
- log_dag_node_complete(node_id: str, node_info: Dict[str, Any], duration: float)[source]#
Log when a DAG node completes execution.
- log_dag_node_failed(node_id: str, node_info: Dict[str, Any], error_message: str, duration: float = 0)[source]#
Log when a DAG node fails execution.
- log_dag_parallel_group_start(group_id: str, group_info: Dict[str, Any])[source]#
Log when a parallel group starts execution.
- log_dag_parallel_group_complete(group_id: str, group_info: Dict[str, Any], duration: float)[source]#
Log when a parallel group completes execution.
- log_dag_execution_plan_saved(plan_path: str, plan_info: Dict[str, Any])[source]#
Log when a DAG execution plan is saved.
- log_dag_execution_plan_loaded(plan_path: str, plan_info: Dict[str, Any])[source]#
Log when a DAG execution plan is loaded.
- log_job_restart(restart_reason: str, original_start_time: float, resume_partitions: List[int], resume_from_operation: int, checkpoint_paths: List[str])[source]#
Log when a job is restarted after an interruption.
- log_partition_resume(partition_id: int, resume_operation: int, checkpoint_path: str, resume_reason: str)[source]#
Log when a partition is resumed from a checkpoint.
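To illustrate how an executor might call these hooks, here is a hypothetical, heavily simplified mixin that turns each log_* call into one JSON record in an in-memory list. The helper name _log_event, the class MyExecutor, and the checkpoint path are all invented for this sketch; the real mixin writes to the JSONL event log and records many more fields:

```python
import json
import time


# Hypothetical sketch of the mixin pattern: each log_* method builds a
# record and appends one JSON line to the host executor's event buffer.
class EventLoggingMixin:
    def _log_event(self, event_type, message, **fields):
        record = {"event_type": event_type,
                  "timestamp": time.time(),
                  "message": message, **fields}
        self.event_lines.append(json.dumps(record))

    def log_op_complete(self, partition_id, operation_name, operation_idx,
                        duration, checkpoint_path, input_rows, output_rows):
        self._log_event(
            "op_complete",
            f"{operation_name} finished on partition {partition_id}",
            partition_id=partition_id,
            operation_name=operation_name,
            operation_idx=operation_idx,
            duration=duration,
            checkpoint_path=checkpoint_path,
            input_rows=input_rows,
            output_rows=output_rows,
        )


# An executor gains the logging hooks simply by inheriting the mixin.
class MyExecutor(EventLoggingMixin):
    def __init__(self):
        self.event_lines = []


ex = MyExecutor()
ex.log_op_complete(0, "whitespace_normalization_mapper", 2, 1.7,
                   "/tmp/ckpt/op_2.parquet", 1000, 1000)
```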