Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added async process object #338

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions examples/async_process_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import base64
import os
import time
import oss2

# Specify access information, such as AccessKeyId, AccessKeySecret, and Endpoint.
# You can obtain access information from evironment variables or replace sample values in the code, such as <your AccessKeyId> with actual values.
#
# For example, if your bucket is located in the China (Hangzhou) region, you can set Endpoint to one of the following values:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com


access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<yourBucketName>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<yourEndpoint>')

key = 'test-video.mp4'
dest_key = 'out-python-test-video.mp4'
pyth = 'D:\\zxl\\linux\\test-video.mp4'

# Make sure that all parameters are correctly configured
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, 'Please set parameters:' + param


# Create a bucket. You can use the bucket to call all object-related operations
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

# Upload local video files
put_result = bucket.put_object_from_file(key, pyth)
print("put object result status: %s" % put_result.status)

try:
# Set process
process = "video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas,o_{0},b_{1}".format(
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(dest_key))).replace('=', ''),
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(bucket.bucket_name))).replace('=', ''))

# Call async_ process_ Object interface
result = bucket.async_process_object(key, process)
print("async process object result status: %s" % result.status)
print(result.request_id)
print("event_id: %s" % result.event_id)
print("async_request_id: %s" % result.async_request_id)
print("task_id: %s" % result.task_id)

# Sleep for a period of time, waiting for asynchronous video processing to complete
time.sleep(10)

# Check if the processed video exists
exists = bucket.object_exists(dest_key+".mp4")
print("is exists: %s" % exists)
except oss2.exceptions.OssError as e:
pass
finally:
# Delete video files and processed files
del_key = bucket.delete_object(key)
print("delete key result: %s" % del_key.status)
del_dest_key = bucket.delete_object(dest_key+".mp4")
print("delete dest key result: %s" % del_dest_key.status)





20 changes: 20 additions & 0 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ class Bucket(_Base):
RESOURCE_GROUP = 'resourceGroup'
STYLE = 'style'
STYLE_NAME = 'styleName'
ASYNC_PROCESS = 'x-oss-async-process'


def __init__(self, auth, endpoint, bucket_name,
Expand Down Expand Up @@ -2776,6 +2777,25 @@ def delete_bucket_style(self, styleName):
logger.debug("delete bucket style done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return RequestResult(resp)

def async_process_object(self, key, process, headers=None):
"""异步处理多媒体接口。

:param str key: 处理的多媒体的对象名称
:param str process: 处理的字符串,例如"video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas,o_dGVzdC5qcGc,b_dGVzdA"

:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
"""

headers = http.CaseInsensitiveDict(headers)

logger.debug("Start to async process object, bucket: {0}, key: {1}, process: {2}".format(
self.bucket_name, to_string(key), process))
process_data = "%s=%s" % (Bucket.ASYNC_PROCESS, process)
resp = self.__do_object('POST', key, params={Bucket.ASYNC_PROCESS: ''}, headers=headers, data=process_data)
logger.debug("Async process object done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_async_process_object, AsyncProcessObject)

def __do_object(self, method, key, **kwargs):
return self._do(method, self.bucket_name, key, **kwargs)

Expand Down
2 changes: 1 addition & 1 deletion oss2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ProviderAuth(AuthBase):
'callback-var', 'worm', 'wormId', 'wormExtend', 'replication', 'replicationLocation',
'replicationProgress', 'transferAcceleration', 'cname', 'metaQuery',
'x-oss-ac-source-ip', 'x-oss-ac-subnet-mask', 'x-oss-ac-vpc-id', 'x-oss-ac-forward-allow',
'resourceGroup', 'style', 'styleName']
'resourceGroup', 'style', 'styleName', 'x-oss-async-process']
)

def _sign_request(self, req, bucket_name, key):
Expand Down
17 changes: 16 additions & 1 deletion oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2681,4 +2681,19 @@ class DescribeRegionsResult(RequestResult):

def __init__(self, resp):
super(DescribeRegionsResult, self).__init__(resp)
self.regions = []
self.regions = []


class AsyncProcessObject(RequestResult):
"""异步多媒体处理返回信息。

:param str event_id: 事件id。
:param str async_request_id: 请求id。
:param str task_id: 任务id。
"""

def __init__(self, resp):
super(AsyncProcessObject, self).__init__(resp)
self.event_id = None
self.async_request_id = None
self.task_id = None
10 changes: 9 additions & 1 deletion oss2/xml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2021,4 +2021,12 @@ def parse_describe_regions(result, body):
tmp.internal_endpoint = _find_tag_with_default(region, 'InternalEndpoint', None)
tmp.accelerate_endpoint = _find_tag_with_default(region, 'AccelerateEndpoint', None)

result.regions.append(tmp)
result.regions.append(tmp)

def parse_async_process_object(result, body):
if body:
body_dict = eval(body.decode('utf-8'))
result.event_id = body_dict['EventId']
result.async_request_id = body_dict['RequestId']
result.task_id = body_dict['TaskId']
return result
Binary file added tests/test-video.mp4
Binary file not shown.
55 changes: 55 additions & 0 deletions tests/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,61 @@ def test_list_objects(self):
result = bucket.list_objects()
self.assertTrue(result.object_list[0].restore_info.__contains__('ongoing-request="true"'))
self.assertTrue(result.object_list[1].restore_info.__contains__('ongoing-request="true"'))

def test_async_process_object(self):
auth = oss2.Auth(OSS_ID, OSS_SECRET)
bucket_name = self.OSS_BUCKET + "-async-process-object"
bucket = oss2.Bucket(auth, 'oss-cn-hangzhou.aliyuncs.com', bucket_name)

try:
# 创建bucket
bucket.create_bucket()

key = self.random_key(".mp4")
result = bucket.put_object_from_file(key, "tests/test-video.mp4")
self.assertEqual(result.status, 200)
# 设置文件处理后的名称
dest_key = "out-python-"+self.random_key()
# 设置process
process = "video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas,o_{0},b_{1}".format(
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(dest_key))).replace('=', ''),
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(bucket.bucket_name))).replace('=', ''))

# 调用async_process_object
result = bucket.async_process_object(key, process)

# 如果开启了imm,并且创建了一个imm的project,然后绑定了project和bucket,既开始进入如下正常测试流程
self.assertEqual(result.status, 200)
self.assertIsNotNone(result.event_id)
print(result.event_id)
self.assertIsNotNone(result.async_request_id)
print(result.async_request_id)
self.assertIsNotNone(result.task_id)
print(result.task_id)
# 获取文件大小
file_size = os.path.getsize("tests/test-video.mp4")
# 睡眠一段时间,等待异步视频处理完成。先根据视频大小/1m,然后再加5秒
time.sleep((file_size / (1000 * 1000)) + 60)

# 测试处理后的视频是否存在
result = bucket.object_exists(dest_key+".mp4")
self.assertEqual(result, True)

# 删除视频文件和处理后的文件
del_key = bucket.delete_object(key)
self.assertEqual(del_key.status, 204)
del_dest_key = bucket.delete_object(dest_key+".mp4")
self.assertEqual(del_dest_key.status, 204)
except oss2.exceptions.OssError as e:
# 如果没有开启imm,异步流程,暂时报如下错误
self.assertEqual(e.message, 'operation not support post: video/convert')
finally:
# 先删除文件,再删除bucket
del_key = bucket.delete_object(key)
self.assertEqual(del_key.status, 204)
# del_bucket = bucket.delete_bucket()
# self.assertEqual(del_bucket.status, 204)


class TestSign(TestObject):
"""
Expand Down
31 changes: 31 additions & 0 deletions unittests/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2444,5 +2444,36 @@ def test_describe_regions(self, do_request):
self.assertEqual(result.regions[1].internal_endpoint, 'oss-cn-shanghai-internal.aliyuncs.com')
self.assertEqual(result.regions[1].accelerate_endpoint, 'oss-accelerate.aliyuncs.com')

@patch('oss2.Session.do_request')
def test_async_process_object(self, do_request):
request_text = '''POST /test-video.mp4?x-oss-async-process HTTP/1.1
Date: Fri , 30 Apr 2021 13:08:38 GMT
Content-Length:443
x-oss-async-process=video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas
Host: ming-oss-share.oss-cn-hangzhou.aliyuncs.com
Authorization: OSS qn6qrrqxo2oawuk53otf****:PYbzsdWAIWAlMW8luk****
'''

response_text = '''HTTP/1.1 200 OK
Server: AliyunOSS
Date: Sat, 12 Dec 2015 00:35:42 GMT
Content-Type: application/xml
Content-Length: 96
Connection: keep-alive
x-oss-request-id: 566B6BDD68248CE14F729DC0
x-oss-async-process=video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas

{"EventId":"3D7-1XxFtV2t3VtcOn2CXqI2ldsMN3i","RequestId":"8DF65942-D483-5E7E-BC1A-B25C617A9C32","TaskId":"MediaConvert-d2280366-cd33-48f7-90c6-a0dab65bed63"}
'''
req_info = mock_response(do_request, response_text)
key = "test-video.mp4"
result = bucket().async_process_object(key, 'video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas')

self.assertEqual(result.request_id, '566B6BDD68248CE14F729DC0')
self.assertEqual(result.async_request_id, '8DF65942-D483-5E7E-BC1A-B25C617A9C32')
self.assertEqual(result.event_id, '3D7-1XxFtV2t3VtcOn2CXqI2ldsMN3i')
self.assertEqual(result.task_id, 'MediaConvert-d2280366-cd33-48f7-90c6-a0dab65bed63')


if __name__ == '__main__':
unittest.main()