Skip to content

Commit

Permalink
Added async process object (#338)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuxiaolong37 authored and huiguangjun committed May 29, 2023
1 parent 75dd942 commit 44bf973
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 3 deletions.
66 changes: 66 additions & 0 deletions examples/async_process_object.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import base64
import os
import time
import oss2

# Specify access information, such as AccessKeyId, AccessKeySecret, and Endpoint.
# You can obtain access information from evironment variables or replace sample values in the code, such as <your AccessKeyId> with actual values.
#
# For example, if your bucket is located in the China (Hangzhou) region, you can set Endpoint to one of the following values:
# http://oss-cn-hangzhou.aliyuncs.com
# https://oss-cn-hangzhou.aliyuncs.com


access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<yourAccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<yourAccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<yourBucketName>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<yourEndpoint>')

key = 'test-video.mp4'
dest_key = 'dest_test-video'
video_path = 'your mp4 video path'

# Make sure that all parameters are correctly configured
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
assert '<' not in param, 'Please set parameters:' + param


# Create a bucket. You can use the bucket to call all object-related operations
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)

# Upload local video files
put_result = bucket.put_object_from_file(key, video_path)
print("put object result status: %s" % put_result.status)

try:
# Set process
process = "video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas,o_{0},b_{1}".format(
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(dest_key))).replace('=', ''),
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(bucket.bucket_name))).replace('=', ''))

# Call async_ process_ Object interface
result = bucket.async_process_object(key, process)
print("async process object result status: %s" % result.status)
print(result.request_id)
print("event_id: %s" % result.event_id)
print("async_request_id: %s" % result.async_request_id)
print("task_id: %s" % result.task_id)

# Sleep for a period of time, waiting for asynchronous video processing to complete
time.sleep(10)

# Check if the processed video exists
exists = bucket.object_exists(dest_key+".mp4")
print("is exists: %s" % exists)
except oss2.exceptions.OssError as e:
pass
finally:
# Delete video files and processed files
del_key = bucket.delete_object(key)
print("delete key result: %s" % del_key.status)
del_dest_key = bucket.delete_object(dest_key+".mp4")
print("delete dest key result: %s" % del_dest_key.status)




20 changes: 20 additions & 0 deletions oss2/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ class Bucket(_Base):
RESOURCE_GROUP = 'resourceGroup'
STYLE = 'style'
STYLE_NAME = 'styleName'
ASYNC_PROCESS = 'x-oss-async-process'


def __init__(self, auth, endpoint, bucket_name,
Expand Down Expand Up @@ -2776,6 +2777,25 @@ def delete_bucket_style(self, styleName):
logger.debug("delete bucket style done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return RequestResult(resp)

def async_process_object(self, key, process, headers=None):
"""异步处理多媒体接口。
:param str key: 处理的多媒体的对象名称
:param str process: 处理的字符串,例如"video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas,o_dGVzdC5qcGc,b_dGVzdA"
:param headers: HTTP头部
:type headers: 可以是dict,建议是oss2.CaseInsensitiveDict
"""

headers = http.CaseInsensitiveDict(headers)

logger.debug("Start to async process object, bucket: {0}, key: {1}, process: {2}".format(
self.bucket_name, to_string(key), process))
process_data = "%s=%s" % (Bucket.ASYNC_PROCESS, process)
resp = self.__do_object('POST', key, params={Bucket.ASYNC_PROCESS: ''}, headers=headers, data=process_data)
logger.debug("Async process object done, req_id: {0}, status_code: {1}".format(resp.request_id, resp.status))
return self._parse_result(resp, xml_utils.parse_async_process_object, AsyncProcessObject)

def __do_object(self, method, key, **kwargs):
return self._do(method, self.bucket_name, key, **kwargs)

Expand Down
2 changes: 1 addition & 1 deletion oss2/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ProviderAuth(AuthBase):
'callback-var', 'worm', 'wormId', 'wormExtend', 'replication', 'replicationLocation',
'replicationProgress', 'transferAcceleration', 'cname', 'metaQuery',
'x-oss-ac-source-ip', 'x-oss-ac-subnet-mask', 'x-oss-ac-vpc-id', 'x-oss-ac-forward-allow',
'resourceGroup', 'style', 'styleName']
'resourceGroup', 'style', 'styleName', 'x-oss-async-process']
)

def _sign_request(self, req, bucket_name, key):
Expand Down
17 changes: 16 additions & 1 deletion oss2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2681,4 +2681,19 @@ class DescribeRegionsResult(RequestResult):

def __init__(self, resp):
super(DescribeRegionsResult, self).__init__(resp)
self.regions = []
self.regions = []


class AsyncProcessObject(RequestResult):
"""异步多媒体处理返回信息。
:param str event_id: 事件id。
:param str async_request_id: 请求id。
:param str task_id: 任务id。
"""

def __init__(self, resp):
super(AsyncProcessObject, self).__init__(resp)
self.event_id = None
self.async_request_id = None
self.task_id = None
10 changes: 9 additions & 1 deletion oss2/xml_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2021,4 +2021,12 @@ def parse_describe_regions(result, body):
tmp.internal_endpoint = _find_tag_with_default(region, 'InternalEndpoint', None)
tmp.accelerate_endpoint = _find_tag_with_default(region, 'AccelerateEndpoint', None)

result.regions.append(tmp)
result.regions.append(tmp)

def parse_async_process_object(result, body):
if body:
body_dict = eval(body.decode('utf-8'))
result.event_id = body_dict['EventId']
result.async_request_id = body_dict['RequestId']
result.task_id = body_dict['TaskId']
return result
23 changes: 23 additions & 0 deletions tests/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,29 @@ def test_list_objects(self):
result = bucket.list_objects()
self.assertTrue(result.object_list[0].restore_info.__contains__('ongoing-request="true"'))
self.assertTrue(result.object_list[1].restore_info.__contains__('ongoing-request="true"'))

def test_async_process_object(self):
try:
key = self.random_key(".jpg")
result = self.bucket.put_object_from_file(key, "tests/example.jpg")
self.assertEqual(result.status, 200)
dest_key = self.random_key(".jpg")

process = "image/resize,w_100|sys/saveas,o_{0},b_{1}".format(
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(dest_key))).replace('=', ''),
oss2.compat.to_string(base64.urlsafe_b64encode(oss2.compat.to_bytes(self.bucket.bucket_name))).replace('=', ''))

result = self.bucket.async_process_object(key, process)

# imm dont support process image in async mode
self.assertFalse(True, 'should not here')
except oss2.exceptions.OssError as e:
# expect Imm Client Error
self.assertEqual(e.code, 'Imm Client')
self.assertEqual(e.message, 'The specified resource Route is not found.')
except:
self.assertFalse(True, 'should not here')


class TestSign(TestObject):
"""
Expand Down
31 changes: 31 additions & 0 deletions unittests/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -2444,5 +2444,36 @@ def test_describe_regions(self, do_request):
self.assertEqual(result.regions[1].internal_endpoint, 'oss-cn-shanghai-internal.aliyuncs.com')
self.assertEqual(result.regions[1].accelerate_endpoint, 'oss-accelerate.aliyuncs.com')

@patch('oss2.Session.do_request')
def test_async_process_object(self, do_request):
request_text = '''POST /test-video.mp4?x-oss-async-process HTTP/1.1
Date: Fri , 30 Apr 2021 13:08:38 GMT
Content-Length:443
x-oss-async-process=video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas
Host: ming-oss-share.oss-cn-hangzhou.aliyuncs.com
Authorization: OSS qn6qrrqxo2oawuk53otf****:PYbzsdWAIWAlMW8luk****
'''

response_text = '''HTTP/1.1 200 OK
Server: AliyunOSS
Date: Sat, 12 Dec 2015 00:35:42 GMT
Content-Type: application/xml
Content-Length: 96
Connection: keep-alive
x-oss-request-id: 566B6BDD68248CE14F729DC0
x-oss-async-process=video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas
{"EventId":"3D7-1XxFtV2t3VtcOn2CXqI2ldsMN3i","RequestId":"8DF65942-D483-5E7E-BC1A-B25C617A9C32","TaskId":"MediaConvert-d2280366-cd33-48f7-90c6-a0dab65bed63"}
'''
req_info = mock_response(do_request, response_text)
key = "test-video.mp4"
result = bucket().async_process_object(key, 'video/convert,f_mp4,vcodec_h265,s_1920x1080,vb_2000000,fps_30,acodec_aac,ab_100000,sn_1|sys/saveas')

self.assertEqual(result.request_id, '566B6BDD68248CE14F729DC0')
self.assertEqual(result.async_request_id, '8DF65942-D483-5E7E-BC1A-B25C617A9C32')
self.assertEqual(result.event_id, '3D7-1XxFtV2t3VtcOn2CXqI2ldsMN3i')
self.assertEqual(result.task_id, 'MediaConvert-d2280366-cd33-48f7-90c6-a0dab65bed63')


if __name__ == '__main__':
unittest.main()

0 comments on commit 44bf973

Please sign in to comment.