Source code for data_juicer.ops.mapper.python_file_mapper
import importlib.util
import inspect
import os
from ..base_op import OPERATORS, Mapper
# Name passed to OPERATORS.register_module when registering PythonFileMapper.
OP_NAME = "python_file_mapper"
[docs]
@OPERATORS.register_module(OP_NAME)
class PythonFileMapper(Mapper):
    """Mapper for executing Python function defined in a file.

    The function named ``function_name`` is loaded from ``file_path`` when
    the operator is constructed and is then applied to every sample (or
    batch of samples). When no file path is given, the operator returns its
    input unchanged.
    """

    def __init__(self, file_path: str = "", function_name: str = "process_single", batched: bool = False, **kwargs):
        """
        Initialization method.

        :param file_path: The path to the Python file containing the function
            to be executed. An empty string selects an identity function.
        :param function_name: The name of the function defined in the file
            to be executed.
        :param batched: A boolean indicating whether to process input data in
            batches.
        :param kwargs: Additional keyword arguments passed to the parent class.
        :raises FileNotFoundError: If ``file_path`` is non-empty and does not
            point to an existing file.
        :raises ValueError: If the file is not a ``.py`` file, or the named
            attribute is missing, not callable, or does not take exactly one
            positional argument.
        :raises ImportError: If no import spec can be built for the file.
        """
        # Assigned before the parent initializer runs, as in the original
        # ordering. NOTE(review): presumably the base class reads this flag
        # during its own setup -- confirm against Mapper.__init__.
        self._batched_op = bool(batched)
        super().__init__(**kwargs)

        self.file_path = file_path
        self.function_name = function_name
        if not file_path:
            # No file configured: pass every sample through untouched.
            self.func = lambda sample: sample
        else:
            self.func = self._load_function()

    def _load_function(self):
        """Import ``self.file_path`` and return the function it defines.

        :return: The callable named ``self.function_name`` in the file.
        :raises FileNotFoundError: If the file does not exist.
        :raises ValueError: If the file is not a ``.py`` file, or the
            attribute is missing, not callable, or has the wrong arity.
        :raises ImportError: If no import spec/loader can be created.
        """
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(f"The file '{self.file_path}' does not exist.")
        if not self.file_path.endswith(".py"):
            raise ValueError(f"The file '{self.file_path}' is not a Python file.")

        # Load the module from the file. Executing it runs arbitrary
        # user-provided code, so the path must come from a trusted source.
        module_name = os.path.splitext(os.path.basename(self.file_path))[0]
        spec = importlib.util.spec_from_file_location(module_name, self.file_path)
        if spec is None or spec.loader is None:
            # spec_from_file_location may return None; fail with a clear
            # message instead of an AttributeError on the next lines.
            raise ImportError(f"Cannot create an import spec for '{self.file_path}'.")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Fetch the specified function from the module. A sentinel keeps
        # "attribute is missing" distinct from "attribute is set to None".
        missing = object()
        func = getattr(module, self.function_name, missing)
        if func is missing:
            raise ValueError(f"Function '{self.function_name}' not found in '{self.file_path}'.")  # noqa: E501
        if not callable(func):
            raise ValueError(f"The attribute '{self.function_name}' is not callable.")

        # Check that the function has exactly one positional argument.
        # inspect.signature drops an already-bound first parameter and
        # follows functools.wraps, so bound methods, callable objects and
        # decorated functions are judged by how they are actually called.
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        positional = [
            param
            for param in inspect.signature(func).parameters.values()
            if param.kind in positional_kinds
        ]
        if len(positional) != 1:
            raise ValueError(f"The function '{self.function_name}' must take exactly one argument")  # noqa: E501
        return func

    def _run_user_function(self, data):
        """Call ``self.func`` on ``data`` and check that it returns a dict.

        :param data: The sample or batch handed to the user function.
        :return: The dictionary produced by the user function.
        :raises ValueError: If the user function returns a non-dict value.
        """
        result = self.func(data)
        if not isinstance(result, dict):
            raise ValueError(f"Function must return a dictionary, got {type(result).__name__} instead.")  # noqa: E501
        return result

    def process_single(self, sample):
        """Invoke the loaded function with the provided sample.

        :param sample: The sample passed to the loaded function.
        :return: The dictionary returned by the loaded function.
        :raises ValueError: If the loaded function does not return a dict.
        """
        return self._run_user_function(sample)

    def process_batched(self, samples):
        """Invoke the loaded function with the provided samples.

        :param samples: The batch passed to the loaded function.
        :return: The dictionary returned by the loaded function.
        :raises ValueError: If the loaded function does not return a dict.
        """
        return self._run_user_function(samples)