# Source code for data_juicer.ops.mapper.pii_llm_suspect_mapper

# Copyright 2025 The Data-Juicer Authors. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Optional LLM PII audit; optional aggressive redaction after regex mappers.

from __future__ import annotations

import json
import os
import re
import threading
from typing import Any, Dict, List, Optional, Set, Tuple

from loguru import logger
from pydantic import PositiveInt

from data_juicer.ops.base_op import OPERATORS, TAGGING_OPS, Mapper
from data_juicer.ops.mapper.dialog_llm_input_utils import clip_text_for_dialog_prompt
from data_juicer.utils.agent_output_locale import normalize_preferred_output_lang
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.model_utils import get_model, prepare_model

OP_NAME = "pii_llm_suspect_mapper"

_SPACY_PIPELINE_LOCKS: Dict[str, threading.Lock] = {}


def _spacy_pipeline_lock(name: str) -> threading.Lock:
    if name not in _SPACY_PIPELINE_LOCKS:
        _SPACY_PIPELINE_LOCKS[name] = threading.Lock()
    return _SPACY_PIPELINE_LOCKS[name]


def _resolve_spacy_auto_download_flag(requested: bool) -> bool:
    """Env ``PII_SPACY_AUTO_DOWNLOAD`` overrides when set to true/false."""
    raw = os.environ.get("PII_SPACY_AUTO_DOWNLOAD", "").strip().lower()
    if raw in ("0", "false", "no", "off"):
        return False
    if raw in ("1", "true", "yes", "on"):
        return True
    return bool(requested)


def ensure_spacy_pipeline_installed(model_name: str, *, auto_download: bool) -> None:
    """If ``auto_download`` and pipeline missing, run ``spacy.cli.download`` (needs network)."""
    # Nothing to do when downloads are disabled or no pipeline name was given.
    if not (auto_download and model_name):
        return
    # Skip the download entirely when the pipeline package is already present.
    # Any failure here (spaCy missing, util API change) falls through to the
    # download attempt below, which has its own guard.
    try:
        import spacy.util

        if spacy.util.is_package(model_name):
            return
    except Exception:
        pass
    # Best effort: a failed download is logged, never raised, so air-gapped
    # runs degrade to "pipeline unavailable" instead of crashing the job.
    try:
        from spacy.cli import download

        logger.info(
            "pii_llm_suspect_mapper: downloading spaCy pipeline {} (pip/network)",
            repr(model_name),
        )
        download(model_name)
    except Exception as e:
        logger.warning(
            "pii_llm_suspect_mapper: spaCy download({}) failed: {} — "
            "install manually e.g. python -m spacy download {}",
            repr(model_name),
            e,
            model_name,
        )
DEFAULT_REDACTION_PLACEHOLDER = "[LLM_PII_SUSPECT_REDACTED]" DEFAULT_SYSTEM_PROMPT = ( "你是数据合规审计助手。下面给出若干字段的截断文本(可能已部分脱敏)。" "请只识别**仍可能残留的敏感信息**(如真实人名、未脱敏手机号/证件片段、" "内网地址、密钥片段、可识别工号、仍未规则脱敏的 URL/IPv4/MAC、" "JWT/PEM/credential 片段等)。不要编造;不确定则不要列出。" "必须只输出一个 JSON 对象,不要 markdown,不要解释。Schema:\n" '{"suspected":[{"field":"字段名","category":"类型简述",' '"evidence":"从输入中摘录的极短原文片段≤48字符"}],' '"likely_clean":true或false}\n' "若无可疑项,suspected 用 [],likely_clean 用 true。" ) DEFAULT_SYSTEM_PROMPT_EN = ( "You are a data-compliance auditor. Below are truncated field excerpts " "(possibly partially redacted). Identify **only** residual sensitive " "data (real names, phone/ID fragments, intranet hosts, secret snippets, " "identifiable employee IDs, raw URLs/IPv4/MAC not yet redacted, " "JWT/PEM/credential fragments, etc.). Do not invent; if unsure, omit.\n" "Output exactly one JSON object, no markdown. Schema:\n" '{"suspected":[{"field":"field_key","category":"short_type",' '"evidence":"verbatim snippet from input, ≤48 chars"}],' '"likely_clean":true or false}\n' "Use suspected=[] and likely_clean=true when nothing suspicious." 
) def _extract_json_object(text: str) -> Optional[dict]: if not text or not isinstance(text, str): return None s = text.strip() if s.startswith("```"): s = re.sub(r"^```(?:json)?\s*", "", s, flags=re.IGNORECASE) s = re.sub(r"\s*```\s*$", "", s) start = s.find("{") end = s.rfind("}") if start < 0 or end <= start: return None try: frag = s[start : end + 1] # noqa: E203 return json.loads(frag) except json.JSONDecodeError: return None def _heuristic_trigger(text: str) -> bool: """Cheap pre-filter: skip LLM when nothing looks like residual PII.""" if not text or len(text.strip()) < 8: return False if re.search(r"\d{6,}", text): return True if "@" in text: return True secret_pat = r"(?i)(?:^|[^a-z0-9_])(?:" r"api[_-]?key|apikey|secret|token|password|passwd|credential|bearer)\b" if re.search(secret_pat, text): return True intranet = r"(?i)\b(?:内网|局域网|vpn|ssh\s|rdp://|mysql://|redis://)\b" if re.search(intranet, text): return True # Long alphanumeric runs (possible tokens / hashes) if re.search(r"[A-Za-z0-9_-]{24,}", text): return True return False # High-precision contextual cues (no extra deps). Intentionally conservative to limit false API calls. _NAME_LIKE_RULE_PAT = re.compile( r"(?:" r"(?:^|[\s,,。.!!??;;::])" r"(?:我叫|叫我|更名为|姓名\s*[::\s为是]|真名\s*[::\s为是]|" r"联系人\s*[::\s为]?|负责人\s*[::\s为]?|经办人\s*[::\s为]?|" r"用户(?:姓名|名叫)\s*[::\s为]?|(?:致|尊敬的)\s*)" # Name may be followed by more hanzi (e.g. 我叫张三想咨询) without punctuation. r"[\u4e00-\u9fff]{2,4}(?=$|[\s,,。.!!??;;::]|[\u4e00-\u9fff])" r"|" r"[\u4e00-\u9fff]{2,4}(?:先生|女士|老师|大夫|经理|主任|院长|博士|教授|同志)(?=[\s,,。.!!??;;]|$)" r"|" r"\b(?:Mr|Mrs|Ms|Miss|Dr|Prof)\.?\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)?\b" r")", re.UNICODE, ) def _name_like_rule_trigger(text: str) -> bool: """Rule-based cues that real person names may appear (esp. 
missed by ``_heuristic_trigger``).""" if not text or len(text.strip()) < 4: return False return _NAME_LIKE_RULE_PAT.search(text) is not None def _normalize_spacy_ner_model_names( spacy_ner_model: Optional[str], spacy_ner_models: Optional[List[str]], ) -> List[str]: """Deduplicated pipeline names; list order first, then legacy ``spacy_ner_model``.""" out: List[str] = [] seen: Set[str] = set() if spacy_ner_models: for x in spacy_ner_models: s = str(x).strip() if s and s not in seen: seen.add(s) out.append(s) one = (spacy_ner_model or "").strip() if one and one not in seen: out.append(one) return out def _collect_messages_excerpt( messages: Any, max_messages: int, max_chars: int, ) -> str: if not isinstance(messages, list) or not messages: return "" parts: List[str] = [] n = 0 for msg in reversed(messages): if n >= max_messages: break if not isinstance(msg, dict): continue role = str(msg.get("role") or "") content = msg.get("content") chunk = "" if isinstance(content, str): chunk = content elif isinstance(content, list): for block in content: if isinstance(block, dict): t = block.get("text") or block.get("content") if isinstance(t, str): chunk += t + "\n" elif isinstance(block, str): chunk += block + "\n" chunk = chunk.strip() if chunk: parts.append(f"[{role}]{chunk[:500]}") n += 1 joined = "\n".join(reversed(parts)) return clip_text_for_dialog_prompt( joined, max_chars, "messages excerpt truncated", ) def _build_field_payload( sample: dict, keys: List[str], ) -> List[Tuple[str, str]]: out: List[Tuple[str, str]] = [] for k in keys: if k not in sample: continue v = sample[k] if isinstance(v, str) and v.strip(): out.append((k, v)) return out def _canonical_field_for_redaction( field: str, inspect_keys: List[str], messages_key: Optional[str], ) -> Optional[str]: f = (field or "").strip() if f in inspect_keys: return f if messages_key and f == f"{messages_key}_excerpt": return messages_key return None def _redact_string_substrings( text: str, snippets: List[str], 
placeholder: str, ) -> str: if not text or not snippets: return text # Longer snippets first so we do not leave broken partial matches first. uniq = [] seen = set() for s in sorted(set(snippets), key=len, reverse=True): s = (s or "").strip() if len(s) < 2 or s in seen: continue seen.add(s) uniq.append(s) out = text for s in uniq: if s in out: out = out.replace(s, placeholder) return out def _redact_messages_substrings( messages: Any, snippets: List[str], placeholder: str, ) -> None: if not isinstance(messages, list) or not snippets: return for msg in messages: if not isinstance(msg, dict): continue content = msg.get("content") if isinstance(content, str): msg["content"] = _redact_string_substrings( content, snippets, placeholder, ) elif isinstance(content, list): for i, block in enumerate(content): if isinstance(block, dict): for k in ("text", "content"): if k in block and isinstance(block[k], str): block[k] = _redact_string_substrings( block[k], snippets, placeholder, ) elif isinstance(block, str): content[i] = _redact_string_substrings( block, snippets, placeholder, ) def _replace_messages_with_placeholder( messages: Any, placeholder: str, ) -> None: """In-place: single user turn with placeholder text (keeps list shape).""" if not isinstance(messages, list): return messages.clear() messages.append({"role": "user", "content": placeholder})
@TAGGING_OPS.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
class PiiLlmSuspectMapper(Mapper):
    """LLM audit (and optional redaction) for possibly missed PII.

    Writes JSON to ``meta[result_key]`` (default ``MetaKeys.pii_llm_suspect``).
    Set ``redaction_mode`` to ``evidence`` or ``whole_field`` to also modify
    ``inspect_keys`` string fields (and ``messages`` when listed). Place
    **after** ``pii_redaction_mapper``.

    Use ``gate_mode="heuristic"`` to call the API only when cheap patterns
    suggest residual risk (long digit runs, @, secret-like keywords, etc.).

    **Pre-LLM extensions** (still no API cost unless you enable spaCy):

    - ``heuristic_name_rules`` (default True): contextual CJK / English name
      cues so person-heavy text is not skipped when the base heuristic fires
      only on digits and secrets.
    - ``spacy_ner_models``: optional list of spaCy pipeline names (e.g.
      ``["zh_core_web_sm", "en_core_web_sm"]``) so one job loads both and
      runs NER on the same text prefix until a ``PERSON`` / ``PER`` hit.
    - ``spacy_ner_model``: legacy single name; merged after
      ``spacy_ner_models`` (deduped). Install with
      ``python -m spacy download <name>``.
    - ``spacy_auto_download`` (default True): if the pipeline is missing, run
      spaCy's downloader before ``spacy.load`` (needs network, uses pip).
      Disable in air-gapped jobs or set env ``PII_SPACY_AUTO_DOWNLOAD=0``.
    """

    def __init__(
        self,
        api_model: str = "qwen-turbo",
        *,
        inspect_keys: Optional[List[str]] = None,
        messages_key: Optional[str] = "messages",
        max_messages_for_prompt: PositiveInt = 4,
        max_chars_per_field: PositiveInt = 6000,
        max_chars_messages_excerpt: PositiveInt = 8000,
        gate_mode: str = "heuristic",
        result_key: str = MetaKeys.pii_llm_suspect,
        raw_key: str = MetaKeys.pii_llm_suspect_raw,
        overwrite: bool = False,
        api_endpoint: Optional[str] = None,
        response_path: Optional[str] = None,
        system_prompt: Optional[str] = None,
        preferred_output_lang: str = "zh",
        try_num: PositiveInt = 2,
        model_params: Optional[Dict] = None,
        sampling_params: Optional[Dict] = None,
        text_key: str = "text",
        heuristic_name_rules: bool = True,
        spacy_ner_model: Optional[str] = None,
        spacy_ner_models: Optional[List[str]] = None,
        spacy_ner_max_chars: PositiveInt = 4000,
        spacy_auto_download: bool = True,
        redaction_mode: str = "none",
        redaction_placeholder: str = DEFAULT_REDACTION_PLACEHOLDER,
        **kwargs,
    ):
        """Initialize the mapper.

        :param api_model: API model name passed to ``prepare_model``.
        :param inspect_keys: sample string fields to audit (default
            ``["text", "query", "response"]``).
        :param messages_key: key holding a chat-messages list; an excerpt of
            it is audited as ``<messages_key>_excerpt``. ``None`` disables.
        :param gate_mode: ``"heuristic"`` gates the API call behind cheap
            pattern checks; any other value always calls the API.
        :param result_key / raw_key: meta keys for the parsed result and (on
            JSON parse failure) the raw LLM output.
        :param overwrite: re-run even when ``result_key`` already exists.
        :param system_prompt: custom prompt; otherwise picked from
            ``preferred_output_lang`` (``zh`` → Chinese default, else English).
        :param try_num: LLM call attempts before giving up.
        :param redaction_mode: ``none`` / ``evidence`` / ``whole_field``;
            unknown values fall back to ``none`` with a warning.
        """
        super().__init__(text_key=text_key, **kwargs)
        self.inspect_keys = list(
            inspect_keys or ["text", "query", "response"],
        )
        self.messages_key = messages_key
        self.max_messages_for_prompt = int(max_messages_for_prompt)
        self.max_chars_per_field = int(max_chars_per_field)
        self.max_chars_messages_excerpt = int(max_chars_messages_excerpt)
        self.gate_mode = (gate_mode or "heuristic").strip().lower()
        self.result_key = result_key
        self.raw_key = raw_key
        self.overwrite = bool(overwrite)
        self.preferred_output_lang = normalize_preferred_output_lang(preferred_output_lang)
        if system_prompt is not None:
            self.system_prompt = system_prompt
        elif self.preferred_output_lang == "zh":
            self.system_prompt = DEFAULT_SYSTEM_PROMPT
        else:
            self.system_prompt = DEFAULT_SYSTEM_PROMPT_EN
        self.try_num = try_num
        self.sampling_params = dict(sampling_params or {})
        self.model_key = prepare_model(
            model_type="api",
            model=api_model,
            endpoint=api_endpoint,
            response_path=response_path,
            **(model_params or {}),
        )
        rm = (redaction_mode or "none").strip().lower()
        if rm not in ("none", "evidence", "whole_field"):
            # loguru formats with str.format-style {} placeholders (as used
            # elsewhere in this module), not %-style.
            logger.warning(
                "pii_llm_suspect_mapper: unknown redaction_mode={!r}, using none",
                redaction_mode,
            )
            rm = "none"
        self.redaction_mode = rm
        rp = redaction_placeholder or DEFAULT_REDACTION_PLACEHOLDER
        self.redaction_placeholder = str(rp)
        self.heuristic_name_rules = bool(heuristic_name_rules)
        self.spacy_ner_model_names = _normalize_spacy_ner_model_names(
            spacy_ner_model,
            spacy_ner_models,
        )
        self.spacy_ner_max_chars = int(spacy_ner_max_chars)
        self.spacy_auto_download = _resolve_spacy_auto_download_flag(
            bool(spacy_auto_download),
        )
        # Per-instance caches: loaded pipelines and names that failed to load.
        self._spacy_nlp_by_name: Dict[str, Any] = {}
        self._spacy_load_failed: Set[str] = set()

    def _get_spacy_nlp(self, model_name: str) -> Any:
        """Load (and cache) the spaCy pipeline *model_name*, or ``None``.

        Load attempts are serialized per pipeline name via a module-level
        lock; failures are remembered so each name is tried only once.
        """
        if model_name in self._spacy_load_failed:
            return None
        if model_name in self._spacy_nlp_by_name:
            return self._spacy_nlp_by_name[model_name]
        lock = _spacy_pipeline_lock(model_name)
        with lock:
            # Re-check both caches after acquiring the lock.
            if model_name in self._spacy_nlp_by_name:
                return self._spacy_nlp_by_name[model_name]
            if model_name in self._spacy_load_failed:
                return None
            ensure_spacy_pipeline_installed(
                model_name,
                auto_download=self.spacy_auto_download,
            )
            try:
                import spacy

                nlp = spacy.load(model_name)
                self._spacy_nlp_by_name[model_name] = nlp
                return nlp
            except Exception as e:  # pragma: no cover - env specific
                # {} placeholders: loguru uses str.format, not %-formatting.
                logger.warning(
                    "pii_llm_suspect_mapper: spacy.load({!r}) failed ({}); "
                    "skipping this model for the NER gate.",
                    model_name,
                    e,
                )
                self._spacy_load_failed.add(model_name)
                return None

    def _spacy_person_trigger(self, text: str) -> bool:
        """True when any configured spaCy pipeline finds a PERSON/PER entity
        in the text prefix (first ``spacy_ner_max_chars`` chars)."""
        if not self.spacy_ner_model_names:
            return False
        snippet = (text or "")[: self.spacy_ner_max_chars]
        if len(snippet.strip()) < 2:
            return False
        for model_name in self.spacy_ner_model_names:
            nlp = self._get_spacy_nlp(model_name)
            if nlp is None:
                continue
            try:
                doc = nlp(snippet)
            except Exception as e:  # pragma: no cover
                logger.debug(
                    "pii_llm_suspect_mapper spacy infer ({}): {}",
                    model_name,
                    e,
                )
                continue
            for ent in doc.ents:
                if ent.label_ in ("PERSON", "PER"):
                    return True
        return False

    def _heuristic_allow_llm(self, user_block: str) -> bool:
        """If True, heuristic gate opens and the LLM may run."""
        if _heuristic_trigger(user_block):
            return True
        if self.heuristic_name_rules and _name_like_rule_trigger(user_block):
            return True
        if self._spacy_person_trigger(user_block):
            return True
        return False

    def _assemble_user_block(self, sample: dict) -> Tuple[str, List[str]]:
        """Build the audit prompt body and the list of field names it covers.

        Each inspected field becomes a ``### field:<name>`` section (clipped);
        the chat-messages excerpt, when present and non-blank, is appended as
        ``### field:<messages_key>_excerpt``.
        """
        lines: List[str] = []
        fields_used: List[str] = []
        for fname, raw in _build_field_payload(sample, self.inspect_keys):
            clipped = clip_text_for_dialog_prompt(
                raw,
                self.max_chars_per_field,
                f"{fname} truncated",
            )
            lines.append(f"### field:{fname}\n{clipped}")
            fields_used.append(fname)
        if self.messages_key and self.messages_key in sample:
            ex = _collect_messages_excerpt(
                sample[self.messages_key],
                self.max_messages_for_prompt,
                self.max_chars_messages_excerpt,
            )
            if ex.strip():
                lines.append(f"### field:{self.messages_key}_excerpt\n{ex}")
                fields_used.append(f"{self.messages_key}_excerpt")
        return "\n\n".join(lines), fields_used

    def _apply_redaction(self, sample: dict, cleaned: List[dict]) -> None:
        """Mutate *sample* per ``redaction_mode`` using LLM-reported items.

        ``whole_field``: replace each implicated field entirely with the
        placeholder (messages collapse to one placeholder user turn).
        ``evidence``: substring-replace only the reported evidence snippets.
        """
        if self.redaction_mode == "none":
            return
        ph = self.redaction_placeholder
        by_field: Dict[str, List[str]] = {}
        fields_whole: Set[str] = set()
        for item in cleaned:
            cf = _canonical_field_for_redaction(
                str(item.get("field", "")),
                self.inspect_keys,
                self.messages_key,
            )
            if not cf:
                continue
            fields_whole.add(cf)
            ev = str(item.get("evidence", "")).strip()
            if ev:
                by_field.setdefault(cf, []).append(ev)
        if self.redaction_mode == "whole_field":
            for cf in fields_whole:
                if cf not in sample:
                    continue
                if self.messages_key and cf == self.messages_key:
                    m = sample.get(self.messages_key)
                    if isinstance(m, list):
                        _replace_messages_with_placeholder(m, ph)
                elif isinstance(sample.get(cf), str):
                    sample[cf] = ph
            return
        if self.redaction_mode == "evidence":
            for cf, snippets in by_field.items():
                if not snippets or cf not in sample:
                    continue
                if self.messages_key and cf == self.messages_key:
                    _redact_messages_substrings(
                        sample[self.messages_key],
                        snippets,
                        ph,
                    )
                elif isinstance(sample.get(cf), str):
                    sample[cf] = _redact_string_substrings(
                        sample[cf],
                        snippets,
                        ph,
                    )

    def process_single(self, sample, rank=None):
        """Audit one sample; write the verdict to meta and maybe redact.

        Skips (with a reason recorded in meta) when: result already present
        and ``overwrite`` is False; no inspectable text; or the heuristic
        gate stays closed. On LLM failure records an error entry instead.
        """
        meta = sample.get(Fields.meta)
        if not isinstance(meta, dict):
            sample[Fields.meta] = {}
            meta = sample[Fields.meta]
        if not self.overwrite and self.result_key in meta:
            return sample
        user_block, fields_used = self._assemble_user_block(sample)
        if not user_block.strip():
            meta[self.result_key] = {
                "skipped": True,
                "reason": "no_inspectable_text",
                "fields_inspected": [],
            }
            return sample
        gated = self.gate_mode == "heuristic" and not self._heuristic_allow_llm(
            user_block,
        )
        if gated:
            meta[self.result_key] = {
                "skipped": True,
                "reason": "heuristic_gate",
                "fields_inspected": fields_used,
                "likely_clean": True,
            }
            return sample
        user_msg = "以下文本可能已部分正则脱敏。请按系统说明只输出 JSON。\n\n" + user_block
        messages = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": user_msg},
        ]
        sp = {**self.sampling_params}
        sp.setdefault("max_tokens", 512)
        sp.setdefault("temperature", 0.1)
        raw = ""
        for attempt in range(self.try_num):
            try:
                client = get_model(self.model_key, rank=rank)
                raw = client(messages, **sp)
                if raw and isinstance(raw, str) and raw.strip():
                    break
            except Exception as e:
                # {} placeholders: loguru uses str.format-style messages.
                logger.warning(
                    "pii_llm_suspect_mapper attempt {}: {}",
                    attempt + 1,
                    e,
                )
        if not raw:
            meta[self.result_key] = {
                "error": "empty_llm_response",
                "fields_inspected": fields_used,
            }
            return sample
        parsed = _extract_json_object(raw)
        if parsed is None:
            # Keep a (clipped) copy of the unparseable output for debugging.
            meta[self.raw_key] = raw[:8000]
            meta[self.result_key] = {
                "error": "json_parse_failed",
                "fields_inspected": fields_used,
            }
            return sample
        suspected = parsed.get("suspected")
        if not isinstance(suspected, list):
            suspected = []
        cleaned: List[dict] = []
        # Cap at 32 items and clip each string to bound meta size.
        for item in suspected[:32]:
            if not isinstance(item, dict):
                continue
            cleaned.append(
                {
                    "field": str(item.get("field", ""))[:64],
                    "category": str(item.get("category", ""))[:120],
                    "evidence": str(item.get("evidence", ""))[:80],
                }
            )
        default_clean = len(cleaned) == 0
        likely_clean = bool(parsed.get("likely_clean", default_clean))
        meta[self.result_key] = {
            "suspected": cleaned,
            "likely_clean": likely_clean,
            "fields_inspected": fields_used,
            "redaction_mode": self.redaction_mode,
            "redaction_applied": self.redaction_mode != "none",
        }
        self._apply_redaction(sample, cleaned)
        return sample