data_juicer.utils.job.stopper 源代码

#!/usr/bin/env python3
"""
DataJuicer Job Stopper

A utility to stop DataJuicer jobs by reading event logs to find process and thread IDs,
then terminating those specific processes and threads.
"""

import json
import sys
import time
from typing import Any, Dict

import psutil
from loguru import logger

from data_juicer.utils.job.common import JobUtils, list_running_jobs


[文档] class JobStopper: """Stop DataJuicer jobs using event log-based process discovery."""
[文档] def __init__(self, job_id: str, base_dir: str = "outputs/partition-checkpoint-eventlog"): self.job_utils = JobUtils(job_id, base_dir=base_dir) self.job_id = job_id self.work_dir = self.job_utils.work_dir
[文档] def terminate_process_gracefully(self, proc, timeout: int = 10) -> bool: """Terminate a process gracefully with timeout.""" try: logger.info(f"Terminating process {proc.pid} gracefully...") proc.terminate() # Wait for the process to terminate try: proc.wait(timeout=timeout) logger.info(f"Process {proc.pid} terminated gracefully") return True except psutil.TimeoutExpired: logger.warning(f"Process {proc.pid} did not terminate within {timeout}s, force killing...") proc.kill() proc.wait() logger.info(f"Process {proc.pid} force killed") return True except psutil.NoSuchProcess: logger.info(f"Process {proc.pid} already terminated") return True except psutil.AccessDenied: logger.error(f"Access denied when terminating process {proc.pid}") return False except Exception as e: logger.error(f"Error terminating process {proc.pid}: {e}") return False
[文档] def cleanup_job_resources(self) -> None: """Clean up job resources and update job summary.""" job_summary = self.job_utils.load_job_summary() if job_summary: job_summary["status"] = "stopped" job_summary["stop_time"] = time.time() job_summary["stop_reason"] = "manual_stop" try: with open(self.work_dir / "job_summary.json", "w") as f: json.dump(job_summary, f, indent=2, default=str) logger.info(f"Updated job summary: {self.work_dir / 'job_summary.json'}") except Exception as e: logger.error(f"Failed to update job summary: {e}")
[文档] def stop_job(self, force: bool = False, timeout: int = 30) -> Dict[str, Any]: """Stop the DataJuicer job using event log-based process discovery.""" results = { "job_id": self.job_id, "success": False, "processes_found": 0, "processes_terminated": 0, "threads_found": 0, "threads_terminated": 0, "errors": [], } logger.info(f"🛑 Stopping DataJuicer job: {self.job_id}") logger.info(f"Work directory: {self.work_dir}") # Load job summary job_summary = self.job_utils.load_job_summary() if job_summary: logger.info(f"Job status: {job_summary.get('status', 'unknown')}") logger.info(f"Job started: {job_summary.get('start_time', 'unknown')}") # Extract process and thread IDs from event logs logger.info("🔍 Extracting process and thread IDs from event logs...") ids = self.job_utils.extract_process_thread_ids() results["processes_found"] = len(ids["process_ids"]) results["threads_found"] = len(ids["thread_ids"]) if not ids["process_ids"] and not ids["thread_ids"]: logger.warning("No process or thread IDs found in event logs") results["errors"].append("No process or thread IDs found in event logs") self.cleanup_job_resources() return results # Find and terminate processes logger.info(f"🔍 Finding {len(ids['process_ids'])} processes...") processes = self.job_utils.find_processes_by_ids(ids["process_ids"]) if processes: logger.info(f"Found {len(processes)} running processes to terminate") for proc in processes: if self.terminate_process_gracefully(proc, timeout): results["processes_terminated"] += 1 else: results["errors"].append(f"Failed to terminate process {proc.pid}") else: logger.info("No running processes found") # Find and terminate threads (placeholder for future implementation) logger.info(f"🔍 Finding {len(ids['thread_ids'])} threads...") threads = self.job_utils.find_threads_by_ids(ids["thread_ids"]) results["threads_terminated"] = len(threads) # Clean up job resources logger.info("🧹 Cleaning up job resources...") self.cleanup_job_resources() # Determine success results["success"] = results["processes_terminated"] > 0 or results["threads_terminated"] > 0 if results["success"]: logger.info(f"✅ Job {self.job_id} stopped successfully") logger.info(f" Terminated {results['processes_terminated']} processes") logger.info(f" Terminated {results['threads_terminated']} threads") else: logger.warning(f"⚠️ Job {self.job_id} may not have been fully stopped") if results["errors"]: logger.error(f" Errors: {results['errors']}") return results
[文档] def stop_job( job_id: str, base_dir: str = "outputs/partition-checkpoint-eventlog", force: bool = False, timeout: int = 30 ) -> Dict[str, Any]: """Stop a DataJuicer job using event log-based process discovery.""" stopper = JobStopper(job_id, base_dir) return stopper.stop_job(force=force, timeout=timeout)
[文档] def main(): """Main function for command-line usage.""" import argparse parser = argparse.ArgumentParser(description="Stop DataJuicer jobs using event log-based process discovery") parser.add_argument("job_id", nargs="?", help="Job ID to stop") parser.add_argument( "--base-dir", default="outputs/partition-checkpoint-eventlog", help="Base directory for job outputs" ) parser.add_argument("--force", action="store_true", help="Force termination") parser.add_argument("--timeout", type=int, default=30, help="Termination timeout in seconds") parser.add_argument("--list", action="store_true", help="List all jobs") parser.add_argument("--verbose", action="store_true", help="Verbose output") args = parser.parse_args() if args.verbose: logger.remove() logger.add(sys.stderr, level="DEBUG") if args.list: jobs = list_running_jobs(args.base_dir) if jobs: print("📋 DataJuicer Jobs:") print("=" * 80) for job in jobs: status_icon = "🟢" if job["status"] == "completed" else "🟡" if job["status"] == "running" else "🔴" print(f"{status_icon} {job['job_id']} | Status: {job['status']} | Processes: {job['processes']}") else: print("No DataJuicer jobs found") return if not args.job_id: parser.error("Job ID is required unless using --list") result = stop_job(args.job_id, args.base_dir, force=args.force, timeout=args.timeout) if result["success"]: print(f"✅ Job {args.job_id} stopped successfully") print(f" Terminated {result['processes_terminated']} processes") print(f" Terminated {result['threads_terminated']} threads") else: print(f"⚠️ Job {args.job_id} may not have been fully stopped") if result["errors"]: print(f" Errors: {result['errors']}") sys.exit(1)
if __name__ == "__main__": main()