Skip to content

Commit

Permalink
add Qiniu auth verify callback
Browse files Browse the repository at this point in the history
  • Loading branch information
lihsai0 committed Aug 28, 2024
1 parent 02d2e2e commit 0dbde38
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 186 deletions.
108 changes: 93 additions & 15 deletions qiniu/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,22 +195,56 @@ def __upload_token(self, policy):
return self.token_with_data(data)

def verify_callback(
self,
origin_authorization,
url,
body,
content_type='application/x-www-form-urlencoded'):
"""回调验证
Args:
origin_authorization: 回调时请求Header中的Authorization字段
url: 回调请求的url
body: 回调请求的body
content_type: 回调请求body的Content-Type
Returns:
返回true表示验证成功,返回false表示验证失败
self,
origin_authorization,
url,
body,
content_type='application/x-www-form-urlencoded',
method='GET',
headers=None
):
"""
Qbox 回调验证
Parameters
----------
origin_authorization: str
回调时请求 Header 中的 Authorization 字段
url: str
回调请求的 url
body: str
回调请求的 body
content_type: str
回调请求的 Content-Type
method: str
回调请求的 method,Qiniu 签名必须传入,默认 GET
headers: dict
回调请求的 headers,Qiniu 签名必须传入,默认为空字典
Returns
-------
bool
返回 True 表示验证成功,返回 False 表示验证失败
"""
if headers is None:
headers = {}

# 兼容 Qiniu 签名
if origin_authorization.startswith("Qiniu"):
qn_auth = QiniuMacAuth(
access_key=self.__access_key,
secret_key=self.__secret_key,
disable_qiniu_timestamp_signature=True
)
return qn_auth.verify_callback(
origin_authorization,
url=url,
body=body,
content_type=content_type,
method=method,
headers=headers
)

token = self.token_of_request(url, body, content_type)
authorization = 'QBox {0}'.format(token)
return origin_authorization == authorization
Expand Down Expand Up @@ -327,6 +361,50 @@ def qiniu_headers(self, headers):
'%s: %s' % (canonical_mime_header_key(key), headers.get(key)) for key in sorted(qiniu_fields)
])

def verify_callback(
self,
origin_authorization,
url,
body,
content_type='application/x-www-form-urlencoded',
method='GET',
headers=None
):
"""
Qiniu 回调验证
Parameters
----------
origin_authorization: str
回调时请求 Header 中的 Authorization 字段
url: str
回调请求的 url
body: str
回调请求的 body
content_type: str
回调请求的 Content-Type
method: str
回调请求的 Method
headers: dict
回调请求的 headers
Returns
-------
"""
if headers is None:
headers = {}
token = self.token_of_request(
method=method,
host=headers.get('Host', None),
url=url,
qheaders=self.qiniu_headers(headers),
content_type=content_type,
body=body
)
authorization = 'Qiniu {0}'.format(token)
return origin_authorization == authorization

@staticmethod
def __checkKey(access_key, secret_key):
if not (access_key and secret_key):
Expand Down
171 changes: 0 additions & 171 deletions test_qiniu.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@
hostscache_dir = None


dummy_access_key = 'abcdefghklmnopq'
dummy_secret_key = '1234567890'
dummy_auth = Auth(dummy_access_key, dummy_secret_key)


def rand_string(length):
lib = string.ascii_uppercase
return ''.join([random.choice(lib) for i in range(0, length)])
Expand Down Expand Up @@ -193,172 +188,6 @@ def test_decode_entry(self):
assert key == c.get('expect').get('key'), c.get('msg')


class AuthTestCase(unittest.TestCase):
def test_token(self):
token = dummy_auth.token('test')
assert token == 'abcdefghklmnopq:mSNBTR7uS2crJsyFr2Amwv1LaYg='

def test_token_with_data(self):
token = dummy_auth.token_with_data('test')
assert token == 'abcdefghklmnopq:-jP8eEV9v48MkYiBGs81aDxl60E=:dGVzdA=='

def test_noKey(self):
with pytest.raises(ValueError):
Auth(None, None).token('nokey')
with pytest.raises(ValueError):
Auth('', '').token('nokey')

def test_token_of_request(self):
token = dummy_auth.token_of_request('https://www.qiniu.com?go=1', 'test', '')
assert token == 'abcdefghklmnopq:cFyRVoWrE3IugPIMP5YJFTO-O-Y='
token = dummy_auth.token_of_request('https://www.qiniu.com?go=1', 'test', 'application/x-www-form-urlencoded')
assert token == 'abcdefghklmnopq:svWRNcacOE-YMsc70nuIYdaa1e4='

def test_QiniuMacRequestsAuth(self):
auth = QiniuMacAuth("ak", "sk")
test_cases = [
{
"method": "GET",
"host": None,
"url": "",
"qheaders": {
"X-Qiniu-": "a",
"X-Qiniu": "b",
"Content-Type": "application/x-www-form-urlencoded",
},
"content_type": "application/x-www-form-urlencoded",
"body": "{\"name\": \"test\"}",
"except_sign_token": "ak:0i1vKClRDWFyNkcTFzwcE7PzX74=",
},
{
"method": "GET",
"host": None,
"url": "",
"qheaders": {
"Content-Type": "application/json",
},
"content_type": "application/json",
"body": "{\"name\": \"test\"}",
"except_sign_token": "ak:K1DI0goT05yhGizDFE5FiPJxAj4=",
},
{
"method": "POST",
"host": None,
"url": "",
"qheaders": {
"Content-Type": "application/json",
"X-Qiniu": "b",
},
"content_type": "application/json",
"body": "{\"name\": \"test\"}",
"except_sign_token": "ak:0ujEjW_vLRZxebsveBgqa3JyQ-w=",
},
{
"method": "GET",
"host": "upload.qiniup.com",
"url": "http://upload.qiniup.com",
"qheaders": {
"X-Qiniu-": "a",
"X-Qiniu": "b",
"Content-Type": "application/x-www-form-urlencoded",
},
"content_type": "application/x-www-form-urlencoded",
"body": "{\"name\": \"test\"}",
"except_sign_token": "ak:GShw5NitGmd5TLoo38nDkGUofRw=",
},
{
"method": "GET",
"host": "upload.qiniup.com",
"url": "http://upload.qiniup.com",
"qheaders": {
"Content-Type": "application/json",
"X-Qiniu-Bbb": "BBB",
"X-Qiniu-Aaa": "DDD",
"X-Qiniu-": "a",
"X-Qiniu": "b",
},
"content_type": "application/json",
"body": "{\"name\": \"test\"}",
"except_sign_token": "ak:DhNA1UCaBqSHCsQjMOLRfVn63GQ=",
},
{
"method": "GET",
"host": "upload.qiniup.com",
"url": "http://upload.qiniup.com",
"qheaders": {
"Content-Type": "application/x-www-form-urlencoded",
"X-Qiniu-Bbb": "BBB",
"X-Qiniu-Aaa": "DDD",
"X-Qiniu-": "a",
"X-Qiniu": "b",
},
"content_type": "application/x-www-form-urlencoded",
"body": "name=test&language=go",
"except_sign_token": "ak:KUAhrYh32P9bv0COD8ugZjDCmII=",
},
{
"method": "GET",
"host": "upload.qiniup.com",
"url": "http://upload.qiniup.com",
"qheaders": {
"Content-Type": "application/x-www-form-urlencoded",
"X-Qiniu-Bbb": "BBB",
"X-Qiniu-Aaa": "DDD",
},
"content_type": "application/x-www-form-urlencoded",
"body": "name=test&language=go",
"except_sign_token": "ak:KUAhrYh32P9bv0COD8ugZjDCmII=",
},
{
"method": "GET",
"host": "upload.qiniup.com",
"url": "http://upload.qiniup.com/mkfile/sdf.jpg",
"qheaders": {
"Content-Type": "application/x-www-form-urlencoded",
"X-Qiniu-Bbb": "BBB",
"X-Qiniu-Aaa": "DDD",
"X-Qiniu-": "a",
"X-Qiniu": "b",
},
"content_type": "application/x-www-form-urlencoded",
"body": "name=test&language=go",
"except_sign_token": "ak:fkRck5_LeyfwdkyyLk-hyNwGKac=",
},
{
"method": "GET",
"host": "upload.qiniup.com",
"url": "http://upload.qiniup.com/mkfile/sdf.jpg?s=er3&df",
"qheaders": {
"Content-Type": "application/x-www-form-urlencoded",
"X-Qiniu-Bbb": "BBB",
"X-Qiniu-Aaa": "DDD",
"X-Qiniu-": "a",
"X-Qiniu": "b",
},
"content_type": "application/x-www-form-urlencoded",
"body": "name=test&language=go",
"except_sign_token": "ak:PUFPWsEUIpk_dzUvvxTTmwhp3p4=",
},
]

for test_case in test_cases:
sign_token = auth.token_of_request(
method=test_case["method"],
host=test_case["host"],
url=test_case["url"],
qheaders=auth.qiniu_headers(test_case["qheaders"]),
content_type=test_case["content_type"],
body=test_case["body"],
)
assert sign_token == test_case["except_sign_token"]

def test_verify_callback(self):
body = 'name=sunflower.jpg&hash=Fn6qeQi4VDLQ347NiRm-RlQx_4O2&location=Shanghai&price=1500.00&uid=123'
url = 'test.qiniu.com/callback'
ok = dummy_auth.verify_callback('QBox abcdefghklmnopq:ZWyeM5ljWMRFwuPTPOwQ4RwSto4=', url, body)
assert ok


class BucketTestCase(unittest.TestCase):
q = Auth(access_key, secret_key)
bucket = BucketManager(q)
Expand Down
Loading

0 comments on commit 0dbde38

Please sign in to comment.