data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator module

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.IdGenerator(start_id=0)[source]

Bases: object

__init__(start_id=0)[source]
get_next_id(count)[source]
class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.EdgeBuffer[source]

Bases: object

__init__()[source]
clear()[source]
set_edges(edge_dict)[source]
get_edges(key)[source]
class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.BTSUnionFind(union_threshold, parallel_num, parallel_id, remote_edge_buffers, max_pending_edge_buffer_task, num_edge_buffer_task_returns)[source]

Bases: object

A distributed implementation of Union-Find with load balancing.

The original paper on BTS Union-Find is available at: https://ieeexplore.ieee.org/document/10598116
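For readers unfamiliar with the underlying data structure, the following is a minimal single-process sketch of union-find with path compression. It is not the distributed, load-balanced BTS algorithm from the paper and does not reflect this class's internals; the class name `UnionFind` and the sample edges are hypothetical and chosen only for illustration.

```python
class UnionFind:
    """Illustrative single-process union-find (not the BTS implementation)."""

    def __init__(self):
        # Maps each node to its parent; a root is its own parent.
        self.parent = {}

    def find(self, x):
        # A node seen for the first time becomes its own root.
        self.parent.setdefault(x, x)
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: point every node on the path directly at the root.
        while self.parent[x] != root:
            next_node = self.parent[x]
            self.parent[x] = root
            x = next_node
        return root

    def union(self, x, y):
        root_x, root_y = self.find(x), self.find(y)
        if root_x == root_y:
            return
        # Attach the larger root under the smaller one, so the smallest
        # id in each connected component ends up as its representative.
        if root_x < root_y:
            self.parent[root_y] = root_x
        else:
            self.parent[root_x] = root_y


# Hypothetical edges between near-duplicate sample ids.
edges = [(1, 2), (2, 3), (4, 5)]
uf = UnionFind()
for u, v in edges:
    uf.union(u, v)

# Every node that is not the root of its component counts as a duplicate.
duplicates = sorted(x for x in list(uf.parent) if uf.find(x) != x)
```

In a deduplication setting, keeping only the root of each component retains one sample per group of near-duplicates.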

__init__(union_threshold, parallel_num, parallel_id, remote_edge_buffers, max_pending_edge_buffer_task, num_edge_buffer_task_returns)[source]
add_key_value_pairs(pairs)[source]
flush_key_value_pairs()[source]
balanced_union_find()[source]
distribute_edge(u, v)[source]
set_edge_buffer()[source]
edge_redistribution()[source]
communication()[source]
find(x)[source]
union(x, y)[source]
union_list(x_list)[source]
rebalancing()[source]
squeeze()[source]
dup_idx(queries)[source]
data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.get_remote_classes()[source]

Get remote versions of classes with Ray decorators applied at runtime.

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.RayBTSMinhashDeduplicator(tokenization: str = 'space', window_size: Annotated[int, Gt(gt=0)] = 5, lowercase: bool = True, ignore_pattern: str | None = None, num_permutations: Annotated[int, Gt(gt=0)] = 256, jaccard_threshold: Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])] = 0.7, num_bands: Annotated[int, Gt(gt=0)] | None = None, num_rows_per_band: Annotated[int, Gt(gt=0)] | None = None, tokenizer_model: str | None = None, union_find_parallel_num: int | str = 'auto', union_threshold: int | None = 256, max_pending_edge_buffer_task: int | None = 20, num_edge_buffer_task_returns: int | None = 10, max_pending_filter_tasks: int | None = 20, num_filter_task_returns: int | None = 10, merge_batch_size: int | None = 1000, *args, **kwargs)[source]

Bases: Deduplicator

A MinHash LSH deduplicator based on Ray.
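As a rough illustration of the MinHash idea behind this operator, the sketch below shingles whitespace-tokenized text and estimates Jaccard similarity from signatures. It is a simplified stand-in written with the standard library only, not the operator's actual code; the helper names (`shingles`, `make_permutations`, `minhash_signature`, `estimate_jaccard`), the hash construction, and the sample texts are all assumptions made for this example.

```python
import hashlib
import random

MERSENNE_PRIME = (1 << 61) - 1
MAX_HASH = (1 << 32) - 1


def shingles(text, window_size=5, lowercase=True):
    """Return the set of word n-grams ('space' tokenization) of a text."""
    if lowercase:
        text = text.lower()
    tokens = text.split()
    if len(tokens) < window_size:
        return {" ".join(tokens)}
    return {
        " ".join(tokens[i:i + window_size])
        for i in range(len(tokens) - window_size + 1)
    }


def make_permutations(num_permutations, seed=42):
    """Draw (a, b) pairs for hash functions of the form (a * h + b) mod p."""
    rng = random.Random(seed)
    return [
        (rng.randrange(1, MERSENNE_PRIME), rng.randrange(0, MERSENNE_PRIME))
        for _ in range(num_permutations)
    ]


def base_hash(shingle):
    """Map a shingle to a 32-bit integer using SHA-1."""
    digest = hashlib.sha1(shingle.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "little")


def minhash_signature(shingle_set, permutations):
    """Keep the minimum permuted hash value for each permutation."""
    hashes = [base_hash(s) for s in shingle_set]
    return [
        min(((a * h + b) % MERSENNE_PRIME) & MAX_HASH for h in hashes)
        for a, b in permutations
    ]


def estimate_jaccard(sig_a, sig_b):
    """Fraction of signature positions on which two signatures agree."""
    matches = sum(x == y for x, y in zip(sig_a, sig_b))
    return matches / len(sig_a)


perms = make_permutations(num_permutations=256)
text_a = "the quick brown fox jumps over the lazy dog near the river bank"
text_b = "the quick brown fox jumps over the lazy cat near the river bank"
text_c = "completely unrelated words appear here without any overlap at all"

set_a, set_b, set_c = (shingles(t, window_size=3) for t in (text_a, text_b, text_c))
sig_a, sig_b, sig_c = (minhash_signature(s, perms) for s in (set_a, set_b, set_c))

exact_ab = len(set_a & set_b) / len(set_a | set_b)
estimated_ab = estimate_jaccard(sig_a, sig_b)
estimated_ac = estimate_jaccard(sig_a, sig_c)
```

With more permutations the estimate tracks the exact Jaccard similarity more closely, at the cost of more hashing per sample.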

EMPTY_HASH_VALUE = 'EMPTY'
__init__(tokenization: str = 'space', window_size: Annotated[int, Gt(gt=0)] = 5, lowercase: bool = True, ignore_pattern: str | None = None, num_permutations: Annotated[int, Gt(gt=0)] = 256, jaccard_threshold: Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])] = 0.7, num_bands: Annotated[int, Gt(gt=0)] | None = None, num_rows_per_band: Annotated[int, Gt(gt=0)] | None = None, tokenizer_model: str | None = None, union_find_parallel_num: int | str = 'auto', union_threshold: int | None = 256, max_pending_edge_buffer_task: int | None = 20, num_edge_buffer_task_returns: int | None = 10, max_pending_filter_tasks: int | None = 20, num_filter_task_returns: int | None = 10, merge_batch_size: int | None = 1000, *args, **kwargs)[source]

Initialization method.

Parameters:
  • tokenization -- tokenization method for sample texts. It should be one of [space, punctuation, character, sentencepiece]. We recommend 'space' for English-like languages, 'character' for Chinese-like languages, and 'sentencepiece' for multilingual text. If using 'sentencepiece', please provide the model path in the 'tokenizer_model' field.

  • window_size -- window size of shingling

  • lowercase -- whether to convert text to lower case first

  • ignore_pattern -- pattern of sub-strings to ignore when computing minhash

  • num_permutations -- number of permutations in minhash computing

  • jaccard_threshold -- the minimum Jaccard similarity threshold in near-duplicate detection. When the Jaccard similarity of two sample texts is >= this threshold, they are regarded as similar samples and this op keeps only one of them after deduplication

  • num_bands -- number of bands in LSH. Defaults to None, in which case it is determined by an optimal-parameter computation that minimizes the weighted sum of the probabilities of false positives and false negatives

  • num_rows_per_band -- number of rows in each band in LSH. Defaults to None, in which case it is determined by the same optimal-parameter computation

  • tokenizer_model -- path for the sentencepiece model, used for sentencepiece tokenization.

  • union_find_parallel_num -- number of parallel workers for the union-find algorithm. Defaults to 'auto', in which case it is set to half the number of CPUs.

  • union_threshold -- threshold for a minhash value group to perform the union-find algorithm. Defaults to 256.

  • max_pending_edge_buffer_task -- max number of pending edge buffer Ray tasks. Defaults to 20.

  • num_edge_buffer_task_returns -- number of edge buffer tasks for ray.wait to return. Defaults to 10.

  • max_pending_filter_tasks -- max number of pending filter Ray tasks. Defaults to 20.

  • num_filter_task_returns -- number of filter tasks for ray.wait to return. Defaults to 10.

  • merge_batch_size -- batch size for BTS operations. Defaults to 1000.

  • tmp_file_name -- the temporary folder name for deduplication.
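The automatic choice of num_bands and num_rows_per_band described above can be pictured with the following sketch. It assumes the standard LSH collision probability 1 - (1 - s^r)^b and searches for the (bands, rows) pair that minimizes a weighted sum of false-positive and false-negative areas around the threshold. The function names, the equal default weights, and the simple midpoint integration are assumptions for illustration; the operator's own computation may differ in its details.

```python
def collision_probability(similarity, num_bands, num_rows):
    """Probability that two texts share at least one identical LSH band."""
    return 1.0 - (1.0 - similarity ** num_rows) ** num_bands


def integrate(func, lower, upper, steps=200):
    """Midpoint-rule numerical integration of func over [lower, upper]."""
    width = (upper - lower) / steps
    return sum(func(lower + (i + 0.5) * width) for i in range(steps)) * width


def false_positive_area(threshold, num_bands, num_rows):
    """Area where pairs below the threshold still collide."""
    return integrate(
        lambda s: collision_probability(s, num_bands, num_rows), 0.0, threshold
    )


def false_negative_area(threshold, num_bands, num_rows):
    """Area where pairs above the threshold fail to collide."""
    return integrate(
        lambda s: 1.0 - collision_probability(s, num_bands, num_rows),
        threshold,
        1.0,
    )


def weighted_error(threshold, num_bands, num_rows, fp_weight=0.5, fn_weight=0.5):
    return (
        fp_weight * false_positive_area(threshold, num_bands, num_rows)
        + fn_weight * false_negative_area(threshold, num_bands, num_rows)
    )


def optimal_params(threshold, num_permutations, fp_weight=0.5, fn_weight=0.5):
    """Exhaustively search (bands, rows) with bands * rows <= num_permutations."""
    best_pair, best_error = None, float("inf")
    for num_bands in range(1, num_permutations + 1):
        max_rows = num_permutations // num_bands
        for num_rows in range(1, max_rows + 1):
            error = weighted_error(
                threshold, num_bands, num_rows, fp_weight, fn_weight
            )
            if error < best_error:
                best_pair, best_error = (num_bands, num_rows), error
    return best_pair


# A smaller num_permutations than the default keeps this example fast.
best_bands, best_rows = optimal_params(threshold=0.7, num_permutations=128)
```

Raising the false-negative weight pushes the search toward more bands with fewer rows, which catches more near-duplicates at the cost of more candidate pairs to verify.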

calc_minhash(text_list: Array, uid_list: List) → Table[source]
merge_op_batch(object_refs)[source]
merge()[source]
filter_with_union_find(samples: Table) → Table[source]
run(dataset, **kwargs)[source]