data_juicer_agents.capabilities.session.runtime module
Runtime primitives shared by session tools.
- class data_juicer_agents.capabilities.session.runtime.SessionState(dataset_path: 'Optional[str]' = None, export_path: 'Optional[str]' = None, working_dir: 'str' = './.djx', plan_path: 'Optional[str]' = None, plan_intent: 'Optional[str]' = None, custom_operator_paths: 'List[str]' = <factory>, dataset_spec: 'Optional[Dict[str, Any]]' = None, process_spec: 'Optional[Dict[str, Any]]' = None, system_spec: 'Optional[Dict[str, Any]]' = None, draft_plan: 'Optional[Dict[str, Any]]' = None, draft_plan_path_hint: 'Optional[str]' = None, last_retrieval: 'Dict[str, Any]' = <factory>, last_inspected_dataset: 'Optional[str]' = None, last_dataset_profile: 'Dict[str, Any]' = <factory>, history: 'List[Dict[str, str]]' = <factory>)
Bases: object
- dataset_path: str | None = None
- export_path: str | None = None
- working_dir: str = './.djx'
- plan_path: str | None = None
- plan_intent: str | None = None
- custom_operator_paths: List[str]
- dataset_spec: Dict[str, Any] | None = None
- process_spec: Dict[str, Any] | None = None
- system_spec: Dict[str, Any] | None = None
- draft_plan: Dict[str, Any] | None = None
- draft_plan_path_hint: str | None = None
- last_retrieval: Dict[str, Any]
- last_inspected_dataset: str | None = None
- last_dataset_profile: Dict[str, Any]
- history: List[Dict[str, str]]
- __init__(dataset_path: str | None = None, export_path: str | None = None, working_dir: str = './.djx', plan_path: str | None = None, plan_intent: str | None = None, custom_operator_paths: List[str] = <factory>, dataset_spec: Dict[str, Any] | None = None, process_spec: Dict[str, Any] | None = None, system_spec: Dict[str, Any] | None = None, draft_plan: Dict[str, Any] | None = None, draft_plan_path_hint: str | None = None, last_retrieval: Dict[str, Any] = <factory>, last_inspected_dataset: str | None = None, last_dataset_profile: Dict[str, Any] = <factory>, history: List[Dict[str, str]] = <factory>) -> None
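The `<factory>` defaults in the signature above suggest that SessionState is a dataclass whose mutable fields are created per instance through `default_factory`. The following is a minimal sketch of that pattern, not the library's implementation: `SessionStateSketch` is a hypothetical stand-in that mirrors only a few fields, the use of `list` and `dict` as factories is an assumption, and the `role`/`content` keys in the history entry are illustrative only.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class SessionStateSketch:
    """Hypothetical stand-in mirroring a few SessionState fields."""

    dataset_path: Optional[str] = None
    working_dir: str = "./.djx"
    # `<factory>` in a rendered signature corresponds to default_factory.
    # Assumption: the real factories are plain list/dict constructors.
    custom_operator_paths: List[str] = field(default_factory=list)
    last_retrieval: Dict[str, Any] = field(default_factory=dict)
    history: List[Dict[str, str]] = field(default_factory=list)


first = SessionStateSketch(dataset_path="data/input.jsonl")
second = SessionStateSketch()

# Each instance gets its own list, so this append does not leak into `second`.
# The keys used here are illustrative, not documented by the library.
first.history.append({"role": "user", "content": "inspect the dataset"})
```

With the real class, the equivalent construction would presumably be `SessionState(dataset_path=...)`, leaving every other field at its default.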
- class data_juicer_agents.capabilities.session.runtime.SessionToolRuntime(*, state: SessionState, verbose: bool = False, event_callback: Callable[[Dict[str, Any]], None] | None = None)
Bases: object
Mutable runtime shared by all session tools.
- __init__(*, state: SessionState, verbose: bool = False, event_callback: Callable[[Dict[str, Any]], None] | None = None) -> None
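The `event_callback` parameter accepts any callable that takes one `Dict[str, Any]` and returns `None`, or `None` to disable it. As a rough illustration of a conforming callback, the sketch below collects events into a list. The `emit` helper and the event keys (`type`, `tool`) are hypothetical and exist only to exercise the callback; the page does not document which events the runtime sends or what they contain.

```python
from typing import Any, Callable, Dict, List, Optional

# Same shape as the documented event_callback annotation.
EventCallback = Optional[Callable[[Dict[str, Any]], None]]

collected: List[Dict[str, Any]] = []


def collect_event(event: Dict[str, Any]) -> None:
    """Store a shallow copy so later mutation by the caller is not observed."""
    collected.append(dict(event))


def emit(callback: EventCallback, event: Dict[str, Any]) -> None:
    """Hypothetical emitter: invoke the callback only if one was supplied."""
    if callback is not None:
        callback(event)


# Hypothetical event payloads; the real event schema is not documented here.
emit(collect_event, {"type": "tool_start", "tool": "inspect_dataset"})
emit(None, {"type": "ignored"})
```

A function like `collect_event` could then be passed as `event_callback=collect_event` when constructing SessionToolRuntime, with all three arguments given by keyword since the constructor is keyword-only.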
- data_juicer_agents.capabilities.session.runtime.normalize_line_idx(idx: int, total: int) -> int
- data_juicer_agents.capabilities.session.runtime.parse_line_ranges(ranges: Any) -> tuple[list[int] | None, str | None]
- data_juicer_agents.capabilities.session.runtime.run_interruptible_subprocess(command: Any, *, timeout_sec: int, shell: bool) -> Dict[str, Any]
- data_juicer_agents.capabilities.session.runtime.short_log(text: str, max_lines: int = 30, max_chars: int = 6000) -> str