data_juicer.ops.mapper.s3_upload_file_mapper module#
- class data_juicer.ops.mapper.s3_upload_file_mapper.S3UploadFileMapper(*args, **kwargs)[source]#
Bases:
MapperMapper to upload local files to S3 and update paths to S3 URLs.
This operator uploads files from local paths to S3 storage. It supports: - Uploading multiple files concurrently - Updating file paths in the dataset to S3 URLs - Optional deletion of local files after successful upload - Custom S3 endpoints (for S3-compatible services like MinIO) - Skipping already uploaded files (based on S3 key)
The operator processes nested lists of paths, maintaining the original structure in the output.
- __init__(upload_field: str = None, s3_bucket: str = None, s3_prefix: str = '', aws_access_key_id: str = None, aws_secret_access_key: str = None, aws_session_token: str = None, aws_region: str = None, endpoint_url: str = None, remove_local: bool = False, skip_existing: bool = True, max_concurrent: int = 10, *args, **kwargs)[source]#
Initialization method.
- Parameters:
upload_field โ The field name containing file paths to upload.
s3_bucket โ S3 bucket name to upload files to.
s3_prefix โ Prefix (folder path) in S3 bucket. E.g., โvideos/โ or โdata/videos/โ.
aws_access_key_id โ AWS access key ID for S3.
aws_secret_access_key โ AWS secret access key for S3.
aws_session_token โ AWS session token for S3 (optional).
aws_region โ AWS region for S3.
endpoint_url โ Custom S3 endpoint URL (for S3-compatible services).
remove_local โ Whether to delete local files after successful upload.
skip_existing โ Whether to skip uploading if file already exists in S3.
max_concurrent โ Maximum concurrent uploads.
args โ extra args
kwargs โ extra args
- property s3_client#
Lazy initialization of S3 client to avoid serialization issues with Ray.
- async upload_files_async(paths: List[str]) List[tuple][source]#
Upload multiple files asynchronously.
- Parameters:
paths โ List of local file paths
- Returns:
List of (idx, status, s3_url, error_message) tuples