# Source code for data_juicer_agents.tools.process.execute_bash.logic

# -*- coding: utf-8 -*-
"""Pure logic for execute_bash — SmartBash execution with structured parsing."""

from __future__ import annotations

from typing import Any, Dict

from data_juicer_agents.utils.runtime_helpers import run_interruptible_subprocess, to_int

from .._shared.diagnostics import diagnose
from .._shared.parser import parse_output


def execute_bash(*, command: str, timeout: int = 120) -> Dict[str, Any]:
    """Run a shell command and return a structured, parsed result.

    The command is stripped of surrounding whitespace and executed through
    ``run_interruptible_subprocess`` with ``shell=True``. Its output is then
    passed to ``parse_output`` and ``diagnose`` and the combined result is
    returned as a plain dictionary.

    NOTE(review): the command string is handed to the shell verbatim, so
    callers must only pass input they trust.

    Args:
        command: Shell command line to execute. ``None``, empty, or
            whitespace-only values are rejected without running anything.
        timeout: Requested timeout in seconds. It is converted with
            ``to_int(timeout, 120)`` and clamped to a minimum of 1.

    Returns:
        When the command is empty, an error payload with the keys ``ok``
        (``False``), ``error_type`` (``"missing_required"``), ``requires``
        and ``message``.

        Otherwise a payload with the keys ``ok``, ``action``, ``flavour``,
        ``returncode``, ``stdout``, ``stderr``, ``summary``, ``items``,
        ``count``, ``truncated``, ``diagnosis``, ``suggestion``, ``command``
        and ``timeout``. ``stdout`` is the parser's version of the output,
        while ``stderr`` is the raw captured text.
    """
    cmd = str(command or "").strip()
    if not cmd:
        return {
            "ok": False,
            "error_type": "missing_required",
            "requires": ["command"],
            "message": "command is required",
        }

    # Never allow a zero or negative timeout to reach the subprocess runner.
    timeout_sec = max(to_int(timeout, 120), 1)
    raw = run_interruptible_subprocess(cmd, timeout_sec=timeout_sec, shell=True)

    returncode = raw.get("returncode", -1)

    # ``dict.get(key, "")`` only falls back when the key is absent; a key that
    # is present with a ``None`` value would otherwise be stringified to the
    # literal text "None" and leak into parsing, diagnosis and the response.
    raw_stdout = raw.get("stdout")
    raw_stderr = raw.get("stderr")
    stdout = "" if raw_stdout is None else str(raw_stdout)
    stderr = "" if raw_stderr is None else str(raw_stderr)

    parsed = parse_output(
        command=cmd,
        returncode=returncode,
        stdout=stdout,
        stderr=stderr,
    )
    diag, suggestion = diagnose(
        flavour=parsed.flavour,
        returncode=returncode,
        stdout=stdout,
        stderr=stderr,
    )

    return {
        "ok": parsed.ok,
        "action": "execute_bash",
        "flavour": parsed.flavour,
        "returncode": returncode,
        # stdout comes from the parser; stderr is passed through unparsed.
        "stdout": parsed.stdout,
        "stderr": stderr,
        "summary": parsed.summary,
        "items": parsed.items,
        "count": parsed.count,
        "truncated": parsed.truncated,
        "diagnosis": diag,
        "suggestion": suggestion,
        "command": cmd,
        "timeout": timeout_sec,
    }