Source code for data_juicer.tools.quality_classifier.qc_utils

import os
import zipfile

import numpy as np
import wget
from loguru import logger
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, udf
from pyspark.sql.types import ArrayType, DoubleType, IntegerType, StringType

from data_juicer.utils.cache_utils import DATA_JUICER_MODELS_CACHE
from data_juicer.utils.model_utils import MODEL_LINKS, prepare_sentencepiece_for_lang


def init_spark(spark_executor_memory=None,
               spark_driver_memory=None,
               spark_executor_memoryOverhead=None):
    """
    Initialize a Spark session. You can set parameters such as memory, number
    of partitions, timeout and so on here.

    :return: a Spark session instance.
    """
    if not spark_executor_memory:
        spark_executor_memory = "64g"
    if not spark_driver_memory:
        spark_driver_memory = "64g"
    if not spark_executor_memoryOverhead:
        spark_executor_memoryOverhead = "20000"
    spark = (
        SparkSession.builder.master("local[*]")
        .config("spark.driver.memory", spark_driver_memory)
        .config("spark.executor.memory", spark_executor_memory)
        .config("spark.sql.shuffle.partitions", "300")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.executor.memoryOverhead", spark_executor_memoryOverhead)
        .config("spark.network.timeout", "10000s")
        .config("spark.executor.heartbeatInterval", "3600s")
        .getOrCreate()
    )
    logger.info("Spark initialization done.")
    return spark
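

# Illustrative usage sketch (not part of the original module): all values
# below are hypothetical. Arguments left as None fall back to the defaults
# above; note that spark.executor.memoryOverhead is interpreted as MiB when
# no unit suffix is given.
#
#   spark = init_spark(spark_executor_memory="32g",
#                      spark_driver_memory="32g",
#                      spark_executor_memoryOverhead="10000")
#   print(spark.conf.get("spark.driver.memory"))  # -> "32g"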


def prepare_model(model_name, model_path=DATA_JUICER_MODELS_CACHE):
    """
    Prepare the specific model from the model cache path or the remote OSS.

    :param model_name: name of the quality classifier model
    :param model_path: the path to store the model to be loaded
    :return: a loaded PipelineModel
    """
    udm = False
    if model_name not in ["gpt3", "chinese", "code"]:
        # use a user-specific model
        real_model_path = model_name
        udm = True
    else:
        # use one of the prepared models we provide
        model_name = "%s_quality_model" % model_name
        real_model_path = os.path.join(model_path, model_name)
    logger.info(f"Preparing scorer model in [{real_model_path}]...")
    if os.path.exists(real_model_path) and os.path.isdir(real_model_path):
        return PipelineModel.load(real_model_path)
    if udm:
        logger.error(f"Customized model [{real_model_path}] cannot be loaded.")
        exit(0)
    # no specific model in the local file system, so download it from remote
    os.makedirs(model_path, exist_ok=True)
    wget.download(
        os.path.join(MODEL_LINKS, f"{model_name}.zip"),
        os.path.join(model_path, f"{model_name}.zip"),
        bar=None,
    )
    # extract the compressed model file into a model directory
    with zipfile.ZipFile(os.path.join(model_path, f"{model_name}.zip")) as zp:
        zp.extractall(os.path.join(model_path))
    return PipelineModel.load(real_model_path)
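

# Illustrative usage sketch (not part of the original module): the custom
# path below is hypothetical. Built-in model names resolve to cached
# "<name>_quality_model" directories and are downloaded from MODEL_LINKS on
# first use; any other string is treated as a path to a saved PipelineModel.
#
#   model = prepare_model("gpt3")                         # prepared, cached
#   custom = prepare_model("/path/to/my_pipeline_model")  # user-specific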


def load_dataset(spark, ds_path, text_key="text", only_text=False):
    """
    Load a single dataset using PySpark. Only 'json', 'jsonl', and 'parquet'
    files are supported for now.

    :param spark: spark session
    :param ds_path: dataset path
    :param text_key: the name of the column that stores the contents of texts
    :param only_text: whether to load texts only and drop other columns
    :return: a data frame
    """
    # load the dataset using different methods according to its suffix
    logger.info(f"Loading dataset from [{ds_path}]...")
    if ds_path.endswith(".json") or ds_path.endswith(".jsonl"):
        df = spark.read.json(ds_path)
    elif ds_path.endswith(".parquet"):
        df = spark.read.parquet(ds_path)
    else:
        raise NotImplementedError("Dataset type is not supported for now. "
                                  "Suffix of dataset file should be one of "
                                  "[.json, .jsonl, .parquet]")
    # rename the column that stores texts to "text" if necessary
    if text_key != "text":
        df = df.withColumnRenamed(text_key, "text")
    # whether to keep the "text" column only
    if only_text:
        return df.select("text")
    else:
        return df


def load_datasets(spark, ds_paths, text_key="text", label=None,
                  only_text=True):
    """
    Load a list of datasets. Only 'json', 'jsonl', and 'parquet' files are
    supported for now.

    :param spark: spark session
    :param ds_paths: a list of paths of the datasets to be loaded
    :param text_key: the name of the column that stores the contents of texts
    :param label: the label to set for these datasets. Used in the training
        pipeline
    :param only_text: whether to load texts only and drop other columns
    :return: a data frame
    """
    if len(ds_paths) == 0:
        logger.warning("No dataset path provided.")
        return None
    # load each dataset in order and union them all
    base_ds = load_dataset(spark, ds_paths[0], text_key, only_text)
    for i in range(1, len(ds_paths)):
        base_ds = base_ds.unionAll(
            load_dataset(spark, ds_paths[i], text_key, only_text))
    if label is not None:
        # add labels for the training pipeline
        return base_ds.selectExpr("text", "%d as label" % label)
    else:
        return base_ds
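

# Illustrative usage sketch (not part of the original module): the file paths
# are hypothetical. A typical training setup unions positive (label=1) and
# negative (label=0) datasets and shuffles the result.
#
#   spark = init_spark()
#   pos = load_datasets(spark, ["pos_a.jsonl", "pos_b.parquet"], label=1)
#   neg = load_datasets(spark, ["neg.jsonl"], label=0)
#   train_ds = shuffle(pos.unionAll(neg))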


def shuffle(df):
    """
    Shuffle a data frame.

    :param df: input data frame
    :return: shuffled data frame
    """
    temp_df = df.withColumn("rand", rand(seed=42))
    df_rnd = temp_df.orderBy(temp_df.rand)
    return df_rnd.drop(df_rnd.rand)


def export_result(ds, res_path):
    """
    Export a dataset to the specified path. Only 'json', 'jsonl', and
    'parquet' export formats are supported for now.

    :param ds: the dataset to be exported
    :param res_path: the path to store the exported dataset
    :return:
    """
    logger.info(f"Exporting predicted result to [{res_path}]")
    if res_path.endswith(".json") or res_path.endswith(".jsonl"):
        ds.write.mode("overwrite").format("json").save(res_path)
    elif res_path.endswith(".parquet"):
        ds.write.mode("overwrite").format("parquet").save(res_path)
    else:
        ds.write.mode("overwrite").save(res_path)


def get_keep_method_udf(keep_method):
    """
    Given the name of a keep method, return a PySpark user-defined function
    of that keep method. Only 'gpt3' and 'label' are supported for now.

    :param keep_method: name of the keep method
    :return: a PySpark udf of the specified keep method
    """
    if keep_method == "label":
        return udf(lambda score: int(score > 0.5), IntegerType())
    elif keep_method == "gpt3":
        pareto = 9
        return udf(lambda score: int(score > 1 - np.random.pareto(pareto)),
                   IntegerType())
    else:
        raise NotImplementedError(f"Keep method [{keep_method}] is not "
                                  f"implemented for now.")
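

# Illustrative sketch (not part of the original module): 'label' keeps a
# sample iff its score exceeds 0.5, while 'gpt3' mirrors the stochastic
# document-sampling rule from the GPT-3 paper: keep iff score > 1 - X with
# X ~ Pareto(alpha=9), so the keep probability rises steeply with the score
# and even low-scoring samples are occasionally kept. The names below are
# hypothetical.
#
#   keep_udf = get_keep_method_udf("gpt3")
#   df = df.withColumn("should_keep", keep_udf(col("doc_score")))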


def tokenize_dataset(ds, tokenizer):
    """
    Tokenize the texts in the input dataset using the specified tokenizer.

    :param ds: dataset to be tokenized
    :param tokenizer: tokenizer used to tokenize texts
    :return: a dataset with an extra column "words" that stores the tokenized
        texts
    """
    tkn = prepare_sentencepiece_for_lang("", tokenizer)
    # create a PySpark udf to tokenize the dataset
    tokenizer_udf = udf(lambda text: tkn.encode_as_pieces(text),
                        ArrayType(StringType()))
    logger.info("Tokenizing texts using the specific tokenizer...")
    return ds.withColumn("words", tokenizer_udf(col("text")))


def train(output_model_path, ds, tokenizer=None):
    """
    Train a quality classifier on the training dataset and export the trained
    model to the specified path.

    :param output_model_path: the path to store the trained model
    :param ds: training dataset
    :param tokenizer: specified sentencepiece tokenizer. It's None by
        default, which means using the standard Tokenizer in PySpark
    :return:
    """
    logger.info("Preparing to train the quality classifier model...")
    if tokenizer:
        # the tokenizer is not the standard Tokenizer in PySpark, so it needs
        # to be applied explicitly
        ds = tokenize_dataset(ds, tokenizer)
    # model
    hashingTF = HashingTF(inputCol="words", outputCol="features")
    lr = LogisticRegression()
    if tokenizer is None:
        # use the standard Tokenizer in PySpark
        std_tokenizer = Tokenizer(inputCol="text", outputCol="words")
        pipeline = Pipeline(stages=[std_tokenizer, hashingTF, lr])
    else:
        # use the extra sentencepiece tokenizer, which will not be included
        # in the final PipelineModel
        pipeline = Pipeline(stages=[hashingTF, lr])
    logger.info("Start training...")
    model = pipeline.fit(ds)
    logger.info("Saving the trained model...")
    model.write().overwrite().save(output_model_path)
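

# Illustrative end-to-end training sketch (not part of the original module):
# the dataset and output paths are hypothetical. With tokenizer=None the
# standard PySpark Tokenizer becomes part of the saved pipeline, so the
# resulting model can be applied directly to raw "text" columns.
#
#   spark = init_spark()
#   pos = load_datasets(spark, ["pos.jsonl"], label=1)
#   neg = load_datasets(spark, ["neg.jsonl"], label=0)
#   train("my_quality_model", shuffle(pos.unionAll(neg)))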


def eval(model_path, ds, tokenizer=None):
    """
    Evaluate a quality classifier model on the specified dataset.

    :param model_path: the path to the model to be evaluated
    :param ds: evaluation dataset
    :param tokenizer: specified sentencepiece tokenizer. It's None by
        default, which means using the standard Tokenizer in PySpark
    :return:
    """
    logger.info("Preparing to evaluate...")
    if tokenizer:
        # the tokenizer is not the standard Tokenizer in PySpark, so it needs
        # to be applied explicitly
        ds = tokenize_dataset(ds, tokenizer)
    logger.info("Start evaluation...")
    model = prepare_model(model_path)
    pred = model.transform(ds)
    # get positive and negative samples
    P = pred.filter("label = 1")
    N = pred.filter("label = 0")
    # count TP, FP, TN, FN samples, with add-one smoothing to avoid
    # divisions by zero below
    TP = P.filter("prediction = 1").count() + 1
    FP = N.filter("prediction = 1").count() + 1
    TN = N.filter("prediction = 0").count() + 1
    FN = P.filter("prediction = 0").count() + 1
    # compute precision, recall and F1 metrics; recall is computed against
    # the raw number of positive samples
    precision = 1.0 * TP / (TP + FP)
    recall = 1.0 * TP / P.count()
    F1 = 2.0 * precision * recall / (precision + recall)
    logger.info(f"TP: {TP}, FN: {FN}")
    logger.info(f"FP: {FP}, TN: {TN}")
    logger.info(f"P: {precision}, R: {recall}, F1: {F1}")


def predict(model, ds, tokenizer=None, keep_method="label"):
    """
    Predict document scores for a dataset using a trained quality classifier
    model.

    :param model: the model used to predict
    :param ds: the dataset to be predicted
    :param tokenizer: specified sentencepiece tokenizer. It's None by
        default, which means using the standard Tokenizer in PySpark
    :param keep_method: name of the keep method used to label the
        "should_keep" column
    :return:
    """
    logger.info("Start scoring dataset...")
    if tokenizer:
        # the tokenizer is not the standard Tokenizer in PySpark, so it needs
        # to be applied explicitly
        ds = tokenize_dataset(ds, tokenizer)
    prediction = model.transform(ds)

    # a UDF to extract doc scores from probability vectors
    def extract_prob(v):
        try:
            return float(v[1])
        except ValueError:
            return None

    # extract the predicted probability as the doc_score
    extract_prob_udf = udf(extract_prob, DoubleType())
    doc_score = prediction.withColumn("doc_score",
                                      extract_prob_udf(col("probability")))
    # a UDF to get the bool value indicating whether this sample should be
    # kept
    should_keep_label_udf = get_keep_method_udf(keep_method)
    should_keep = doc_score.withColumn("should_keep",
                                       should_keep_label_udf(col("doc_score")))
    # drop extra useless columns
    return should_keep.drop("words", "features", "rawPrediction",
                            "probability", "prediction")
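

# Illustrative end-to-end scoring sketch (not part of the original module):
# the input and output paths are hypothetical. The returned frame keeps the
# original columns plus "doc_score" and "should_keep"; the 'chinese' and
# 'code' models additionally expect their matching sentencepiece tokenizer
# via the `tokenizer` argument.
#
#   spark = init_spark()
#   model = prepare_model("gpt3")
#   ds = load_dataset(spark, "docs.jsonl")
#   scored = predict(model, ds, keep_method="gpt3")
#   export_result(scored.filter("should_keep = 1"), "kept.jsonl")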