如何在 Python 中使用 Boto3 库通过 AWS 资源上传对象到 S3?
问题陈述 − 使用 Python 中的 Boto3 库将对象上传到 S3。例如,如何将 test.zip 上传到 S3 的 Bucket_1。
解决此问题的方法/算法
步骤 1 − 导入 boto3 和 botocore 异常来处理异常。
步骤 2 − 从 pathlib 导入 PurePosixPath 来从路径中检索文件名。
步骤 3 − s3_path 和 filepath 是函数 upload_object_into_s3 中的两个参数。
步骤 4 − 验证 s3_path 是否以 AWS 格式传递,例如 s3://bucket_name/key,以及 filepath 是否为本地路径,例如 C://users/filename。
步骤 5 − 使用 boto3 库创建一个 AWS 会话。
步骤 6 − 为 S3 创建一个 AWS 资源。
步骤 7 − 分割 S3 路径并执行操作以分离根存储桶名称和密钥路径。
步骤 8 − 获取完整文件路径的文件名并添加到 S3 密钥路径中。
步骤 9 − 现在使用函数 upload_fileobj 将本地文件上传到 S3。
步骤 10 − 使用函数 wait_until_exists 等待操作完成。
步骤 11 − 根据响应代码处理异常,以验证文件是否已上传。
步骤 12 − 如果在上传文件时出现问题,则处理通用异常。
示例
使用以下代码将文件上传到 AWS S3 −
import boto3
from botocore.exceptions import ClientError
from pathlib import PurePosixPath
def upload_object_into_s3(s3_path, filepath):
if 's3://' in filepath:
print('SourcePath is not a valid path.' + filepath)
raise Exception('SourcePath is not a valid path.')
elif s3_path.find('s3://') == -1:
print('DestinationPath is not a s3 path.' + s3_path)
raise Exception('DestinationPath is not a valid path.')
session = boto3.session.Session()
s3_resource = session.resource('s3')
tokens = s3_path.split('/')
target_key = ""
if len(tokens) > 3:
for tokn in range(3, len(tokens)):
if tokn == 3:
target_key += tokens[tokn]
else:
target_key += "/" + tokens[tokn]
target_bucket_name = tokens[2]
file_name = PurePosixPath(filepath).name
if target_key != '':
target_key.strip()
key_path = target_key + "/" + file_name
else:
key_path = file_name
print(("key_path: " + key_path, 'target_bucket: ' + target_bucket_name))
try:
# uploading Entity from local path
with open(filepath, "rb") as file:
s3_resource.meta.client.upload_fileobj(file, target_bucket_name, key_path)
try:
s3_resource.Object(target_bucket_name, key_path).wait_until_exists()
file.close()
except ClientError as error:
error_code = int(error.response['Error']['Code'])
if error_code == 412 or error_code == 304:
print("Object didn't Upload Successfully ", target_bucket_name)
raise error
return "Object Uploaded Successfully"
except Exception as error:
print("Error in upload object function of s3 helper: " + error.__str__())
raise error
print(upload_object_into_s3('s3://Bucket_1/testfolder', 'c://test.zip'))输出
key_path:/testfolder/test.zip, target_bucket: Bucket_1 Object Uploaded Successfully
广告
数据结构
网络
关系数据库管理系统 (RDBMS)
操作系统
Java
iOS
HTML
CSS
Android
Python
C 编程
C++
C#
MongoDB
MySQL
Javascript
PHP