data_juicer.core.executor.partition_size_optimizer module

Partition Size Optimizer for DataJuicer

This module automatically configures optimal partition sizes based on:

1. Data modality (text, image, audio, video, multimodal)
2. Dataset characteristics (file sizes, complexity)
3. Available system resources (CPU, memory, GPU)
4. Processing pipeline complexity
5. Ray cluster configuration

class data_juicer.core.executor.partition_size_optimizer.ModalityType(value)[source]#

Bases: Enum

Supported data modalities.

TEXT = 'text'#
IMAGE = 'image'#
AUDIO = 'audio'#
VIDEO = 'video'#
MULTIMODAL = 'multimodal'#
class data_juicer.core.executor.partition_size_optimizer.LocalResources(cpu_cores: int, available_memory_gb: float, total_memory_gb: float, gpu_count: int, gpu_memory_gb: float | None = None, disk_space_gb: float | None = None)[source]#

Bases: object

Local system resources.

cpu_cores: int#
available_memory_gb: float#
total_memory_gb: float#
gpu_count: int#
gpu_memory_gb: float | None = None#
disk_space_gb: float | None = None#
__init__(cpu_cores: int, available_memory_gb: float, total_memory_gb: float, gpu_count: int, gpu_memory_gb: float | None = None, disk_space_gb: float | None = None) None#
class data_juicer.core.executor.partition_size_optimizer.ClusterResources(num_nodes: int, total_cpu_cores: int, total_memory_gb: float, available_cpu_cores: int, available_memory_gb: float, gpu_resources: Dict[str, int])[source]#

Bases: object

Ray cluster resources.

num_nodes: int#
total_cpu_cores: int#
total_memory_gb: float#
available_cpu_cores: int#
available_memory_gb: float#
gpu_resources: Dict[str, int]#
__init__(num_nodes: int, total_cpu_cores: int, total_memory_gb: float, available_cpu_cores: int, available_memory_gb: float, gpu_resources: Dict[str, int]) None#
class data_juicer.core.executor.partition_size_optimizer.DataCharacteristics(primary_modality: ModalityType, modality_distribution: Dict[ModalityType, int], avg_text_length: float, avg_images_per_sample: float, avg_audio_per_sample: float, avg_video_per_sample: float, total_samples: int, sample_size_analyzed: int, memory_per_sample_mb: float, processing_complexity_score: float, data_skew_factor: float)[source]#

Bases: object

Data characteristics from sampling.

primary_modality: ModalityType#
modality_distribution: Dict[ModalityType, int]#
avg_text_length: float#
avg_images_per_sample: float#
avg_audio_per_sample: float#
avg_video_per_sample: float#
total_samples: int#
sample_size_analyzed: int#
memory_per_sample_mb: float#
processing_complexity_score: float#
data_skew_factor: float#
__init__(primary_modality: ModalityType, modality_distribution: Dict[ModalityType, int], avg_text_length: float, avg_images_per_sample: float, avg_audio_per_sample: float, avg_video_per_sample: float, total_samples: int, sample_size_analyzed: int, memory_per_sample_mb: float, processing_complexity_score: float, data_skew_factor: float) None#
class data_juicer.core.executor.partition_size_optimizer.ModalityConfig(modality: ModalityType, default_partition_size: int, max_partition_size: int, max_partition_size_mb: int, memory_multiplier: float, complexity_multiplier: float, description: str)[source]#

Bases: object

Configuration for a specific modality.

modality: ModalityType#
default_partition_size: int#
max_partition_size: int#
max_partition_size_mb: int#
memory_multiplier: float#
complexity_multiplier: float#
description: str#
__init__(modality: ModalityType, default_partition_size: int, max_partition_size: int, max_partition_size_mb: int, memory_multiplier: float, complexity_multiplier: float, description: str) None#
class data_juicer.core.executor.partition_size_optimizer.ResourceDetector[source]#

Bases: object

Detect available system and cluster resources.

static detect_local_resources() LocalResources[source]#

Detect local system resources.

static detect_ray_cluster() ClusterResources | None[source]#

Detect Ray cluster resources.

static calculate_optimal_worker_count(local_resources: LocalResources, cluster_resources: ClusterResources | None = None, partition_size: int = None, total_samples: int = None) int[source]#

Calculate optimal number of Ray workers based on available resources.

Parameters:
  • local_resources – Local system resources

  • cluster_resources – Ray cluster resources (optional)

  • partition_size – Size of each partition (for workload estimation)

  • total_samples – Total number of samples (for workload estimation)

Returns:

Optimal number of workers

class data_juicer.core.executor.partition_size_optimizer.PartitionSizeOptimizer(cfg)[source]#

Bases: object

Automatically optimizes partition sizes based on data characteristics and available resources.

calculate_target_partition_mb(available_memory_gb: float) int[source]#

Calculate target partition size in MB based on available memory and config.

Uses config.partition.target_size_mb if available; otherwise, falls back to dynamic sizing based on available memory (between 32 MB and 256 MB).

MODALITY_CONFIGS = {ModalityType.AUDIO: ModalityConfig(modality=<ModalityType.AUDIO: 'audio'>, default_partition_size=1000, max_partition_size=4000, max_partition_size_mb=256, memory_multiplier=8.0, complexity_multiplier=5.0, description='Audio data - high memory usage, target 256MB partitions (configurable)'), ModalityType.IMAGE: ModalityConfig(modality=<ModalityType.IMAGE: 'image'>, default_partition_size=2000, max_partition_size=10000, max_partition_size_mb=256, memory_multiplier=5.0, complexity_multiplier=3.0, description='Image data - moderate memory usage, target 256MB partitions (configurable)'), ModalityType.MULTIMODAL: ModalityConfig(modality=<ModalityType.MULTIMODAL: 'multimodal'>, default_partition_size=1600, max_partition_size=6000, max_partition_size_mb=256, memory_multiplier=10.0, complexity_multiplier=8.0, description='Multimodal data - combination of multiple modalities, target 256MB partitions (configurable)'), ModalityType.TEXT: ModalityConfig(modality=<ModalityType.TEXT: 'text'>, default_partition_size=10000, max_partition_size=50000, max_partition_size_mb=256, memory_multiplier=1.0, complexity_multiplier=1.0, description='Text data - efficient processing, low memory usage, target 256MB partitions (configurable)'), ModalityType.VIDEO: ModalityConfig(modality=<ModalityType.VIDEO: 'video'>, default_partition_size=400, max_partition_size=2000, max_partition_size_mb=256, memory_multiplier=20.0, complexity_multiplier=15.0, description='Video data - very high memory usage, target 256MB partitions (configurable)')}#
__init__(cfg)[source]#

Initialize the optimizer with configuration.

detect_modality(sample: Dict) ModalityType[source]#

Detect the primary modality of a sample.

analyze_dataset_characteristics(dataset) DataCharacteristics[source]#

Analyze dataset characteristics to inform partition sizing.

estimate_sample_size_mb(sample: Dict) float[source]#

Measure actual memory size of a sample in MB.

Uses deep size calculation to include all nested objects (strings, lists, etc.) rather than just the shallow dict overhead.

analyze_processing_complexity(process_pipeline: List) float[source]#

Analyze the complexity of the processing pipeline using linear scoring.

get_optimal_partition_size(dataset, process_pipeline: List) Tuple[int, int][source]#

Get optimal partition size and max size based on data characteristics and available resources.

calculate_resource_aware_partition_size(characteristics: DataCharacteristics, local_resources: LocalResources, cluster_resources: ClusterResources | None, complexity_multiplier: float) int[source]#

Calculate partition size based on data characteristics and available resources.

Primary goal: target the partition size given by the config (default 256 MB). Secondary goals: ensure sufficient parallelism and respect resource constraints.

calculate_text_partition_size_simple(avg_text_length: float, complexity_score: float, target_memory_mb: float) int[source]#

Calculate the text partition size that targets the specified memory size.

calculate_optimal_max_size_mb(characteristics: DataCharacteristics, local_resources: LocalResources, cluster_resources: ClusterResources | None, complexity_multiplier: float) int[source]#

Calculate optimal max partition size in MB based on available memory.

get_partition_recommendations(dataset, process_pipeline: List) Dict[source]#

Get comprehensive partition recommendations.

data_juicer.core.executor.partition_size_optimizer.auto_configure_resources(cfg, dataset, process_pipeline: List) Dict[source]#

Analyze dataset and return resource configuration recommendations.

Does NOT mutate cfg; the caller should apply the recommendations as needed.

Parameters:
  • cfg – Configuration object (read-only)

  • dataset – Dataset to analyze

  • process_pipeline – List of processing operations

Returns:

Dict with the recommended resource configuration.