# Source code for data_juicer.tools.quality_classifier.train

# This PySpark-based tool trains a quality classifier on your own datasets.
#
# After training, this tool will generate a classifier model in a specific
# directory. You can use it to evaluate or predict on other datasets using eval
# and predict tools.
#
# This tool needs several arguments:
#   - positive_datasets: the paths to the positive datasets. It could be a
#       string for a single dataset, e.g. 'pos.parquet', or a list of strings
#       for several datasets, e.g. '["pos1.parquet", "pos2.parquet"]'.
#   - negative_datasets: the paths to the negative datasets. It could be a
#       string for a single dataset, e.g. 'neg.parquet', or a list of strings
#       for several datasets, e.g. '["neg1.parquet", "neg2.parquet"]'.
#   - output_model_path: the path to store the trained quality classifier. It's
#       "my_quality_model" by default.
#   - num_training_samples: number of samples used to train the model. It's 0
#       by default, which means using all samples in the datasets to train.
#   - train_test_split_ratio: ratio to split train and test set. It's 0.8 by
#       default.
#   - tokenizer: which tokenizer to use to tokenize texts. It's None by
#       default, which means using the standard Tokenizer of PySpark. You can
#       use one of ["zh.sp.model", "code.sp.model"] we provide, or you can set
#       it to the path to your own sentencepiece model.
#   - evaluation: whether to evaluate the model after training using the test
#       set. It's True by default.
#   - text_key: the field key name to hold texts to be classified. It's "text"
#       by default.

import fire
from loguru import logger

from data_juicer.tools.quality_classifier.qc_utils import (
    eval,
    init_spark,
    load_datasets,
    shuffle,
    train,
)


@logger.catch(reraise=True)
def main(
    positive_datasets,
    negative_datasets,
    output_model_path="my_quality_model",
    num_training_samples=0,
    train_test_split_ratio=0.8,
    tokenizer=None,
    evaluation=True,
    text_key="text",
):
    """
    Train a quality classifier using your own pos/neg datasets.

    :param positive_datasets: the paths to the positive datasets. It could be
        a string for a single dataset, e.g. 'pos.parquet', or a list of
        strings for several datasets, e.g. '["pos1.parquet", "pos2.parquet"]'
    :param negative_datasets: the paths to the negative datasets. It could be
        a string for a single dataset, e.g. 'neg.parquet', or a list of
        strings for several datasets, e.g. '["neg1.parquet", "neg2.parquet"]'
    :param output_model_path: the path to store the trained quality
        classifier. It's "my_quality_model" by default
    :param num_training_samples: number of samples taken from each of the
        positive and negative sets to train the model. It's 0 by default,
        which means using all samples in the datasets to train
    :param train_test_split_ratio: ratio to split train and test set. It's
        0.8 by default
    :param tokenizer: which tokenizer to use to tokenize texts. It's None by
        default, which means using the standard Tokenizer of PySpark. You can
        use one of ["zh.sp.model", "code.sp.model"] we provide, or you can
        set it to the path to your own sentencepiece model
    :param evaluation: whether to evaluate the model after training using
        the test set. It's True by default
    :param text_key: the field key name to hold texts to be classified. It's
        "text" by default
    :return: None. The process exits with status 1 if either the positive
        or the negative datasets are loaded as None
    """
    # Local import keeps this function self-contained with respect to the
    # file-level import block.
    import sys

    # convert a single dataset to a dataset list
    if isinstance(positive_datasets, str):
        positive_datasets = [positive_datasets]
    if isinstance(negative_datasets, str):
        negative_datasets = [negative_datasets]

    # initialize a spark session
    spark = init_spark()

    # load positive (label=1) and negative (label=0) datasets
    pos = load_datasets(
        spark, positive_datasets, text_key=text_key, label=1, only_text=True
    )
    neg = load_datasets(
        spark, negative_datasets, text_key=text_key, label=0, only_text=True
    )
    if pos is None or neg is None:
        logger.error("Empty dataset in positive/negative dataset list...")
        # Use sys.exit instead of the builtin exit(): the latter is injected
        # by the `site` module for interactive use and is not guaranteed to
        # exist (e.g. under `python -S`).
        sys.exit(1)

    # sample a part of positive/negative samples to train; each side is
    # shuffled first and then truncated to the requested size
    if num_training_samples > 0:
        logger.info(f"Only use {num_training_samples} pairs samples to train.")
        pos = shuffle(pos).limit(num_training_samples)
        neg = shuffle(neg).limit(num_training_samples)

    # merge pos and neg samples
    ds = pos.unionAll(neg)

    # split the merged dataset into training and test set (fixed seed)
    train_set, test_set = ds.randomSplit(
        [train_test_split_ratio, 1.0 - train_test_split_ratio], seed=42
    )

    logger.info(
        f"Number of training samples: {train_set.count()}, "
        f"test samples: {test_set.count()}"
    )

    # start the ML pipeline to train the classifier
    train(output_model_path, train_set, tokenizer)

    # evaluate the trained model on test set
    if evaluation:
        eval(output_model_path, test_set, tokenizer)
# Script entry point: expose ``main`` as a command-line interface via Fire.
if __name__ == "__main__":
    fire.Fire(main)