data_juicer.core.executor.event_logging_mixin module#

Event Logging Mixin for Data-Juicer Executors

This module provides comprehensive event logging capabilities that can be used by any executor (default, ray, partitioned, etc.) to track operations, performance, and errors in real-time.

Features:

1. Real-time event logging with configurable levels
2. Event filtering and querying
3. Performance metrics tracking
4. Error tracking with stack traces
5. Status reporting and monitoring
6. Log rotation and cleanup

class data_juicer.core.executor.event_logging_mixin.EventType(value)[源代码]#

Bases: Enum

Types of events that can be logged.

JOB_START = 'job_start'#
JOB_COMPLETE = 'job_complete'#
JOB_FAILED = 'job_failed'#
JOB_RESTART = 'job_restart'#
PARTITION_START = 'partition_start'#
PARTITION_COMPLETE = 'partition_complete'#
PARTITION_FAILED = 'partition_failed'#
PARTITION_RESUME = 'partition_resume'#
OP_START = 'op_start'#
OP_COMPLETE = 'op_complete'#
OP_FAILED = 'op_failed'#
CHECKPOINT_SAVE = 'checkpoint_save'#
CHECKPOINT_LOAD = 'checkpoint_load'#
PROCESSING_START = 'processing_start'#
PROCESSING_COMPLETE = 'processing_complete'#
PROCESSING_ERROR = 'processing_error'#
DAG_BUILD_START = 'dag_build_start'#
DAG_BUILD_COMPLETE = 'dag_build_complete'#
DAG_NODE_READY = 'dag_node_ready'#
DAG_NODE_START = 'dag_node_start'#
DAG_NODE_COMPLETE = 'dag_node_complete'#
DAG_NODE_FAILED = 'dag_node_failed'#
DAG_PARALLEL_GROUP_START = 'dag_parallel_group_start'#
DAG_PARALLEL_GROUP_COMPLETE = 'dag_parallel_group_complete'#
DAG_EXECUTION_PLAN_SAVED = 'dag_execution_plan_saved'#
DAG_EXECUTION_PLAN_LOADED = 'dag_execution_plan_loaded'#
class data_juicer.core.executor.event_logging_mixin.Event(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None)[源代码]#

Bases: object

Event data structure.

event_type: EventType#
timestamp: float#
message: str#
event_id: str | None = None#
job_id: str | None = None#
partition_id: int | None = None#
operation_name: str | None = None#
operation_idx: int | None = None#
status: str | None = None#
duration: float | None = None#
error_message: str | None = None#
stack_trace: str | None = None#
retry_count: int | None = None#
checkpoint_path: str | None = None#
op_args: Dict[str, Any] | None = None#
input_rows: int | None = None#
output_rows: int | None = None#
output_path: str | None = None#
partition_meta: Dict[str, Any] | None = None#
config: Dict[str, Any] | None = None#
metadata: Dict[str, Any] | None = None#
total_partitions: int | None = None#
successful_partitions: int | None = None#
failed_partitions: int | None = None#
job_duration: float | None = None#
completion_time: float | None = None#
failure_time: float | None = None#
error_type: str | None = None#
process_id: int | None = None#
thread_id: int | None = None#
__init__(event_type: EventType, timestamp: float, message: str, event_id: str | None = None, job_id: str | None = None, partition_id: int | None = None, operation_name: str | None = None, operation_idx: int | None = None, status: str | None = None, duration: float | None = None, error_message: str | None = None, stack_trace: str | None = None, retry_count: int | None = None, checkpoint_path: str | None = None, op_args: Dict[str, Any] | None = None, input_rows: int | None = None, output_rows: int | None = None, output_path: str | None = None, partition_meta: Dict[str, Any] | None = None, config: Dict[str, Any] | None = None, metadata: Dict[str, Any] | None = None, total_partitions: int | None = None, successful_partitions: int | None = None, failed_partitions: int | None = None, job_duration: float | None = None, completion_time: float | None = None, failure_time: float | None = None, error_type: str | None = None, process_id: int | None = None, thread_id: int | None = None) → None#
class data_juicer.core.executor.event_logging_mixin.EventLogger(log_dir: str, job_id: str | None = None, work_dir: str | None = None)[源代码]#

Bases: object

Event logging system with real-time capabilities and JSONL event log for resumability.

__init__(log_dir: str, job_id: str | None = None, work_dir: str | None = None)[源代码]#
events: deque#
log_event(event: Event)[源代码]#

Log an event (to memory, loguru, and JSONL for resumability).

find_latest_events_file(work_dir: str) → Path | None[source]#

Find the latest events file in the work directory.

check_job_completion(events_file: Path) → bool[source]#

Check if job is already completed by looking for job_complete event.

get_events(event_type: EventType | None = None, partition_id: int | None = None, operation_name: str | None = None, start_time: float | None = None, end_time: float | None = None, limit: int | None = None) → List[Event][source]#

Get events with optional filtering.

generate_status_report() → str[source]#

Generate a comprehensive status report.

monitor_events(event_type: EventType | None = None) → Generator[Event, None, None][source]#

Monitor events in real-time.

classmethod list_available_jobs(work_dir: str) → List[Dict[str, Any]][source]#

List available jobs for resumption from a work directory.

class data_juicer.core.executor.event_logging_mixin.EventLoggingMixin(*args, **kwargs)[源代码]#

Bases: object

Mixin to add event logging capabilities to any executor.

__init__(*args, **kwargs)[源代码]#

Initialize the mixin.

log_job_start(config, total_partitions)[源代码]#

Log job start with detailed configuration.

log_job_complete(duration, output_path=None)[源代码]#

Log job completion with performance metrics.

log_job_failed(error_message, duration)[源代码]#

Log job failure with error details.

log_partition_start(partition_id, partition_meta)[源代码]#

Log partition start with detailed metadata.

log_partition_complete(partition_id, duration, output_path, success=True, error=None)[源代码]#

Log partition completion with performance metrics.

log_partition_failed(partition_id, error_message, retry_count)[源代码]#

Log partition failure with retry information.

log_op_start(partition_id, operation_name, operation_idx, op_args, **kwargs)[源代码]#

Log operation start with detailed arguments.

log_op_complete(partition_id, operation_name, operation_idx, duration, checkpoint_path, input_rows, output_rows, **kwargs)[源代码]#

Log operation completion with detailed performance metrics.

log_op_failed(partition_id, operation_name, operation_idx, error_message, retry_count, **kwargs)[源代码]#

Log operation failure with error details.

log_checkpoint_save(partition_id, operation_name, operation_idx, checkpoint_path)[源代码]#

Log checkpoint save with file information.

log_checkpoint_load(partition_id, operation_name, operation_idx, checkpoint_path)[源代码]#

Log checkpoint load with file information.

log_dag_build_start(ast_info: Dict[str, Any])[源代码]#

Log DAG build start with AST information.

log_dag_build_complete(dag_info: Dict[str, Any])[源代码]#

Log DAG build completion with execution plan information.

log_dag_node_ready(node_id: str, node_info: Dict[str, Any])[源代码]#

Log when a DAG node becomes ready for execution.

log_dag_node_start(node_id: str, node_info: Dict[str, Any])[源代码]#

Log when a DAG node starts execution.

log_dag_node_complete(node_id: str, node_info: Dict[str, Any], duration: float)[源代码]#

Log when a DAG node completes execution.

log_dag_node_failed(node_id: str, node_info: Dict[str, Any], error_message: str, duration: float = 0)[源代码]#

Log when a DAG node fails execution.

log_dag_parallel_group_start(group_id: str, group_info: Dict[str, Any])[源代码]#

Log when a parallel group starts execution.

log_dag_parallel_group_complete(group_id: str, group_info: Dict[str, Any], duration: float)[源代码]#

Log when a parallel group completes execution.

log_dag_execution_plan_saved(plan_path: str, plan_info: Dict[str, Any])[源代码]#

Log when DAG execution plan is saved.

log_dag_execution_plan_loaded(plan_path: str, plan_info: Dict[str, Any])[源代码]#

Log when DAG execution plan is loaded.

log_job_restart(restart_reason: str, original_start_time: float, resume_partitions: List[int], resume_from_operation: int, checkpoint_paths: List[str])[源代码]#

Log when a job is restarted after interruption.

log_partition_resume(partition_id: int, resume_operation: int, checkpoint_path: str, resume_reason: str)[源代码]#

Log when a partition is resumed from a checkpoint.

get_events(**kwargs) → List[Event][source]#

Get events with optional filtering.

generate_status_report() → str[source]#

Generate status report.

monitor_events(event_type: EventType | None = None) → Generator[Event, None, None][source]#

Monitor events in real-time.

analyze_resumption_state(job_id: str) → Dict[str, Any][source]#

Analyze event history to determine resumption state and generate resumption plan.

Parameters:

job_id -- The job ID to analyze

Returns:

Dictionary containing resumption analysis and plan