# Source code for data_juicer_agents.tools.process.execute_shell_command.logic
# -*- coding: utf-8 -*-
"""Pure logic for execute_shell_command."""
from __future__ import annotations
from typing import Any, Dict
from data_juicer_agents.utils.runtime_helpers import run_interruptible_subprocess, to_int
# [docs]
def execute_shell_command(*, command: str, timeout: int = 120) -> Dict[str, Any]:
    """Run a shell command string and return the resulting payload.

    Args:
        command: Command line to execute. It is coerced to ``str`` and
            stripped of surrounding whitespace; a falsy or blank value is
            rejected without running anything.
        timeout: Requested time limit, passed through ``to_int`` with a
            fallback argument of 120 and then raised to at least 1 before
            being handed to the subprocess helper as ``timeout_sec``.

    Returns:
        When the command is missing, a dict with ``ok`` set to ``False``,
        ``error_type`` set to ``"missing_required"``, ``requires`` listing
        ``"command"``, and an explanatory ``message``. Otherwise, the object
        returned by ``run_interruptible_subprocess`` with its ``"action"``
        key set to ``"execute_shell_command"``.
    """
    normalized = str(command or "").strip()

    if normalized:
        # Never pass a zero or negative limit down to the helper.
        effective_timeout = max(to_int(timeout, 120), 1)
        # NOTE(review): shell=True is forwarded to the helper, presumably so
        # the string is interpreted by a shell - confirm that callers only
        # pass trusted command text.
        result = run_interruptible_subprocess(
            normalized,
            timeout_sec=effective_timeout,
            shell=True,
        )
        # Tag the helper's payload in place so consumers can tell which tool
        # produced it.
        result["action"] = "execute_shell_command"
        return result

    # Nothing runnable was supplied: report which argument is missing.
    return dict(
        ok=False,
        error_type="missing_required",
        requires=["command"],
        message="command is required for execute_shell_command",
    )