data_juicer_agents.capabilities.session.runtime module#

Runtime primitives shared by session tools.

class data_juicer_agents.capabilities.session.runtime.SessionState(dataset_path: 'Optional[str]' = None, export_path: 'Optional[str]' = None, working_dir: 'str' = './.djx', plan_path: 'Optional[str]' = None, plan_intent: 'Optional[str]' = None, custom_operator_paths: 'List[str]' = <factory>, dataset_spec: 'Optional[Dict[str, Any]]' = None, process_spec: 'Optional[Dict[str, Any]]' = None, system_spec: 'Optional[Dict[str, Any]]' = None, draft_plan: 'Optional[Dict[str, Any]]' = None, draft_plan_path_hint: 'Optional[str]' = None, last_retrieval: 'Dict[str, Any]' = <factory>, last_inspected_dataset: 'Optional[str]' = None, last_dataset_profile: 'Dict[str, Any]' = <factory>, history: 'List[Dict[str, str]]' = <factory>)[source]#

Bases: object

dataset_path: str | None = None#
export_path: str | None = None#
working_dir: str = './.djx'#
plan_path: str | None = None#
plan_intent: str | None = None#
custom_operator_paths: List[str]#
dataset_spec: Dict[str, Any] | None = None#
process_spec: Dict[str, Any] | None = None#
system_spec: Dict[str, Any] | None = None#
draft_plan: Dict[str, Any] | None = None#
draft_plan_path_hint: str | None = None#
last_retrieval: Dict[str, Any]#
last_inspected_dataset: str | None = None#
last_dataset_profile: Dict[str, Any]#
history: List[Dict[str, str]]#
__init__(dataset_path: str | None = None, export_path: str | None = None, working_dir: str = './.djx', plan_path: str | None = None, plan_intent: str | None = None, custom_operator_paths: List[str] = <factory>, dataset_spec: Dict[str, Any] | None = None, process_spec: Dict[str, Any] | None = None, system_spec: Dict[str, Any] | None = None, draft_plan: Dict[str, Any] | None = None, draft_plan_path_hint: str | None = None, last_retrieval: Dict[str, Any] = <factory>, last_inspected_dataset: str | None = None, last_dataset_profile: Dict[str, Any] = <factory>, history: List[Dict[str, str]] = <factory>) → None#
class data_juicer_agents.capabilities.session.runtime.SessionToolRuntime(*, state: SessionState, verbose: bool = False, event_callback: Callable[[Dict[str, Any]], None] | None = None)[source]#

Bases: object

Mutable runtime shared by all session tools.

__init__(*, state: SessionState, verbose: bool = False, event_callback: Callable[[Dict[str, Any]], None] | None = None) → None[source]#
debug(message: str) → None[source]#
emit_event(event_type: str, **payload: Any) → None[source]#
invoke_tool(tool_name: str, args: Dict[str, Any], fn: Callable[[], Dict[str, Any]]) → Dict[str, Any][source]#
invoke_text_tool(tool_name: str, args: Dict[str, Any], fn: Callable[[], Dict[str, Any]])[source]#
context_payload() → Dict[str, Any][source]#
storage_root() → Path[source]#
next_session_plan_path() → str[source]#
load_plan_dict(plan_path: str) → Dict[str, Any] | None[source]#
load_plan_model(plan_path: str) → PlanModel | None[source]#
static looks_like_plan_id(value: str) → bool[source]#
find_saved_plan_path_by_plan_id(plan_id: str) → str | None[source]#
current_draft_plan_model() → PlanModel | None[source]#
data_juicer_agents.capabilities.session.runtime.normalize_line_idx(idx: int, total: int) → int[source]#
data_juicer_agents.capabilities.session.runtime.parse_line_ranges(ranges: Any) → tuple[list[int] | None, str | None][source]#
data_juicer_agents.capabilities.session.runtime.run_interruptible_subprocess(command: Any, *, timeout_sec: int, shell: bool) → Dict[str, Any][source]#
data_juicer_agents.capabilities.session.runtime.short_log(text: str, max_lines: int = 30, max_chars: int = 6000) → str[source]#
data_juicer_agents.capabilities.session.runtime.to_bool(value: Any, default: bool = False) → bool[source]#
data_juicer_agents.capabilities.session.runtime.to_int(value: Any, default: int) → int[source]#
data_juicer_agents.capabilities.session.runtime.to_string_list(value: Any) → List[str][source]#
data_juicer_agents.capabilities.session.runtime.to_text_response(payload: Dict[str, Any])[source]#
data_juicer_agents.capabilities.session.runtime.truncate_text(text: str, limit: int = 12000) → str[source]#