data_juicer.ops.mapper.s3_upload_file_mapper module

class data_juicer.ops.mapper.s3_upload_file_mapper.S3UploadFileMapper(*args, **kwargs)

Bases: Mapper

Mapper to upload local files to S3 and update paths to S3 URLs.

This operator uploads files from local paths to S3 storage. It supports:

  • Uploading multiple files concurrently

  • Updating file paths in the dataset to S3 URLs

  • Optional deletion of local files after successful upload

  • Custom S3 endpoints (for S3-compatible services like MinIO)

  • Skipping already uploaded files (based on S3 key)

The operator processes nested lists of paths, maintaining the original structure in the output.

__init__(upload_field: str = None, s3_bucket: str = None, s3_prefix: str = '', aws_access_key_id: str = None, aws_secret_access_key: str = None, aws_session_token: str = None, aws_region: str = None, endpoint_url: str = None, remove_local: bool = False, skip_existing: bool = True, max_concurrent: int = 10, *args, **kwargs)

Initialization method.

Parameters:
  • upload_field – The field name containing file paths to upload.

  • s3_bucket – S3 bucket name to upload files to.

  • s3_prefix – Prefix (folder path) in S3 bucket. E.g., ‘videos/’ or ‘data/videos/’.

  • aws_access_key_id – AWS access key ID for S3.

  • aws_secret_access_key – AWS secret access key for S3.

  • aws_session_token – AWS session token for S3 (optional).

  • aws_region – AWS region for S3.

  • endpoint_url – Custom S3 endpoint URL (for S3-compatible services).

  • remove_local – Whether to delete local files after successful upload.

  • skip_existing – Whether to skip uploading if file already exists in S3.

  • max_concurrent – Maximum concurrent uploads.

  • args – Extra positional arguments.

  • kwargs – Extra keyword arguments.
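To illustrate how s3_bucket and s3_prefix might combine into the S3 URL that replaces a local path, here is a minimal sketch. The helper name build_s3_url is hypothetical, and the key layout (prefix joined with the file's base name) is an assumption for illustration; the operator's actual key derivation is not documented on this page.

```python
import posixpath
from pathlib import PurePath


def build_s3_url(local_path: str, s3_bucket: str, s3_prefix: str = "") -> str:
    """Hypothetical helper: map a local file path to an s3:// URL.

    Assumption: the object key is the prefix joined with the file's base name.
    """
    filename = PurePath(local_path).name
    # Normalise the prefix so 'videos', 'videos/' and '/videos/' behave alike.
    prefix = s3_prefix.strip("/")
    key = posixpath.join(prefix, filename) if prefix else filename
    return f"s3://{s3_bucket}/{key}"


print(build_s3_url("/tmp/clips/a.mp4", "my-bucket", "data/videos/"))
# s3://my-bucket/data/videos/a.mp4
```

With a fixed key layout such as this one, a skip_existing check only needs the key to decide whether an object is already present.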

property s3_client

Lazy initialization of S3 client to avoid serialization issues with Ray.
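The lazy-initialization pattern can be sketched as follows. This is not the operator's code: the class LazyClientHolder and the make_client factory are hypothetical, and a plain object stands in for a real S3 client. The point is only that nothing is created in __init__, so an instance serialized before first use carries no live client.

```python
class LazyClientHolder:
    """Hypothetical stand-in for an operator that owns an S3 client."""

    def __init__(self, make_client):
        # Store only the recipe for the client, not the client itself.
        self._make_client = make_client
        self._client = None

    @property
    def client(self):
        # Build the client on first access and reuse it afterwards.
        if self._client is None:
            self._client = self._make_client()
        return self._client


created = []


def make_client():
    # A real factory would construct an S3 client here (assumption).
    created.append(object())
    return created[-1]


holder = LazyClientHolder(make_client)
client_before_access = holder._client  # still None: nothing built yet
first = holder.client
second = holder.client                 # same object, factory ran once
```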

async upload_files_async(paths: List[str]) → List[tuple]

Upload multiple files asynchronously.

Parameters:

paths – List of local file paths

Returns:

List of (idx, status, s3_url, error_message) tuples
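A bounded-concurrency upload that returns tuples of this shape could look roughly like the sketch below. It is an illustration under stated assumptions: fake_upload is a hypothetical stand-in for a real upload call, the status strings "success" and "failed" are invented here, and a failed upload is assumed to report None as its S3 URL.

```python
import asyncio
from typing import List, Optional, Tuple

Result = Tuple[int, str, Optional[str], Optional[str]]


async def fake_upload(path: str) -> str:
    """Hypothetical upload; fails for paths containing 'missing'."""
    await asyncio.sleep(0)
    if "missing" in path:
        raise FileNotFoundError(path)
    return f"s3://example-bucket/{path.lstrip('/')}"


async def upload_all(paths: List[str], max_concurrent: int = 10) -> List[Result]:
    # The semaphore caps how many uploads run at the same time.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def upload_one(idx: int, path: str) -> Result:
        async with semaphore:
            try:
                url = await fake_upload(path)
                return (idx, "success", url, None)
            except Exception as exc:
                # Capture the error instead of aborting the whole batch.
                return (idx, "failed", None, str(exc))

    # gather preserves input order, so results line up with paths.
    return await asyncio.gather(*(upload_one(i, p) for i, p in enumerate(paths)))


results = asyncio.run(upload_all(["a.mp4", "missing.mp4"], max_concurrent=2))
```

Returning the error message per item, rather than raising, lets a caller keep the successful uploads and report the failures separately.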

async upload_nested_paths(nested_paths: List[str | List[str]])

Upload nested paths with structure preservation.

Parameters:

nested_paths – Nested list of file paths

Returns:

(reconstructed_paths, failed_info)
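One way to preserve structure is to flatten the nested list, convert every path, and then rebuild the original shape. The sketch below shows that idea only; map_nested and its convert callback are hypothetical names, and the operator's actual implementation may differ.

```python
from typing import Callable, List, Optional, Union

Nested = List[Union[str, List[str]]]


def map_nested(nested_paths: Nested, convert: Callable[[str], str]) -> Nested:
    """Apply convert to every path while keeping the nesting intact."""
    flat: List[str] = []
    shape: List[Optional[int]] = []  # None for a bare string, else sub-list length
    for item in nested_paths:
        if isinstance(item, list):
            shape.append(len(item))
            flat.extend(item)
        else:
            shape.append(None)
            flat.append(item)

    converted = [convert(path) for path in flat]

    # Rebuild the original layout from the recorded shape.
    rebuilt: Nested = []
    pos = 0
    for size in shape:
        if size is None:
            rebuilt.append(converted[pos])
            pos += 1
        else:
            rebuilt.append(converted[pos:pos + size])
            pos += size
    return rebuilt


example = map_nested(["a.mp4", ["b.mp4", "c.mp4"], []], lambda p: "s3://bucket/" + p)
```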

process_batched(samples)

Process a batch of samples.