data_juicer_agents.utils.llm_gateway 源代码

# -*- coding: utf-8 -*-
"""Utilities for calling LLMs via OpenAI-compatible endpoints."""

from __future__ import annotations

import json
import os
import re
from typing import Any, Dict, List

_JSON_BLOCK_RE = re.compile(r"```(?:json)?\s*(.*?)```", re.DOTALL | re.IGNORECASE)
_DEFAULT_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
_DEFAULT_THINKING = "true"


def _extract_json_text(text: str) -> str:
    match = _JSON_BLOCK_RE.search(text)
    if match:
        return match.group(1).strip()
    return text.strip()


def _call_model_json_once(
    model_name: str,
    prompt: str,
    api_key: str | None = None,
    base_url: str | None = None,
    thinking: bool | None = None,
) -> Dict[str, Any]:
    from openai import OpenAI

    api_key = (
        str(api_key).strip()
        if api_key is not None
        else (os.environ.get("DASHSCOPE_API_KEY") or os.environ.get("MODELSCOPE_API_TOKEN"))
    )
    if not api_key:
        raise RuntimeError("Missing API key: set DASHSCOPE_API_KEY or MODELSCOPE_API_TOKEN")

    if base_url is None:
        base_url = os.environ.get("DJA_OPENAI_BASE_URL", _DEFAULT_BASE_URL)
    if thinking is None:
        thinking_flag = os.environ.get("DJA_LLM_THINKING", _DEFAULT_THINKING).lower() in {
            "1",
            "true",
            "yes",
            "on",
        }
    else:
        thinking_flag = bool(thinking)

    client = OpenAI(api_key=api_key, base_url=base_url)
    response = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        extra_body={"enable_thinking": thinking_flag},
    )

    text = response.choices[0].message.content or ""
    payload = _extract_json_text(text)
    return json.loads(payload)


def _candidate_models(model_name: str) -> List[str]:
    configured = os.environ.get("DJA_MODEL_FALLBACKS", "")
    extras = [item.strip() for item in configured.split(",") if item.strip()]

    seen = {model_name}
    ordered = [model_name]
    for item in extras:
        if item not in seen:
            ordered.append(item)
            seen.add(item)
    return ordered


[文档] def call_model_json( model_name: str, prompt: str, api_key: str | None = None, base_url: str | None = None, thinking: bool | None = None, ) -> Dict[str, Any]: errors: List[str] = [] for candidate in _candidate_models(model_name): try: return _call_model_json_once( candidate, prompt, api_key=api_key, base_url=base_url, thinking=thinking, ) except Exception as exc: # pragma: no cover - exercised via tests with monkeypatch errors.append(f"{candidate}: {exc}") joined = "; ".join(errors) raise RuntimeError(f"LLM call failed for all candidate models: {joined}")