Source code for data_juicer.utils.s3_utils

"""
S3 utilities for Data-Juicer.

Provides unified S3 authentication and filesystem creation for both
s3fs (default executor) and PyArrow (Ray executor) backends.
"""

import os
from typing import Dict, Tuple

import pyarrow.fs
from loguru import logger

# Try to load .env file if python-dotenv is available
try:
    from dotenv import load_dotenv

    # Load environment variables from .env file if it exists
    load_dotenv()  # By default, override=False, so environment variables take precedence
except ImportError:
    # python-dotenv not installed, .env files won't be automatically loaded
    pass
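
For illustration, a hypothetical .env file that this loader would pick up could define the variable names the module reads (the values below are placeholders and are not part of Data-Juicer; real credentials should never be committed):

# .env (placeholders only)
AWS_ACCESS_KEY_ID=AKIAEXAMPLE
AWS_SECRET_ACCESS_KEY=example-secret
AWS_DEFAULT_REGION=us-east-1

Because load_dotenv() is called with the default override=False, any variable already set in the process environment keeps its value.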


def get_aws_credentials(ds_config: Dict = {}) -> Tuple[str, str, str, str]:
    """
    Get AWS credentials with priority order:

    1. Environment variables (e.g., AWS_ACCESS_KEY_ID)
    2. Explicit config parameters (e.g., in a dataset config dict)

    Args:
        ds_config: Dataset configuration dictionary containing optional AWS
            credentials. If not provided, an empty dict is used.

    Returns:
        Tuple of (access_key_id, secret_access_key, session_token, region)
    """
    # Try environment variables first (most secure)
    aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    aws_session_token = os.environ.get("AWS_SESSION_TOKEN")
    aws_region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION")

    # Fall back to config if not in environment
    if not aws_access_key_id and "aws_access_key_id" in ds_config:
        aws_access_key_id = ds_config["aws_access_key_id"]
        logger.warning(
            "AWS credentials found in config file. For better security, "
            "consider using environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)"
        )
    if not aws_secret_access_key and "aws_secret_access_key" in ds_config:
        aws_secret_access_key = ds_config["aws_secret_access_key"]
    if not aws_session_token and "aws_session_token" in ds_config:
        aws_session_token = ds_config["aws_session_token"]
    if not aws_region and "aws_region" in ds_config:
        aws_region = ds_config["aws_region"]

    return aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region
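
A minimal usage sketch of the precedence rules (the key and region values below are hypothetical): credentials found in the environment win over the same keys in a dataset config dict, and anything missing from both comes back as None.

import os

os.environ["AWS_ACCESS_KEY_ID"] = "AKIA_FROM_ENV"          # hypothetical value
ds_config = {"aws_access_key_id": "AKIA_FROM_CONFIG",       # ignored: env wins
             "aws_region": "eu-west-1"}                      # used: not set in env

access_key, secret_key, token, region = get_aws_credentials(ds_config)
# access_key == "AKIA_FROM_ENV"   (environment takes precedence)
# region     == "eu-west-1"       (falls back to the config dict)
# secret_key and token are None if they are set nowhere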
def create_pyarrow_s3_filesystem(ds_config: Dict = {}) -> "pyarrow.fs.S3FileSystem":
    """
    Create a PyArrow S3FileSystem with proper authentication.

    Authentication priority:

    1. Environment variables (most secure, recommended for production)
    2. Explicit config parameters (for development/testing)
    3. Default AWS credential chain (boto3-style: env vars, ~/.aws/credentials, IAM roles)

    Args:
        ds_config: Dataset configuration dictionary containing optional AWS credentials

    Returns:
        pyarrow.fs.S3FileSystem instance configured with credentials
    """
    # Get credentials with priority order
    aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region = get_aws_credentials(ds_config)

    s3_options = {}

    # Set credentials if provided
    if aws_access_key_id:
        s3_options["access_key"] = aws_access_key_id
    if aws_secret_access_key:
        s3_options["secret_key"] = aws_secret_access_key
    if aws_session_token:
        s3_options["session_token"] = aws_session_token
    if aws_region:
        s3_options["region"] = aws_region
    if "endpoint_url" in ds_config:
        s3_options["endpoint_override"] = ds_config["endpoint_url"]

    # Create S3 filesystem.
    # If no explicit credentials were found, PyArrow falls back to the default
    # AWS credential chain.
    if s3_options:
        s3_fs = pyarrow.fs.S3FileSystem(**s3_options)
        logger.info("Using explicit AWS credentials for S3 access")
    else:
        s3_fs = pyarrow.fs.S3FileSystem()
        logger.info("Using default AWS credential chain for S3 access")

    return s3_fs
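
As a rough sketch (bucket name, prefix, and region below are placeholders, and reading requires valid credentials and network access), the returned filesystem can be handed to pyarrow.dataset. Note that when an explicit filesystem is passed, PyArrow expects the path without the "s3://" scheme.

import pyarrow.dataset as ds

s3_fs = create_pyarrow_s3_filesystem({"aws_region": "us-east-1"})  # hypothetical region
dataset = ds.dataset("my-bucket/datasets/corpus/", format="parquet", filesystem=s3_fs)
table = dataset.head(10)  # read a small sample of rows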
def validate_s3_path(path: str) -> None:
    """
    Validate that a path is a valid S3 path.

    Args:
        path: Path to validate

    Raises:
        ValueError: If path doesn't start with 's3://'
    """
    if not path.startswith("s3://"):
        raise ValueError(f"S3 path must start with 's3://', got: {path}")
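
A small combined sketch, assuming a hypothetical object path and valid credentials: validate the user-supplied path first, then strip the scheme before querying it through the PyArrow filesystem.

path = "s3://my-bucket/datasets/corpus/train.parquet"  # hypothetical path
validate_s3_path(path)                                  # raises ValueError for non-S3 paths
key = path[len("s3://"):]                               # PyArrow path without the scheme

s3_fs = create_pyarrow_s3_filesystem()
info = s3_fs.get_file_info(key)                         # FileInfo for the object, if it exists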