# Source code for data_juicer.ops.mapper.latex_merge_tex_mapper

import tarfile
import zipfile

from loguru import logger

from ..base_op import OPERATORS, Mapper

# Name passed to ``OPERATORS.register_module`` when registering the mapper
# class defined in this module.
OP_NAME = "latex_merge_tex_mapper"


def _collect_tex_within_limits(entries, read_entry, archive_path, max_file_size=0, max_total_size=0):
    """Read and decode archive entries while enforcing the size limits.

    Shared by the tar and zip readers so both apply exactly the same
    limit checks and emit the same warnings.

    :param entries: Iterable of ``(name, declared_size, handle)`` tuples,
        one per candidate ``.tex`` entry.  It is consumed lazily, so it may
        be a generator over an archive that is still open.
    :param read_entry: Callable taking a ``handle`` and returning the
        entry's raw bytes, or ``None`` if the entry cannot be read.
    :param archive_path: Archive path, used only in log messages.
    :param max_file_size: Per-entry size limit in bytes; ``0`` disables it.
    :param max_total_size: Cumulative size limit in bytes; ``0`` disables
        it.
    :return: List of decoded entry contents, in iteration order.
    """
    contents = []
    total_bytes = 0
    for name, declared_size, handle in entries:
        if max_file_size and declared_size > max_file_size:
            logger.warning(
                f"Skipping {name} in {archive_path}: "
                f"declared size {declared_size} bytes exceeds "
                f"limit of {max_file_size} bytes"
            )
            continue

        # Use declared header size to bail before reading.
        if max_total_size and (total_bytes + declared_size) > max_total_size:
            logger.warning(
                f"Cumulative extracted size would exceed limit "
                f"of {max_total_size} bytes in {archive_path}. "
                f"Skipping remaining files."
            )
            break

        raw_bytes = read_entry(handle)
        if raw_bytes is None:
            continue

        # Re-check against the bytes actually read: the declared size comes
        # from the archive header and is not trusted on its own.
        if max_file_size and len(raw_bytes) > max_file_size:
            logger.warning(
                f"Skipping {name} in {archive_path}: "
                f"actual size {len(raw_bytes)} bytes exceeds "
                f"limit of {max_file_size} bytes"
            )
            continue

        total_bytes += len(raw_bytes)
        if max_total_size and total_bytes > max_total_size:
            logger.warning(
                f"Cumulative extracted size {total_bytes} bytes "
                f"exceeds limit of {max_total_size} bytes in "
                f"{archive_path}. Skipping remaining files."
            )
            break

        # Undecodable bytes are replaced rather than failing the archive.
        contents.append(raw_bytes.decode("utf-8", errors="replace"))
    return contents


@OPERATORS.register_module(OP_NAME)
class LatexMergeTexMapper(Mapper):
    """Extracts and concatenates all ``.tex`` files from a compressed
    LaTeX project archive into a single text field.

    Supported archive formats: ``.tar``, ``.tar.gz`` / ``.tgz``, and
    ``.zip``.  Plain ``.gz`` (single-file gzip) is **not** supported
    because gzip archives carry no filename metadata, making it
    impossible to verify that the content is actually a ``.tex`` file.

    All ``.tex`` files found inside the archive are read in-memory and
    joined with a configurable separator.  The ``.tex`` suffix of archive
    members is matched case-insensitively, the same way the archive
    extension itself is matched.  No ordering or deduplication is applied.

    This operator is typically placed before LaTeX-processing operators
    such as ``remove_comments_mapper``, ``expand_macro_mapper``, or
    ``latex_figure_context_extractor_mapper``."""

    def __init__(
        self,
        compressed_file_key: str = "compressed_file",
        separator: str = "\n\n",
        max_file_size: int = 50 * 1024 * 1024,
        max_total_size: int = 100 * 1024 * 1024,
        *args,
        **kwargs,
    ):
        """
        Initialization method.

        :param compressed_file_key: Field name that stores the
            archive file path.
        :param separator: String used to join the contents of
            multiple ``.tex`` files.
        :param max_file_size: Maximum allowed uncompressed size in
            bytes for a single ``.tex`` entry inside the archive.
            Entries exceeding this limit are skipped with a warning.
            Set to ``None`` or ``0`` to disable the check.
        :param max_total_size: Maximum allowed cumulative size in
            bytes for all extracted ``.tex`` content combined.  Once
            this limit is reached, remaining files in the archive are
            skipped with a warning.  Set to ``None`` or ``0`` to
            disable the check.
        :param args: extra args
        :param kwargs: extra args
        """
        super().__init__(*args, **kwargs)
        self.compressed_file_key = compressed_file_key
        self.separator = separator
        # Normalise ``None`` to ``0`` so "disabled" is a single falsy value.
        self.max_file_size = max_file_size or 0
        self.max_total_size = max_total_size or 0

    def _extract_tex_contents(self, archive_path: str):
        """Return a list of decoded ``.tex`` file contents from
        *archive_path*.

        Dispatches by file extension to the appropriate reader.  This is
        best-effort: a missing path, an unsupported extension, or any error
        raised while reading the archive is logged and yields an empty list
        instead of propagating.

        :param archive_path: Path of the archive to read.
        :return: List of decoded ``.tex`` contents (possibly empty).
        """
        if not archive_path:
            logger.warning(
                f"No archive path found under key "
                f"'{self.compressed_file_key}' (got {archive_path!r}). "
                f"Skipping extraction."
            )
            return []

        # ``str()`` keeps the extension check working for path-like objects;
        # for plain strings it is a no-op.
        path_lower = str(archive_path).lower()
        try:
            if path_lower.endswith(".zip"):
                return self._read_zip(archive_path, self.max_file_size, self.max_total_size)
            elif path_lower.endswith((".tar.gz", ".tgz", ".tar")):
                return self._read_tar(archive_path, self.max_file_size, self.max_total_size)
            else:
                logger.warning(
                    f"Unsupported archive format: {archive_path}. "
                    f"Supported formats: .tar, .tar.gz, .tgz, .zip"
                )
                return []
        except Exception:
            logger.exception(f"Failed to read archive {archive_path}")
            return []

    @staticmethod
    def _read_tar(archive_path: str, max_file_size: int = 0, max_total_size: int = 0):
        """Read all regular ``.tex`` members of a tar archive.

        :param archive_path: Path of a ``.tar`` / ``.tar.gz`` / ``.tgz`` file.
        :param max_file_size: Per-entry size limit in bytes; ``0`` disables.
        :param max_total_size: Cumulative size limit in bytes; ``0``
            disables.
        :return: List of decoded ``.tex`` contents.
        """
        with tarfile.open(archive_path, "r:*") as tf:

            def read_member(member):
                extracted = tf.extractfile(member)
                if extracted is None:
                    return None
                # Close the member's file object as soon as it is read.
                with extracted:
                    return extracted.read()

            # Lazy generator: members are pulled one at a time while the
            # archive is still open, and nothing is read past a limit break.
            entries = (
                (member.name, member.size, member)
                for member in tf
                if member.isfile() and member.name.lower().endswith(".tex")
            )
            return _collect_tex_within_limits(
                entries, read_member, archive_path, max_file_size, max_total_size
            )

    @staticmethod
    def _read_zip(archive_path: str, max_file_size: int = 0, max_total_size: int = 0):
        """Read all ``.tex`` members of a zip archive.

        :param archive_path: Path of a ``.zip`` file.
        :param max_file_size: Per-entry size limit in bytes; ``0`` disables.
        :param max_total_size: Cumulative size limit in bytes; ``0``
            disables.
        :return: List of decoded ``.tex`` contents.
        """
        with zipfile.ZipFile(archive_path) as zf:
            # Iterate ZipInfo objects and read by ZipInfo rather than by
            # name, so entries sharing a name are each read from their own
            # record instead of all resolving to the last duplicate.
            entries = (
                (info.filename, info.file_size, info)
                for info in zf.infolist()
                if info.filename.lower().endswith(".tex")
            )
            return _collect_tex_within_limits(
                entries, zf.read, archive_path, max_file_size, max_total_size
            )

    def process_single(self, sample):
        """Merge the ``.tex`` files of the sample's archive into its text
        field.

        :param sample: Sample mapping; must contain
            ``self.compressed_file_key``.
        :return: The same sample, with ``sample[self.text_key]`` set to the
            extracted ``.tex`` contents joined by ``self.separator`` (an
            empty string when nothing could be extracted).
        :raises ValueError: If ``self.compressed_file_key`` is not a key of
            *sample*.
        """
        if self.compressed_file_key not in sample:
            raise ValueError(
                f"Compressed file key '{self.compressed_file_key}' "
                f"not found in sample. "
                f"Available keys: {list(sample.keys())}"
            )

        path = sample[self.compressed_file_key]
        tex_contents = self._extract_tex_contents(path)
        sample[self.text_key] = self.separator.join(tex_contents)
        return sample