Source code for data_juicer.tools.hpo.execute_hpo_3sigma

import copy
import json
import sys

import yaml
from jsonargparse import Namespace, namespace_to_dict
from loguru import logger

from data_juicer.config import init_configs, prepare_cfgs_for_export
from data_juicer.core import Analyzer, DefaultExecutor
from data_juicer.utils.constant import StatsKeys


@logger.catch(reraise=True)
def main():
    """Run the k-sigma hyper-parameter refinement pipeline end to end.

    The pipeline has three steps:

    1. analyze the data with the given initial recipe;
    2. refine the recipe's hyper-parameters with the k-sigma rule (see
       ``modify_recipe_k_sigma``), exporting the refined recipe when a
       ``--path_k_sigma_recipe`` option is present on the command line;
    3. process the data with the refined recipe.

    The export path is read directly from ``sys.argv``. Both the
    ``--path_k_sigma_recipe PATH`` and ``--path_k_sigma_recipe=PATH``
    spellings are recognized; when the option occurs several times the last
    occurrence wins.

    :raises ValueError: if ``cfg.executor_type`` is neither ``"default"``
        nor ``"ray"``.
    """
    option = "--path_k_sigma_recipe"
    path_k_sigma_recipe = None
    argv = sys.argv
    for idx, arg in enumerate(argv):
        if arg == option:
            # space-separated form: the value is the next token, if any
            if idx + 1 < len(argv):
                path_k_sigma_recipe = argv[idx + 1]
        elif arg.startswith(option + "="):
            # `--option=value` form
            path_k_sigma_recipe = arg[len(option) + 1:]

    # 1. analyze using the given initial recipe
    cfg = init_configs()
    logger.info("Begin to analyze data using the given initial recipe")
    analyzer = Analyzer(cfg)
    analyzer.run()
    df = analyzer.overall_result

    # 2. adjust the hyper-parameters of the given recipe with k-sigma rule
    modify_recipe_k_sigma(cfg, df, path_k_sigma_recipe)

    # 3. process the data using the refined recipe
    logger.info("Begin to process the data with refined recipe")
    if cfg.executor_type == "default":
        executor = DefaultExecutor(cfg)
    elif cfg.executor_type == "ray":
        # imported lazily so the ray executor module is only loaded on demand
        from data_juicer.core.executor.ray_executor import RayExecutor

        executor = RayExecutor(cfg)
    else:
        # without this branch `executor` would be unbound below and the
        # failure would surface as an opaque UnboundLocalError
        raise ValueError(
            f"Unsupported executor type: [{cfg.executor_type}]. "
            f'Should be one of the types ["default", "ray"].'
        )
    executor.run()
def modify_recipe_k_sigma(cfg, df, path_k_sigma_recipe, k=3):
    """Refine the min/max arguments of the recipe's ops with the k-sigma rule.

    For every op in ``cfg.process`` that has stats keys recorded in
    ``StatsKeys.get_access_log``, each argument whose name contains ``"min"``
    is set to ``mean - k * std`` and each argument whose name contains
    ``"max"`` is set to ``mean + k * std``. The mean and std are looked up
    per stats key in the ``"mean"`` and ``"std"`` rows of ``df``. Values that
    evaluate to NaN are not applied. ``cfg.process`` is updated in place
    (``Namespace`` entries are first converted to dicts).

    :param cfg: the recipe config; ``cfg.process`` is a list of mappings
        whose first item is ``op_name -> args``.
    :param df: analysis result whose index contains the rows ``"mean"`` and
        ``"std"`` and whose columns are stats keys.
    :param path_k_sigma_recipe: output path of the refined recipe; nothing
        is exported when it is empty or ``None``.
    :param k: multiplier applied to the standard deviation, 3 by default.
    :raises TypeError: if ``path_k_sigma_recipe`` does not end with
        ``.yaml``, ``.yml`` or ``.json``.
    """
    # get the mapping from each stats key to its mu and sigma
    stats_key_to_mean = df[df.index == "mean"].iloc[0, :].to_dict()
    stats_key_to_std = df[df.index == "std"].iloc[0, :].to_dict()
    op_name_to_stats_key = StatsKeys.get_access_log(dj_cfg=cfg)

    logger.info(f"Begin to modify the recipe with {k}-sigma rule")
    for i, process in enumerate(cfg.process):
        if isinstance(process, Namespace):
            cfg.process[i] = namespace_to_dict(process)

    for process in cfg.process:
        op_name, args = list(process.items())[0]
        if op_name not in op_name_to_stats_key:
            # skip the op such as `clean_email_mapper`
            continue
        if not args:
            # op listed without any argument: there is nothing to refine
            continue
        # snapshot of the argument names, so `args` can be updated while
        # looping without copying the argument values themselves
        arg_names = list(args)
        for stats_key in op_name_to_stats_key[op_name]:
            # both statistics are required to build a k-sigma bound
            if stats_key not in stats_key_to_mean or stats_key not in stats_key_to_std:
                continue
            for arg_name in arg_names:
                new_val = None
                if "min" in arg_name:
                    new_val = stats_key_to_mean[stats_key] - k * stats_key_to_std[stats_key]
                if "max" in arg_name:
                    new_val = stats_key_to_mean[stats_key] + k * stats_key_to_std[stats_key]
                # a NaN bound (e.g. from a NaN mean or std) is never applied
                if new_val is not None and str(new_val) != "nan":
                    logger.info(
                        f"Using {k}-sigma rule, for op {op_name}, "
                        f"changed its para "
                        f"{arg_name}={args[arg_name]} into "
                        f"{arg_name}={new_val}"
                    )
                    args[arg_name] = new_val

    if path_k_sigma_recipe:
        cfg = prepare_cfgs_for_export(cfg)
        if path_k_sigma_recipe.endswith((".yaml", ".yml")):
            with open(path_k_sigma_recipe, "w") as fout:
                yaml.safe_dump(cfg, fout)
        elif path_k_sigma_recipe.endswith(".json"):
            with open(path_k_sigma_recipe, "w") as fout:
                json.dump(cfg, fout)
        else:
            raise TypeError(
                f"Unrecognized output file type:"
                f" [{path_k_sigma_recipe}]. Should be one of the types"
                f' [".yaml", ".yml", ".json"].'
            )
# Script entry point: run the pipeline only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()