Source code for data_juicer_agents.tools.process.execute_python_code.logic
# -*- coding: utf-8 -*-
"""Pure logic for execute_python_code."""
from __future__ import annotations
import sys
from typing import Any, Dict
from data_juicer_agents.utils.runtime_helpers import run_interruptible_subprocess, to_int
[docs]
def execute_python_code(*, code: str, timeout: int = 120) -> Dict[str, Any]:
snippet = str(code or "")
if not snippet.strip():
return {
"ok": False,
"error_type": "missing_required",
"requires": ["code"],
"message": "code is required for execute_python_code",
}
timeout_sec = max(to_int(timeout, 120), 1)
payload = run_interruptible_subprocess([sys.executable, "-c", snippet], timeout_sec=timeout_sec, shell=False)
payload["action"] = "execute_python_code"
return payload