# Source code for data_juicer_agents.tui.app

# -*- coding: utf-8 -*-
"""Transcript-oriented terminal UI for dj-agents."""

from __future__ import annotations

import argparse
import os
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List
from typing import Any
from typing import Dict

from rich.console import Console
from rich.text import Text

from data_juicer_agents.utils.agentscope_logging import install_thinking_warning_filter
from data_juicer_agents.utils.terminal_input import TerminalLineReader
from data_juicer_agents.tui.controller import SessionController
from data_juicer_agents.tui.event_adapter import apply_event
from data_juicer_agents.tui.models import TimelineItem
from data_juicer_agents.tui.models import TuiState
from data_juicer_agents.tui.noise_filter import install_tui_warning_filters
from data_juicer_agents.tui.noise_filter import sanitize_reasoning_text


# Rich style names used for the transcript blocks printed by
# `_print_timeline_item` ("input", "you" and "agent" blocks respectively).
_INPUT_STYLE = "bright_white"
_USER_STYLE = "bright_cyan"
_AGENT_STYLE = "bright_cyan"
# Prompt handed to the line reader; the leading newline separates the prompt
# from whatever was printed before it.
_INPUT_PROMPT = "\n> "


@dataclass
class _ThinkingSpinner:
    """Single-line activity spinner that redraws itself in place.

    Attributes:
        stream: Object exposing ``write`` and ``flush``; the spinner line is
            written to it.
        text: Message shown after the spinner glyph.
        interval_sec: Minimum number of seconds between two redraws.
    """

    stream: Any
    text: str = "thinking..."
    interval_sec: float = 0.35

    def __post_init__(self) -> None:
        # Redraw bookkeeping; kept out of the dataclass fields on purpose so
        # it is not part of the constructor signature.
        self._frames: List[str] = list("|/-\\")
        self._idx = 0
        self._last_tick = 0.0
        self._visible = False
        self._last_line_len = 0

    def tick(self) -> None:
        """Draw the next frame unless the last redraw was too recent."""
        current = time.monotonic()
        if current - self._last_tick < self.interval_sec:
            return
        self._last_tick = current

        glyph = self._frames[self._idx % len(self._frames)]
        self._idx += 1

        rendered = f"{glyph} {self.text}"
        # Pad up to the previous width so a shorter message fully overwrites
        # the leftovers of a longer one.
        self.stream.write("\r" + rendered.ljust(self._last_line_len))
        self.stream.flush()

        self._last_line_len = len(rendered)
        self._visible = True

    def clear(self) -> None:
        """Blank out the spinner line if one is currently displayed."""
        if self._visible:
            blank = " " * self._last_line_len
            self.stream.write(f"\r{blank}\r")
            self.stream.flush()
            self._visible = False
            self._last_line_len = 0


@dataclass
class _RunningToolState:
    """Record of one tool call that has started but not yet ended."""

    # Tool name taken from the "tool_start" event ("unknown_tool" if absent).
    tool: str
    # time.monotonic() reading captured when the start event was processed.
    started_monotonic: float


def _print_header(console: Console, state: TuiState) -> None:
    """Print the session banner: title, three info rows, a tip and a blank line.

    Args:
        console: Rich console that receives the output.
        state: UI state providing the model labels, directories, base URL and
            permissions label shown in the banner.
    """
    caption = "grey58"

    console.print(Text("dj-agents", style="bold"), highlight=False)

    # Each row is a sequence of (text, style) cells rendered left to right.
    layout = (
        (
            ("session model: ", caption),
            (state.model_label, "bold"),
            ("  planner model: ", caption),
            (state.planner_model_label, "bold"),
        ),
        (
            ("cwd: ", caption),
            (state.cwd, "cyan"),
            ("  session workdir: ", caption),
            (state.session_workdir, "cyan"),
        ),
        (
            ("base url: ", caption),
            (state.llm_base_url or "-", "magenta"),
            ("  permissions: ", caption),
            (state.permissions_label, "yellow"),
        ),
    )
    for cells in layout:
        row = Text()
        for content, cell_style in cells:
            row.append(content, style=cell_style)
        console.print(row, highlight=False)

    tip = Text(
        "Tip: Ctrl+C interrupt current turn, Ctrl+D exit, /clear clear transcript",
        style="grey58",
    )
    console.print(tip)
    console.print()


def _new_line_reader() -> TerminalLineReader:
    """Create the line reader used to read user input for the session."""
    reader = TerminalLineReader()
    return reader


def _usage_hint_text() -> str:
    """Return the bilingual usage hint shown once when a session starts.

    The result has four lines, each terminated by a newline.
    """
    rows = (
        "Describe your task in natural language.",
        "Examples / 示例:",
        (
            "1. Remove texts longer than 1500 characters from ./data/demo-dataset.jsonl, "
            "generate a plan, and execute it. / "
            "我要去除 ./data/demo-dataset.jsonl 中长度大于 1500 的文本,帮我生成方案并执行。"
        ),
        (
            "2. Retrieve operators for multimodal deduplication and explain when to use them. / "
            "帮我检索多模态去重相关算子,并说明适用场景。"
        ),
    )
    return "".join(f"{row}\n" for row in rows)


def _print_block(console: Console, label: str, text: str, style: str, *, markdown: bool = False) -> None:
    """Print a labelled transcript block followed by a blank line.

    Args:
        console: Rich console that receives the output.
        label: Short tag printed on its own header line.
        text: Block body; a falsy value is treated as an empty string.
        style: Rich style name, applied in bold to header and body alike.
        markdown: When true the body is first reduced to plain lines with
            `_markdown_to_plain_lines`; otherwise it is split on line breaks.
    """
    emphasis = f"bold {style}"
    console.print(Text(f" {label} ", style=emphasis), highlight=False)

    body = str(text or "")
    # Both paths yield at least one (possibly empty) row.
    rows = _markdown_to_plain_lines(body) if markdown else (body.splitlines() or [""])
    for row in rows:
        console.print(Text(f" {row}", style=emphasis), highlight=False)
    console.print()


def _markdown_to_plain_lines(content: str) -> List[str]:
    """Reduce markdown text to plain lines for terminal display.

    Lines whose stripped form starts with three backticks toggle "code"
    mode and are dropped. Outside code mode, a line whose stripped form
    starts with ``#`` loses its leading ``#`` characters and surrounding
    whitespace. Every other line is kept verbatim.

    Args:
        content: Markdown source; a falsy value is treated as empty.

    Returns:
        The resulting lines, or ``[""]`` when nothing remains.
    """
    plain: List[str] = []
    inside_fence = False
    for source_line in str(content or "").splitlines():
        trimmed = source_line.strip()
        if trimmed.startswith("```"):
            inside_fence = not inside_fence
        elif not inside_fence and trimmed.startswith("#"):
            plain.append(trimmed.lstrip("#").strip())
        else:
            plain.append(source_line)
    return plain or [""]


def _format_tool_prefix(item: TimelineItem) -> Text:
    """Build the one-line status prefix for a tool timeline item.

    The line consists of a coloured bullet, the status label left-aligned in
    a seven-character column, a space and the item's title.

    Args:
        item: Timeline entry whose ``status`` and ``title`` are rendered.

    Returns:
        The assembled rich ``Text`` line.
    """
    palette = {"running": "yellow", "done": "green", "failed": "red"}

    status = str(item.status or "").strip().lower()
    color = palette.get(status, "grey50")
    # Known statuses label themselves; an empty status falls back to "event".
    label = status or "event"

    prefix = Text()
    prefix.append("● ", style=color)
    prefix.append(f"{label:<7}", style=f"bold {color}")
    prefix.append(" ")
    prefix.append(item.title)
    return prefix


def _print_tool_item(console: Console, item: TimelineItem) -> None:
    """Print a tool item's status line, then its detail text if it has any."""
    console.print(_format_tool_prefix(item), highlight=False)
    detail = item.text
    if not detail:
        return
    console.print(Text(f"  {detail}", style="grey62"), highlight=False)


def _print_timeline_item(console: Console, item: TimelineItem) -> None:
    """Render one timeline item according to its ``kind``.

    "input", "user" and "assistant" items are printed as labelled blocks,
    "tool" items as a status line, "reasoning" items as dim text and
    "system" items with a warning-style marker. Any other kind is printed
    as a dim dash-prefixed line.

    Args:
        console: Rich console that receives the output.
        item: Timeline entry to print.
    """
    kind = item.kind
    if kind == "input":
        _print_block(console, "input", item.text, _INPUT_STYLE, markdown=False)
        return
    if kind == "user":
        _print_block(console, "you", item.text, _USER_STYLE, markdown=False)
        return
    if kind == "assistant":
        # Only assistant output may be flagged as markdown.
        _print_block(console, "agent", item.text, _AGENT_STYLE, markdown=item.markdown)
        return
    if kind == "tool":
        _print_tool_item(console, item)
        return
    if kind == "reasoning":
        # Reasoning text is shown as-is, dimmed.
        console.print(Text(f"{item.text}", style="grey58"), highlight=False)
        return
    if kind == "system":
        console.print(Text(f"△ {item.text or item.title}", style="yellow"), highlight=False)
        return
    console.print(Text(f"- {item.text or item.title}", style="grey58"), highlight=False)


def _flush_timeline(console: Console, state: TuiState, cursor: int) -> int:
    """Print the timeline items not yet shown and return the new cursor.

    Args:
        console: Rich console that receives the output.
        state: UI state whose ``timeline`` is read.
        cursor: Index of the first item not yet printed; negative values
            are treated as 0.

    Returns:
        The timeline length after printing, or the (clamped) cursor
        unchanged when it already points at or past the end.
    """
    entries = state.timeline
    start = max(cursor, 0)
    pending = entries[start:]
    if not pending:
        return start
    for entry in pending:
        _print_timeline_item(console, entry)
    return len(entries)


def _track_tool_event(
    event: Dict[str, Any],
    running_tools: Dict[str, _RunningToolState],
    now_monotonic: float,
) -> None:
    """Update the table of in-flight tool calls from one event.

    A "tool_start" event registers its call under ``call_id``; a "tool_end"
    event removes it. Events of any other type, and events without a
    ``call_id``, leave the table untouched.

    Args:
        event: Event mapping; the "type", "call_id" and "tool" keys are read.
        running_tools: Mutable mapping of call id to running state, updated
            in place.
        now_monotonic: Monotonic timestamp recorded as the start time.
    """
    kind = str(event.get("type", "")).strip()
    if kind not in ("tool_start", "tool_end"):
        return

    call_id = str(event.get("call_id", "")).strip()
    if not call_id:
        return

    if kind == "tool_end":
        running_tools.pop(call_id, None)
        return

    tool_name = str(event.get("tool", "")).strip() or "unknown_tool"
    running_tools[call_id] = _RunningToolState(
        tool=tool_name,
        started_monotonic=now_monotonic,
    )


def _running_tool_status_text(
    running_tools: Dict[str, _RunningToolState],
    now_monotonic: float,
) -> str:
    """Describe the in-flight tool calls for the spinner line.

    Args:
        running_tools: Mapping of call id to running state.
        now_monotonic: Current monotonic timestamp used to compute elapsed
            time.

    Returns:
        An empty string when nothing is running; otherwise the name and
        elapsed whole seconds of the earliest-started tool, followed by a
        count of the remaining ones when there are several.
    """
    if not running_tools:
        return ""

    # The earliest-started call is the headline entry.
    oldest = min(running_tools.values(), key=lambda row: row.started_monotonic)
    seconds = max(now_monotonic - oldest.started_monotonic, 0.0)
    summary = f"running {oldest.tool} (+{seconds:.0f}s)"

    others = len(running_tools) - 1
    if others > 0:
        summary = f"{summary}, +{others} more"
    return summary


@dataclass
class _TurnCounters:
    """Per-turn tallies of the events drained from the controller."""

    tool_events: int = 0  # "tool_start" / "tool_end" events seen
    planned_tools: int = 0  # dict entries found under "planned_tools"
    reasoning_events: int = 0  # "reasoning_step" events seen


def _apply_turn_events(
    events: List[Dict[str, Any]],
    state: TuiState,
    running_tools: Dict[str, _RunningToolState],
    counters: _TurnCounters,
) -> None:
    """Fold a batch of controller events into the UI state and the counters."""
    for event in events:
        _track_tool_event(event, running_tools, time.monotonic())
        event_type = str(event.get("type", "")).strip()
        if event_type in {"tool_start", "tool_end"}:
            counters.tool_events += 1
        if event_type == "reasoning_step":
            counters.reasoning_events += 1
            # NOTE(review): planned_tools is only read from reasoning_step
            # events here -- confirm against the event schema.
            planned_tools = event.get("planned_tools")
            if isinstance(planned_tools, list):
                counters.planned_tools += sum(
                    1 for row in planned_tools if isinstance(row, dict)
                )
        apply_event(state, event)


def run_tui_session(args: argparse.Namespace) -> int:
    """Run the interactive transcript-style terminal session.

    Model names and the LLM base URL are read from the ``DJA_SESSION_MODEL``,
    ``DJA_PLANNER_MODEL`` and ``DJA_OPENAI_BASE_URL`` environment variables,
    with built-in defaults. Each non-empty input line is submitted to the
    session controller as one turn; while the turn runs, its events are
    printed and a spinner shows activity. ``/clear`` resets the transcript,
    Ctrl+C requests an interrupt of the running turn and Ctrl+D ends the
    session.

    Args:
        args: Parsed CLI arguments; ``dataset``, ``export`` and ``verbose``
            are read.

    Returns:
        0 when the session ends (end of input, or a reply flagged ``stop``),
        2 when the controller fails to start.
    """
    install_thinking_warning_filter()
    install_tui_warning_filters()

    session_model = os.environ.get("DJA_SESSION_MODEL", "qwen3-max-2026-01-23")
    planner_model = os.environ.get("DJA_PLANNER_MODEL", "qwen3-max-2026-01-23")
    base_url = os.environ.get(
        "DJA_OPENAI_BASE_URL",
        "https://dashscope.aliyuncs.com/compatible-mode/v1",
    )

    console = Console()
    line_reader = _new_line_reader()
    state = TuiState(
        status_line="ready",
        model_label=session_model,
        planner_model_label=planner_model,
        llm_base_url=base_url,
        cwd=os.getcwd(),
        session_workdir=str((Path.cwd() / ".djx").resolve()),
    )
    controller = SessionController(
        dataset_path=args.dataset,
        export_path=args.export,
        verbose=bool(args.verbose),
    )

    try:
        controller.start()
    except Exception as exc:
        console.print(f"Failed to start dj-agents session: {exc}", style="bold red")
        return 2

    _print_header(console, state)
    state.add_timeline(
        kind="system",
        title="tip",
        text=_usage_hint_text(),
    )
    cursor = _flush_timeline(console, state, cursor=0)

    while True:
        try:
            message = line_reader.read_line(_INPUT_PROMPT).strip()
        except EOFError:
            console.print("Session ended.")
            return 0
        except KeyboardInterrupt:
            # Ctrl+C at the prompt: no turn is running, so just inform the user.
            state.add_timeline(
                kind="system",
                title="interrupt",
                text="No running task to interrupt. Press Ctrl+D to exit.",
            )
            cursor = _flush_timeline(console, state, cursor)
            continue

        if not message:
            continue

        if message == "/clear":
            state.timeline = []
            cursor = 0
            console.clear()
            _print_header(console, state)
            continue

        try:
            controller.submit_turn(message)
        except Exception as exc:
            console.print(f"Failed to submit turn: {exc}", style="red")
            continue

        spinner = _ThinkingSpinner(stream=sys.stdout, text="agent thinking")
        interrupt_sent = False
        counters = _TurnCounters()
        running_tools: Dict[str, _RunningToolState] = {}
        saw_any_turn_event = False

        while controller.is_turn_running():
            try:
                events = controller.drain_events()
                if events:
                    # Remove the spinner line before transcript output lands.
                    spinner.clear()
                    saw_any_turn_event = True
                _apply_turn_events(events, state, running_tools, counters)
                cursor = _flush_timeline(console, state, cursor)

                status_text = _running_tool_status_text(running_tools, time.monotonic())
                if status_text:
                    spinner.text = status_text
                    spinner.tick()
                elif not saw_any_turn_event:
                    spinner.text = "agent thinking"
                    spinner.tick()
                else:
                    spinner.clear()
                time.sleep(0.03)
            except KeyboardInterrupt:
                # Only the first Ctrl+C accepted by the controller counts as
                # a request; anything else is reported as ignored.
                if not interrupt_sent and controller.request_interrupt():
                    interrupt_sent = True
                    notice = "Interrupt requested (Ctrl+C)."
                else:
                    notice = "Interrupt ignored."
                spinner.clear()
                state.add_timeline(
                    kind="system",
                    title="interrupt",
                    text=notice,
                )
                cursor = _flush_timeline(console, state, cursor)

        spinner.clear()
        # Pick up events emitted between the last poll and the end of the turn.
        _apply_turn_events(controller.drain_events(), state, running_tools, counters)
        cursor = _flush_timeline(console, state, cursor)

        reply = controller.consume_turn_result()
        state.add_message("agent", reply.text, markdown=True)
        thinking = sanitize_reasoning_text(str(getattr(reply, "thinking", "") or ""))
        # Show the reply's own reasoning only if no reasoning events were seen.
        if thinking and counters.reasoning_events == 0:
            state.append_reasoning(thinking)
        if counters.tool_events == 0 and counters.planned_tools > 0:
            state.add_timeline(
                kind="system",
                title="tool_hint",
                text=(
                    "A tool was planned in this turn, but no actual execution result was observed. "
                    "Retry or use --verbose to inspect more detailed internal logs."
                ),
            )
        cursor = _flush_timeline(console, state, cursor)
        if bool(getattr(reply, "stop", False)):
            return 0