"""Helpers for uploading objects to an OCI Object Storage bucket and
creating pre-authenticated download links for them."""

import oci
from datetime import datetime, timedelta, timezone
from config_loader import load_config
from oci.object_storage.models import CreatePreauthenticatedRequestDetails

# Project configuration; this module reads bucket_profile, oci_namespace
# and oci_bucket from it.
config = load_config()

# OCI SDK configuration taken from the profile named in the project config.
oci_config = oci.config.from_file(
    file_location="~/.oci/config",
    profile_name=config.bucket_profile
)

# Single shared client used by every helper below.
object_storage = oci.object_storage.ObjectStorageClient(oci_config)


def _namespace() -> str:
    """Return the Object Storage namespace to use for requests.

    The configured ``oci_namespace`` is returned unless it is the literal
    string ``"auto"``, in which case the namespace is looked up through
    the Object Storage client.
    """
    if config.oci_namespace != "auto":
        return config.oci_namespace

    return object_storage.get_namespace().data


# =========================
# Upload file
# =========================
def upload_file(local_path: str, object_name: str) -> None:
    """Upload the file at ``local_path`` to the configured bucket.

    Args:
        local_path: Path of the local file to read (opened in binary mode).
        object_name: Name the object is stored under in the bucket.
    """
    with open(local_path, "rb") as f:
        # The open file handle is passed directly as the request body.
        object_storage.put_object(
            namespace_name=_namespace(),
            bucket_name=config.oci_bucket,
            object_name=object_name,
            put_object_body=f
        )

    print(f"SUCCESS on Upload {object_name}")


# =========================
# Pre-authenticated download URL
# =========================
def generate_download_url(object_name: str, hours=24) -> str:
    """Create a read-only pre-authenticated request and return its URL.

    Args:
        object_name: Name of the object in the configured bucket.
        hours: How long the link stays valid, counted from now.

    Returns:
        The download URL, built from the region in the OCI config and the
        access URI of the created pre-authenticated request.
    """
    # Timezone-aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value with no UTC marker attached.
    expire = datetime.now(timezone.utc) + timedelta(hours=hours)

    details = CreatePreauthenticatedRequestDetails(
        name=f"job-{object_name}",
        access_type="ObjectRead",
        object_name=object_name,
        time_expires=expire
    )

    response = object_storage.create_preauthenticated_request(
        namespace_name=_namespace(),
        bucket_name=config.oci_bucket,
        create_preauthenticated_request_details=details
    )

    par = response.data

    # NOTE(review): the host suffix is hard-coded to oraclecloud.com --
    # confirm this matches every realm this module is deployed in.
    download_link = (
        f"https://objectstorage.{oci_config['region']}.oraclecloud.com{par.access_uri}"
    )

    print("PAR CREATED OK")
    print(download_link)

    return download_link