data_juicer.ops.mapper.s3_upload_file_mapper module

class data_juicer.ops.mapper.s3_upload_file_mapper.S3UploadFileMapper(*args, **kwargs)[source]

Bases: Mapper

Mapper to upload local files to S3 and update paths to S3 URLs.

This operator uploads files from local paths to S3 storage. It supports:

  • Uploading multiple files concurrently

  • Updating file paths in the dataset to S3 URLs

  • Optional deletion of local files after successful upload

  • Custom S3 endpoints (for S3-compatible services like MinIO)

  • Skipping already uploaded files (based on S3 key)

The operator processes nested lists of paths, maintaining the original structure in the output.

__init__(upload_field: str = None, s3_bucket: str = None, s3_prefix: str = '', aws_access_key_id: str = None, aws_secret_access_key: str = None, aws_session_token: str = None, aws_region: str = None, endpoint_url: str = None, remove_local: bool = False, skip_existing: bool = True, max_concurrent: int = 10, *args, **kwargs)[source]

Initialization method.

Parameters:
  • upload_field -- The field name containing file paths to upload.

  • s3_bucket -- S3 bucket name to upload files to.

  • s3_prefix -- Prefix (folder path) in S3 bucket. E.g., 'videos/' or 'data/videos/'.

  • aws_access_key_id -- AWS access key ID for S3.

  • aws_secret_access_key -- AWS secret access key for S3.

  • aws_session_token -- AWS session token for S3 (optional).

  • aws_region -- AWS region for S3.

  • endpoint_url -- Custom S3 endpoint URL (for S3-compatible services).

  • remove_local -- Whether to delete local files after successful upload.

  • skip_existing -- Whether to skip the upload if the file already exists in S3.

  • max_concurrent -- Maximum number of concurrent uploads.

  • args -- Extra positional arguments.

  • kwargs -- Extra keyword arguments.
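As a rough illustration of how these parameters fit together, the sketch below collects them into a keyword-argument dictionary that mirrors the documented `__init__` signature. The field name, bucket, prefix, region, and endpoint values are placeholders, and reading credentials from the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables is an assumption of this example, not documented behavior of the operator.

```python
import os

# Keyword arguments mirroring the documented __init__ signature.
# All concrete values below are placeholders chosen for illustration.
mapper_kwargs = {
    "upload_field": "videos",                 # hypothetical field holding local paths
    "s3_bucket": "example-bucket",            # placeholder bucket name
    "s3_prefix": "data/videos/",              # folder-like prefix inside the bucket
    "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
    "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    "aws_region": "us-east-1",                # placeholder region
    "endpoint_url": "http://localhost:9000",  # e.g. a local MinIO server
    "remove_local": False,                    # keep local files after upload
    "skip_existing": True,                    # do not re-upload existing keys
    "max_concurrent": 10,                     # cap on parallel uploads
}

# With data_juicer installed, the operator would then be constructed as:
# from data_juicer.ops.mapper.s3_upload_file_mapper import S3UploadFileMapper
# op = S3UploadFileMapper(**mapper_kwargs)
```

Leaving `endpoint_url` as `None` would target AWS S3 itself rather than an S3-compatible service.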

property s3_client

Lazy initialization of S3 client to avoid serialization issues with Ray.
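The lazy-initialization pattern can be sketched as follows. This is not the operator's actual code: `FakeClient` and `LazyClientHolder` are hypothetical stand-ins, with `FakeClient` holding a lock so that, like many real network clients, it cannot be pickled. The point is that the object stays serializable as long as the client has not been created yet.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for an S3 client; the lock makes it unpicklable."""

    def __init__(self):
        self._lock = threading.Lock()


class LazyClientHolder:
    """Hypothetical operator-like object that defers client creation."""

    def __init__(self, client_factory=FakeClient):
        self._client_factory = client_factory
        self._s3_client = None  # nothing unpicklable is created here

    @property
    def s3_client(self):
        # Create the client on first access, then reuse the same instance.
        if self._s3_client is None:
            self._s3_client = self._client_factory()
        return self._s3_client


holder = LazyClientHolder()
# Serialization succeeds because no client exists yet.
restored = pickle.loads(pickle.dumps(holder))
# The client is only built when first needed, e.g. inside a worker process.
client = restored.s3_client
```

Once `s3_client` has been accessed, the object in this sketch can no longer be pickled, which is why the client is created after the object has been shipped to a worker rather than in `__init__`.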

async upload_files_async(paths: List[str]) -> List[tuple][source]

Upload multiple files asynchronously.

Parameters:

paths -- List of local file paths

Returns:

List of (idx, status, s3_url, error_message) tuples
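One common way to bound concurrent uploads and produce per-file result tuples of this shape is an `asyncio.Semaphore` combined with `asyncio.gather`. The sketch below is an assumption about how such a method could work, not the operator's implementation: `fake_upload` replaces the real S3 call, and the `"success"` and `"failed"` status strings are hypothetical.

```python
import asyncio
from typing import List, Tuple


def fake_upload(path: str) -> str:
    """Stand-in for a blocking S3 upload; returns a made-up S3 URL."""
    if "missing" in path:
        raise FileNotFoundError(path)
    return "s3://example-bucket/" + path.lstrip("/")


async def upload_all(paths: List[str], max_concurrent: int = 10) -> List[Tuple]:
    # The semaphore caps how many uploads run at the same time.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def upload_one(idx: int, path: str) -> Tuple:
        async with semaphore:
            try:
                # Run the blocking upload in a worker thread (Python 3.9+).
                url = await asyncio.to_thread(fake_upload, path)
                return (idx, "success", url, None)
            except Exception as exc:
                # On failure, keep the local path and record the error text.
                return (idx, "failed", path, str(exc))

    # gather preserves input order, so results line up with paths.
    return await asyncio.gather(*(upload_one(i, p) for i, p in enumerate(paths)))


results = asyncio.run(upload_all(["/tmp/a.mp4", "/tmp/missing.mp4"], max_concurrent=2))
```

Carrying `idx` in each tuple lets a caller map results back to input positions even if the tuples are later filtered or reordered.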

async upload_nested_paths(nested_paths: List[str | List[str]])[source]

Upload nested paths with structure preservation.

Parameters:

nested_paths -- Nested list of file paths

Returns:

(reconstructed_paths, failed_info)
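Structure preservation for a one-level nested list can be sketched by flattening the input while recording its shape, processing the flat list, and then rebuilding the nesting. The helpers `flatten` and `rebuild` below are hypothetical and only illustrate the idea; the actual method may be organized differently, and the upload step is replaced here by a simple string rewrite.

```python
from typing import List, Optional, Tuple, Union

Nested = List[Union[str, List[str]]]


def flatten(nested: Nested) -> Tuple[List[str], List[Optional[int]]]:
    """Return the flat list of paths plus a shape record for rebuilding."""
    flat: List[str] = []
    shape: List[Optional[int]] = []  # None for a bare string, else sub-list length
    for item in nested:
        if isinstance(item, list):
            flat.extend(item)
            shape.append(len(item))
        else:
            flat.append(item)
            shape.append(None)
    return flat, shape


def rebuild(flat: List[str], shape: List[Optional[int]]) -> Nested:
    """Reassemble a flat list into the original nesting."""
    out: Nested = []
    pos = 0
    for n in shape:
        if n is None:
            out.append(flat[pos])
            pos += 1
        else:
            out.append(flat[pos:pos + n])
            pos += n
    return out


nested = ["a.jpg", ["b.mp4", "c.mp4"], []]
flat, shape = flatten(nested)
# Stand-in for the upload step: map each local path to a made-up S3 URL.
uploaded = ["s3://example-bucket/" + p for p in flat]
rebuilt = rebuild(uploaded, shape)
```

Flattening first means all files can go through a single concurrent upload call, regardless of how they were grouped in the sample.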

process_batched(samples)[source]

Process a batch of samples.