data_juicer.core.executor.partition_size_optimizer module
Partition Size Optimizer for DataJuicer
This module automatically configures optimal partition sizes based on:
1. Data modality (text, image, audio, video, multimodal)
2. Dataset characteristics (file sizes, complexity)
3. Available system resources (CPU, memory, GPU)
4. Processing pipeline complexity
5. Ray cluster configuration
- class data_juicer.core.executor.partition_size_optimizer.ModalityType(value)
Bases: Enum
Supported data modalities.
- TEXT = 'text'
- IMAGE = 'image'
- AUDIO = 'audio'
- VIDEO = 'video'
- MULTIMODAL = 'multimodal'
- class data_juicer.core.executor.partition_size_optimizer.LocalResources(cpu_cores: int, available_memory_gb: float, total_memory_gb: float, gpu_count: int, gpu_memory_gb: float | None = None, disk_space_gb: float | None = None)
Bases: object
Local system resources.
- cpu_cores: int
- available_memory_gb: float
- total_memory_gb: float
- gpu_count: int
- gpu_memory_gb: float | None = None
- disk_space_gb: float | None = None
- __init__(cpu_cores: int, available_memory_gb: float, total_memory_gb: float, gpu_count: int, gpu_memory_gb: float | None = None, disk_space_gb: float | None = None) → None
- class data_juicer.core.executor.partition_size_optimizer.ClusterResources(num_nodes: int, total_cpu_cores: int, total_memory_gb: float, available_cpu_cores: int, available_memory_gb: float, gpu_resources: Dict[str, int])
Bases: object
Ray cluster resources.
- num_nodes: int
- total_cpu_cores: int
- total_memory_gb: float
- available_cpu_cores: int
- available_memory_gb: float
- gpu_resources: Dict[str, int]
- __init__(num_nodes: int, total_cpu_cores: int, total_memory_gb: float, available_cpu_cores: int, available_memory_gb: float, gpu_resources: Dict[str, int]) → None
- class data_juicer.core.executor.partition_size_optimizer.DataCharacteristics(primary_modality: ModalityType, modality_distribution: Dict[ModalityType, int], avg_text_length: float, avg_images_per_sample: float, avg_audio_per_sample: float, avg_video_per_sample: float, total_samples: int, sample_size_analyzed: int, memory_per_sample_mb: float, processing_complexity_score: float, data_skew_factor: float)
Bases: object
Data characteristics from sampling.
- primary_modality: ModalityType
- modality_distribution: Dict[ModalityType, int]
- avg_text_length: float
- avg_images_per_sample: float
- avg_audio_per_sample: float
- avg_video_per_sample: float
- total_samples: int
- sample_size_analyzed: int
- memory_per_sample_mb: float
- processing_complexity_score: float
- data_skew_factor: float
- __init__(primary_modality: ModalityType, modality_distribution: Dict[ModalityType, int], avg_text_length: float, avg_images_per_sample: float, avg_audio_per_sample: float, avg_video_per_sample: float, total_samples: int, sample_size_analyzed: int, memory_per_sample_mb: float, processing_complexity_score: float, data_skew_factor: float) → None
- class data_juicer.core.executor.partition_size_optimizer.ModalityConfig(modality: ModalityType, default_partition_size: int, max_partition_size: int, max_partition_size_mb: int, memory_multiplier: float, complexity_multiplier: float, description: str)
Bases: object
Configuration for a specific modality.
- modality: ModalityType
- default_partition_size: int
- max_partition_size: int
- max_partition_size_mb: int
- memory_multiplier: float
- complexity_multiplier: float
- description: str
- __init__(modality: ModalityType, default_partition_size: int, max_partition_size: int, max_partition_size_mb: int, memory_multiplier: float, complexity_multiplier: float, description: str) → None
- class data_juicer.core.executor.partition_size_optimizer.ResourceDetector
Bases: object
Detect available system and cluster resources.
- static detect_local_resources() → LocalResources
Detect local system resources.
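A minimal, standard-library-only sketch of local resource detection, assuming a POSIX host where `os.sysconf` exposes page counts. `LocalResourcesSketch` and `detect_local_resources_sketch` are hypothetical stand-ins that mirror the documented `LocalResources` fields; the actual method may read these values from other sources (for example a third-party library) and also reports GPU details, which this sketch leaves at zero.

```python
import os
import shutil
from dataclasses import dataclass
from typing import Optional

GB = 1024 ** 3


@dataclass
class LocalResourcesSketch:
    # Hypothetical stand-in mirroring the documented LocalResources fields.
    cpu_cores: int
    available_memory_gb: float
    total_memory_gb: float
    gpu_count: int
    gpu_memory_gb: Optional[float] = None
    disk_space_gb: Optional[float] = None


def _sysconf_bytes(pages_name: str) -> Optional[float]:
    """Return page_size * page_count in bytes, or None if unsupported here."""
    try:
        return float(os.sysconf("SC_PAGE_SIZE") * os.sysconf(pages_name))
    except (AttributeError, ValueError, OSError):
        return None


def detect_local_resources_sketch(path: str = ".") -> LocalResourcesSketch:
    cpu_cores = os.cpu_count() or 1
    total = _sysconf_bytes("SC_PHYS_PAGES")
    # SC_AVPHYS_PAGES is Linux-specific; fall back to total memory elsewhere.
    available = _sysconf_bytes("SC_AVPHYS_PAGES") or total
    free_disk = shutil.disk_usage(path).free
    return LocalResourcesSketch(
        cpu_cores=cpu_cores,
        available_memory_gb=(available or 0.0) / GB,
        total_memory_gb=(total or 0.0) / GB,
        gpu_count=0,  # GPU detection is out of scope for this sketch
        disk_space_gb=free_disk / GB,
    )
```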
- static detect_ray_cluster() → ClusterResources | None
Detect Ray cluster resources.
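The sketch below shows one way to map Ray's resource dictionaries onto the documented `ClusterResources` fields. It assumes inputs shaped like the return values of `ray.cluster_resources()` and `ray.available_resources()` (keys such as `"CPU"`, `"GPU"`, and `"memory"`, with memory in bytes as in recent Ray releases) and a node count obtained separately, for example from `len(ray.nodes())`. Ray is not imported so the sketch runs without a cluster; `ClusterResourcesSketch`, `cluster_resources_from_ray_dicts`, and the single `"GPU"` key in `gpu_resources` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Mapping

GB = 1024 ** 3


@dataclass
class ClusterResourcesSketch:
    # Hypothetical stand-in mirroring the documented ClusterResources fields.
    num_nodes: int
    total_cpu_cores: int
    total_memory_gb: float
    available_cpu_cores: int
    available_memory_gb: float
    gpu_resources: Dict[str, int] = field(default_factory=dict)


def cluster_resources_from_ray_dicts(
    total: Mapping[str, float], available: Mapping[str, float], num_nodes: int
) -> ClusterResourcesSketch:
    """Convert Ray-style resource dicts into ClusterResources-like fields."""
    gpus = int(total.get("GPU", 0))
    return ClusterResourcesSketch(
        num_nodes=num_nodes,
        total_cpu_cores=int(total.get("CPU", 0)),
        total_memory_gb=total.get("memory", 0.0) / GB,  # assumes bytes
        available_cpu_cores=int(available.get("CPU", 0)),
        available_memory_gb=available.get("memory", 0.0) / GB,
        # Assumption: GPUs are summarized under a single "GPU" key.
        gpu_resources={"GPU": gpus} if gpus else {},
    )
```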
- static calculate_optimal_worker_count(local_resources: LocalResources, cluster_resources: ClusterResources | None = None, partition_size: int = None, total_samples: int = None) → int
Calculate the optimal number of Ray workers based on available resources.
- Parameters:
local_resources – Local system resources
cluster_resources – Ray cluster resources (optional)
partition_size – Size of each partition (for workload estimation)
total_samples – Total number of samples (for workload estimation)
- Returns:
Optimal number of workers
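The formula behind `calculate_optimal_worker_count` is not documented here, so the following is only a plausible heuristic under stated assumptions: size the pool from the cluster's available CPUs when a cluster is present (otherwise local cores), and, when both `partition_size` and `total_samples` are given, never start more workers than there are partitions. The function name and the `cpus_per_worker` knob are hypothetical.

```python
import math
from typing import Optional


def optimal_worker_count_sketch(
    local_cpu_cores: int,
    cluster_available_cpus: Optional[int] = None,
    partition_size: Optional[int] = None,
    total_samples: Optional[int] = None,
    cpus_per_worker: int = 1,  # hypothetical knob
) -> int:
    # Prefer cluster capacity when a Ray cluster was detected.
    cpus = cluster_available_cpus if cluster_available_cpus else local_cpu_cores
    workers = max(1, cpus // max(1, cpus_per_worker))
    # With a known workload, workers beyond the partition count would sit idle.
    if partition_size and total_samples:
        num_partitions = math.ceil(total_samples / partition_size)
        workers = min(workers, max(1, num_partitions))
    return workers
```

Capping by the partition count reflects why the documented method accepts `partition_size` and `total_samples` "for workload estimation": extra workers add scheduling overhead without adding throughput.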
- class data_juicer.core.executor.partition_size_optimizer.PartitionSizeOptimizer(cfg)
Bases: object
Automatically optimizes partition sizes based on data characteristics and available resources.
- calculate_target_partition_mb(available_memory_gb: float) → int
Calculate the target partition size in MB based on available memory and the config.
Uses config.partition.target_size_mb if it is set; otherwise falls back to dynamic sizing based on available memory (32 MB to 256 MB).
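A minimal sketch of the documented fallback order: use `config.partition.target_size_mb` when it is set, otherwise derive a value from available memory and clamp it to the documented 32 MB to 256 MB range. The scaling factor (4 MB of target per GB of available memory) and the helper name are assumptions for illustration; `SimpleNamespace` stands in for the real config object.

```python
from types import SimpleNamespace
from typing import Any

MIN_TARGET_MB = 32
MAX_TARGET_MB = 256
MB_PER_GB_AVAILABLE = 4  # hypothetical scaling factor


def target_partition_mb_sketch(cfg: Any, available_memory_gb: float) -> int:
    # 1. An explicit config value wins.
    partition_cfg = getattr(cfg, "partition", None)
    configured = getattr(partition_cfg, "target_size_mb", None)
    if configured:
        return int(configured)
    # 2. Otherwise scale with available memory, clamped to 32-256 MB.
    dynamic = int(available_memory_gb * MB_PER_GB_AVAILABLE)
    return max(MIN_TARGET_MB, min(MAX_TARGET_MB, dynamic))


# Example configs: one with an explicit target, one without.
with_target = SimpleNamespace(partition=SimpleNamespace(target_size_mb=128))
without_target = SimpleNamespace()
```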
- MODALITY_CONFIGS: dict mapping each ModalityType to its ModalityConfig:
  - ModalityType.TEXT: default_partition_size=10000, max_partition_size=50000, max_partition_size_mb=256, memory_multiplier=1.0, complexity_multiplier=1.0, description='Text data - efficient processing, low memory usage, target 256MB partitions (configurable)'
  - ModalityType.IMAGE: default_partition_size=2000, max_partition_size=10000, max_partition_size_mb=256, memory_multiplier=5.0, complexity_multiplier=3.0, description='Image data - moderate memory usage, target 256MB partitions (configurable)'
  - ModalityType.AUDIO: default_partition_size=1000, max_partition_size=4000, max_partition_size_mb=256, memory_multiplier=8.0, complexity_multiplier=5.0, description='Audio data - high memory usage, target 256MB partitions (configurable)'
  - ModalityType.VIDEO: default_partition_size=400, max_partition_size=2000, max_partition_size_mb=256, memory_multiplier=20.0, complexity_multiplier=15.0, description='Video data - very high memory usage, target 256MB partitions (configurable)'
  - ModalityType.MULTIMODAL: default_partition_size=1600, max_partition_size=6000, max_partition_size_mb=256, memory_multiplier=10.0, complexity_multiplier=8.0, description='Multimodal data - combination of multiple modalities, target 256MB partitions (configurable)'
- detect_modality(sample: Dict) → ModalityType
Detect the primary modality of a sample.
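One way to detect a sample's modality is to check which media fields are non-empty. This sketch assumes Data-Juicer's default sample keys (`"images"`, `"audios"`, `"videos"`, with text under `"text"`) and treats a sample carrying more than one kind of media as multimodal; the key names and the rule that text plus a single media type is not multimodal are assumptions, not confirmed behavior of `detect_modality`. It re-declares the documented `ModalityType` values locally so it runs on its own.

```python
from enum import Enum
from typing import Any, Dict


class ModalityType(Enum):
    # Mirrors the documented enum values.
    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    MULTIMODAL = "multimodal"


# Assumed sample keys for each media modality.
MEDIA_KEYS = {
    "images": ModalityType.IMAGE,
    "audios": ModalityType.AUDIO,
    "videos": ModalityType.VIDEO,
}


def detect_modality_sketch(sample: Dict[str, Any]) -> ModalityType:
    # Collect every media modality whose field is present and non-empty.
    present = [m for key, m in MEDIA_KEYS.items() if sample.get(key)]
    if len(present) > 1:
        return ModalityType.MULTIMODAL
    if present:
        return present[0]
    return ModalityType.TEXT  # no media fields: treat as text
```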
- analyze_dataset_characteristics(dataset) → DataCharacteristics
Analyze dataset characteristics to inform partition sizing.
- estimate_sample_size_mb(sample: Dict) → float
Measure the actual memory size of a sample in MB.
Uses a deep size calculation that includes all nested objects (strings, lists, etc.) rather than only the shallow dict overhead.
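The shallow/deep distinction matters because `sys.getsizeof` counts only the container itself, not the strings or lists it references. A minimal recursive sketch that follows dict keys and values and list/tuple/set items, counting each object once, could look like this; `deep_sizeof` and `estimate_sample_size_mb_sketch` are hypothetical names, and the actual method may use a different traversal or library.

```python
import sys
from typing import Any, Optional, Set


def deep_sizeof(obj: Any, seen: Optional[Set[int]] = None) -> int:
    """Return the size in bytes of obj plus everything it references."""
    if seen is None:
        seen = set()
    if id(obj) in seen:  # shared objects are counted once
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_sizeof(k, seen) + deep_sizeof(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_sizeof(item, seen) for item in obj)
    return size


def estimate_sample_size_mb_sketch(sample: dict) -> float:
    return deep_sizeof(sample) / (1024 * 1024)
```

For a sample holding a one-million-character string, `sys.getsizeof(sample)` reports only a few hundred bytes, while the deep size is close to 1 MB.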
- analyze_processing_complexity(process_pipeline: List) → float
Analyze the complexity of the processing pipeline using linear scoring.
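Linear scoring means the pipeline score grows additively, one term per operator. The sketch below sums a per-operator weight picked from keywords in the operator name. The weights, the keyword list, the base score of 1.0, and the accepted pipeline shapes (operator-name strings or single-key dicts such as `{"clean_links_mapper": {...}}`) are all assumptions for illustration.

```python
from typing import Any, List

# Hypothetical per-operator weights keyed by name fragments (checked in order).
OP_WEIGHTS = [
    ("video", 3.0),
    ("audio", 2.0),
    ("image", 1.5),
    ("dedup", 1.0),
]
DEFAULT_OP_WEIGHT = 0.5
BASE_SCORE = 1.0


def _op_name(op: Any) -> str:
    # Accept "name" strings or single-key {"name": {...args}} dicts.
    if isinstance(op, dict):
        return next(iter(op))
    return str(op)


def analyze_processing_complexity_sketch(process_pipeline: List[Any]) -> float:
    score = BASE_SCORE
    for op in process_pipeline:
        name = _op_name(op).lower()
        weight = next((w for key, w in OP_WEIGHTS if key in name), DEFAULT_OP_WEIGHT)
        score += weight  # linear: each operator adds its own weight
    return score
```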
- get_optimal_partition_size(dataset, process_pipeline: List) → Tuple[int, int]
Get the optimal partition size and max size based on data characteristics and available resources.
- calculate_resource_aware_partition_size(characteristics: DataCharacteristics, local_resources: LocalResources, cluster_resources: ClusterResources | None, complexity_multiplier: float) → int
Calculate the partition size based on data characteristics and available resources.
Primary goal: reach the target partition size set in the config (default 256 MB). Secondary goals: ensure sufficient parallelism and respect resource constraints.
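A minimal sketch of the documented priorities, under assumed formulas: first convert the target partition size into a row count using the per-sample memory scaled by the complexity multiplier, then cap it at the modality's `max_partition_size`, and finally shrink it if the dataset would otherwise produce fewer than a minimum number of partitions per worker. The function name, its flat parameter list, and the `min_partitions_per_worker` target are hypothetical, not the method's actual logic.

```python
import math


def resource_aware_partition_size_sketch(
    total_samples: int,
    memory_per_sample_mb: float,
    complexity_multiplier: float,
    target_partition_mb: float,
    max_partition_size: int,
    num_workers: int,
    min_partitions_per_worker: int = 2,  # hypothetical parallelism target
) -> int:
    # Primary goal: rows that fit the target size after complexity scaling.
    effective_mb = max(memory_per_sample_mb * complexity_multiplier, 1e-6)
    size = int(target_partition_mb / effective_mb)
    # Respect the modality cap and keep at least one row per partition.
    size = max(1, min(size, max_partition_size))
    # Secondary goal: enough partitions to keep every worker busy.
    min_partitions = max(1, num_workers * min_partitions_per_worker)
    if total_samples and math.ceil(total_samples / size) < min_partitions:
        size = max(1, math.ceil(total_samples / min_partitions))
    return size
```

With 1,000 samples of 1 MB effective size and 4 workers, the 256-row target would yield only 4 partitions, so the sketch lowers the size to 125 rows to produce 8.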
- calculate_text_partition_size_simple(avg_text_length: float, complexity_score: float, target_memory_mb: float) → int
Calculate the text partition size that targets the specified memory size.
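For text, a per-sample memory estimate can be derived from the average text length alone. This sketch assumes roughly one byte per character plus a fixed per-sample overhead (both hypothetical constants) and divides the target size by that estimate scaled by the complexity score; it illustrates the idea and is not the method's formula.

```python
BYTES_PER_CHAR = 1.0  # hypothetical: mostly ASCII text
PER_SAMPLE_OVERHEAD_BYTES = 1024  # hypothetical: metadata and object overhead


def text_partition_size_sketch(
    avg_text_length: float, complexity_score: float, target_memory_mb: float
) -> int:
    bytes_per_sample = avg_text_length * BYTES_PER_CHAR + PER_SAMPLE_OVERHEAD_BYTES
    # Assumption: scores below 1.0 never enlarge partitions beyond the raw estimate.
    effective_bytes = bytes_per_sample * max(complexity_score, 1.0)
    return max(1, int(target_memory_mb * 1024 * 1024 / effective_bytes))
```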
- calculate_optimal_max_size_mb(characteristics: DataCharacteristics, local_resources: LocalResources, cluster_resources: ClusterResources | None, complexity_multiplier: float) → int
Calculate the optimal max partition size in MB based on available memory.
- data_juicer.core.executor.partition_size_optimizer.auto_configure_resources(cfg, dataset, process_pipeline: List) → Dict
Analyze the dataset and return resource configuration recommendations.
Does NOT mutate cfg; the caller should apply the recommendations as needed.
- Parameters:
cfg – Configuration object (read-only)
dataset – Dataset to analyze
process_pipeline – List of processing operations
- Returns:
Dict with recommended resource configuration
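Because `auto_configure_resources` returns recommendations instead of mutating `cfg`, a caller can apply them to a copy and keep the original intact. The sketch assumes the returned dict maps config attribute names to values; the keys shown (`partition_size`, `max_partition_size_mb`, `num_workers`) and the helper `apply_recommendations` are hypothetical placeholders, so check the actual returned keys before relying on this pattern.

```python
import copy
from types import SimpleNamespace
from typing import Any, Dict


def apply_recommendations(cfg: Any, recommendations: Dict[str, Any]) -> Any:
    """Return a copy of cfg with recommended values set; cfg is left untouched."""
    new_cfg = copy.deepcopy(cfg)
    for key, value in recommendations.items():
        setattr(new_cfg, key, value)
    return new_cfg


# Hypothetical recommendations, standing in for auto_configure_resources(...) output.
cfg = SimpleNamespace(partition_size=1000, num_workers=1)
recs = {"partition_size": 5000, "max_partition_size_mb": 256, "num_workers": 8}
tuned = apply_recommendations(cfg, recs)
```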