Source code for data_juicer_agents.tools.files.write_text_file.logic

# -*- coding: utf-8 -*-
"""Pure logic for write_text_file."""

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict

from data_juicer_agents.utils.runtime_helpers import normalize_line_idx, parse_line_ranges


[docs] def write_text_file(*, file_path: str, content: str = "", ranges: Any = None) -> Dict[str, Any]: path = str(file_path or "").strip() if not path: return { "ok": False, "error_type": "missing_required", "requires": ["file_path"], "message": "file_path is required for write_text_file", } target = Path(path).expanduser() payload = str(content or "") parsed_ranges, err = parse_line_ranges(ranges) if err: return {"ok": False, "error_type": "invalid_ranges", "message": err} try: target.parent.mkdir(parents=True, exist_ok=True) except Exception as exc: return {"ok": False, "error_type": "mkdir_failed", "message": f"failed to create parent dir: {exc}"} if parsed_ranges is None or not target.exists(): try: target.write_text(payload, encoding="utf-8") except Exception as exc: return {"ok": False, "error_type": "write_failed", "message": f"failed to write file: {exc}"} return { "ok": True, "action": "write_text_file", "file_path": str(target), "line_range": parsed_ranges, "message": f"wrote file {target}", } if not target.is_file(): return {"ok": False, "error_type": "invalid_file_type", "message": f"path is not a file: {target}"} start_raw, end_raw = parsed_ranges try: lines = target.read_text(encoding="utf-8").splitlines(keepends=True) except Exception as exc: return {"ok": False, "error_type": "read_failed", "message": f"failed to read existing file: {exc}"} start = max(normalize_line_idx(start_raw, len(lines)), 1) end = min(normalize_line_idx(end_raw, len(lines)), len(lines)) if len(lines) > 0 and (start > end or start > len(lines)): return { "ok": False, "error_type": "invalid_ranges", "message": f"invalid line range after normalization: [{start}, {end}]", } replacement = payload if replacement and not replacement.endswith("\n"): replacement = replacement + "\n" new_lines = lines[: max(start - 1, 0)] + [replacement] + lines[end:] try: target.write_text("".join(new_lines), encoding="utf-8") except Exception as exc: return {"ok": False, "error_type": "write_failed", "message": f"failed to write file: {exc}"} return { "ok": True, "action": "write_text_file", "file_path": str(target), "line_range": [start, end], "message": f"updated lines [{start}, {end}] in {target}", }