data_juicer.utils.jsonl_lenient_loader 源代码
"""
Stream local JSONL with stdlib :func:`json.loads`, skipping bad lines.
Used when HuggingFace's JSON builder (ujson) fails on some rows or when you
need per-line fault tolerance. Output is a normal :class:`datasets.Dataset`,
so downstream operators behave the same as with the default JSONL loader.
"""
from __future__ import annotations
import gzip
import io
import json
import os
from typing import Any, Dict, Iterator, List, Tuple
import zstandard as zstd
from datasets import Dataset
from data_juicer.utils.constant import Fields
# Keys produced by :func:`data_juicer.utils.file_utils.find_files_with_suffix`
JSONL_LENIENT_EXTENSIONS = frozenset({".jsonl", ".jsonl.gz", ".jsonl.zst"})
def _iter_text_lines(path: str) -> Iterator[str]:
if path.endswith(".jsonl.gz"):
with gzip.open(path, "rt", encoding="utf-8", newline="") as handle:
yield from handle
elif path.endswith(".jsonl.zst"):
with open(path, "rb") as compressed:
dctx = zstd.ZstdDecompressor()
with dctx.stream_reader(compressed) as reader:
text_stream = io.TextIOWrapper(
reader,
encoding="utf-8",
newline="",
)
yield from text_stream
else:
with open(path, "r", encoding="utf-8", newline="") as handle:
yield from handle
[文档]
def iter_lenient_jsonl_records(
file_ext_pairs: List[Tuple[str, str]],
*,
add_suffix_column: bool,
) -> Iterator[Dict[str, Any]]:
"""
Yield one dict per valid JSON object line.
:param file_ext_pairs: ``(file_path, ext_key)`` where ``ext_key`` is the
suffix key from ``find_files_with_suffix`` (e.g. ``\".jsonl\"``).
:param add_suffix_column: if True, set ``Fields.suffix`` to match the
default HF loader (``\".\" + ext_key.strip(\".\")``).
"""
# Use stdlib logging inside generator for pickle compatibility with dill.
# loguru Logger contains multiprocessing.Condition which cannot be pickled.
import logging
_logger = logging.getLogger(__name__)
skipped = 0
for path, ext_key in file_ext_pairs:
if not os.path.isfile(path):
_logger.warning(f"[lenient jsonl] missing file, skip: {path}")
continue
suffix_val = ("." + ext_key.strip(".")) if add_suffix_column else None
try:
line_iter = _iter_text_lines(path)
except OSError as exc:
_logger.error(f"[lenient jsonl] cannot open {path}: {exc}")
continue
for lineno, line in enumerate(line_iter, 1):
chunk = line.strip()
if not chunk:
continue
try:
obj = json.loads(chunk)
except (json.JSONDecodeError, ValueError) as exc:
skipped += 1
_logger.warning(f"[lenient jsonl] skip {path}:{lineno}: {exc}")
continue
if not isinstance(obj, dict):
skipped += 1
typ = type(obj).__name__
_logger.warning(f"[lenient jsonl] skip {path}:{lineno}: " f"expected JSON object, got {typ}")
continue
if suffix_val is not None:
row = dict(obj)
row[Fields.suffix] = suffix_val
yield row
else:
yield obj
if skipped:
_logger.info(f"[lenient jsonl] finished with {skipped} skipped line(s) " "(see warnings above)")
[文档]
def dataset_from_lenient_jsonl_files(
file_ext_pairs: List[Tuple[str, str]],
*,
add_suffix_column: bool,
) -> Dataset:
"""Build a :class:`datasets.Dataset` by streaming all given JSONL files."""
def _gen(pairs, add_suffix):
yield from iter_lenient_jsonl_records(
pairs,
add_suffix_column=add_suffix,
)
return Dataset.from_generator(
_gen,
gen_kwargs={
"pairs": file_ext_pairs,
"add_suffix": add_suffix_column,
},
)