数据追踪#

本文档描述 DataJuicer 的追踪系统,用于跟踪数据处理过程中样本级别的变化。

概述#

Tracer 记录每个算子在处理管道中如何修改、过滤或去重各个样本。这对以下场景非常有用:

  • 调试 — 理解特定样本为何被修改或删除

  • 质量保证 — 验证算子是否按预期工作

  • 审计 — 维护数据转换的记录

配置#

基本设置#

open_tracer: false        # 启用/禁用追踪
op_list_to_trace: []      # 要追踪的算子列表(空 = 所有算子)
trace_num: 10             # 每个算子最多收集的样本数
trace_keys: []            # 追踪输出中包含的额外字段

命令行#

# 启用所有算子的追踪
dj-process --config config.yaml --open_tracer true

# 仅追踪特定算子
dj-process --config config.yaml --open_tracer true \
    --op_list_to_trace clean_email_mapper,words_num_filter

# 每个算子收集更多样本
dj-process --config config.yaml --open_tracer true --trace_num 50

# 在追踪输出中包含额外字段
dj-process --config config.yaml --open_tracer true \
    --trace_keys sample_id,source_file

输出结构#

追踪结果存储在工作目录的 trace/ 子目录中:

{work_dir}/
└── trace/
    ├── sample_trace-clean_email_mapper.jsonl
    ├── sample_trace-words_num_filter.jsonl
    ├── duplicate-document_deduplicator.jsonl
    └── ...

每个追踪文件为 JSONL 格式(每行一个 JSON 对象),内容因算子类型而异。

追踪的算子类型#

Mapper 追踪#

对于 Mapper 算子,Tracer 记录文本内容发生变化的样本。每条记录包含:

字段

描述

original_text

Mapper 处理前的文本

processed_text

Mapper 处理后的文本

trace_keys 字段

配置的 trace_keys 对应的值

输出示例(sample_trace-clean_email_mapper.jsonl):

{"original_text":"联系我们 user@example.com 获取详情。","processed_text":"联系我们  获取详情。"}
{"original_text": "邮箱:admin@test.org", "processed_text": "邮箱:"}

仅收集文本实际发生变化的样本,未变化的样本会被跳过。

Filter 追踪#

对于 Filter 算子,Tracer 记录被过滤掉(删除)的样本。每条记录包含完整的样本数据。

输出示例(sample_trace-words_num_filter.jsonl):

{"text": "Too short.", "__dj__stats__": {"words_num": 2}}
{"text": "Also brief.", "__dj__stats__": {"words_num": 2}}

仅收集被过滤掉的样本,通过过滤器的样本会被跳过。

Deduplicator 追踪#

对于 Deduplicator 算子,Tracer 记录近似重复的样本对。每条记录包含:

字段

描述

dup1

重复对中的第一个样本

dup2

重复对中的第二个样本

输出示例(duplicate-document_deduplicator.jsonl):

{"dup1": "这是一段重复的文本。", "dup2": "这是一段重复的文本。"}

样本收集行为#

Tracer 使用高效的样本级收集方式:

  1. 每个算子在处理过程中最多收集 trace_num 个样本

  2. 收集到足够样本后提前停止

  3. 在默认模式下,收集是线程安全的,使用多进程锁

  4. 在 Ray 模式下,每个 Worker 有自己的 Tracer 实例(无需加锁)

这种设计最大限度地减少了性能开销——Tracer 不会比较整个数据集,而是在处理过程中实时捕获变化。

trace_keys#

trace_keys 选项允许在追踪输出中包含原始样本的额外字段。这对于识别哪些样本受到影响非常有用:

open_tracer: true
trace_keys:
  - sample_id
  - source_file

使用此配置,Mapper 追踪条目将包含:

{
  "sample_id": "doc_00042",
  "source_file": "corpus_part1.jsonl",
  "original_text": "原始内容...",
  "processed_text": "处理后的内容..."
}

API 参考#

Tracer(默认模式)#

from data_juicer.core.tracer import Tracer

tracer = Tracer(
    work_dir="./outputs",
    op_list_to_trace=["clean_email_mapper", "words_num_filter"],
    show_num=10,
    trace_keys=["sample_id"]
)

# 检查某个算子是否需要追踪
tracer.should_trace_op("clean_email_mapper")  # True

# 检查是否已收集足够的样本
tracer.is_collection_complete("clean_email_mapper")  # False

# 收集 Mapper 样本
tracer.collect_mapper_sample(
    op_name="clean_email_mapper",
    original_sample={"text": "邮箱:a@b.com"},
    processed_sample={"text": "邮箱:"},
    text_key="text"
)

# 收集 Filter 样本
tracer.collect_filter_sample(
    op_name="words_num_filter",
    sample={"text": "太短"},
    should_keep=False
)

RayTracer(分布式模式)#

from data_juicer.core.tracer.ray_tracer import RayTracer

# RayTracer 是一个 Ray Actor — 通过 Ray 创建
tracer = RayTracer.remote(
    work_dir="./outputs",
    op_list_to_trace=None,  # 追踪所有算子
    show_num=10,
    trace_keys=["sample_id"]
)

# 远程方法调用
ray.get(tracer.collect_mapper_sample.remote(
    op_name="clean_email_mapper",
    original_sample={"text": "邮箱:a@b.com"},
    processed_sample={"text": "邮箱:"},
    text_key="text"
))

# 最终化并导出所有追踪结果
ray.get(tracer.finalize_traces.remote())

辅助函数#

data_juicer.core.tracer 模块提供了模式无关的辅助函数:

from data_juicer.core.tracer import (
    should_trace_op,
    check_tracer_collect_complete,
    collect_for_mapper,
    collect_for_filter,
)

# 这些函数自动处理默认模式和 Ray 模式
should_trace_op(tracer_instance, "clean_email_mapper")
check_tracer_collect_complete(tracer_instance, "clean_email_mapper")
collect_for_mapper(tracer_instance, "op_name", original, processed, "text")
collect_for_filter(tracer_instance, "op_name", sample, should_keep=False)

性能考虑#

开销#

  • trace_num 较小时(默认:10),追踪的额外开销极小

  • 一旦某个算子收集了 trace_num 个样本,就不再进行进一步收集

  • 主要成本是 Mapper 中原始文本与处理后文本的比较

建议#

场景

推荐设置

开发/调试

open_tracer: truetrace_num: 10-50

生产运行

open_tracer: false

审计特定算子

open_tracer: trueop_list_to_trace: [特定算子]

大规模追踪

open_tracer: truetrace_num: 100,指定 op_list_to_trace

故障排除#

没有生成追踪文件:

# 验证追踪器是否启用
grep "open_tracer" config.yaml

# 检查追踪目录是否存在
ls -la ./outputs/{work_dir}/trace/

追踪文件为空:

  • 对于 Mapper:算子可能没有修改任何样本

  • 对于 Filter:算子可能没有过滤掉任何样本

  • 检查日志中是否有类似 "Datasets before and after op [X] are all the same" 的警告

追踪文件中样本太少:

  • 增加 trace_num 以收集更多样本

  • 数据集中变化/过滤的样本可能少于 trace_num