diff --git a/linebot/api.py b/linebot/api.py index 48a066eb..ee3e6d92 100644 --- a/linebot/api.py +++ b/linebot/api.py @@ -14,7 +14,6 @@ """linebot.api module.""" - import json from .__about__ import __version__ @@ -31,7 +30,10 @@ AudienceGroup, ClickAudienceGroup, ImpAudienceGroup, GetAuthorityLevel, Audience, CreateAudienceGroup ) -from .models.responses import Group, UserIds, RichMenuAliasResponse, RichMenuAliasListResponse +from .models.responses import ( + Group, UserIds, RichMenuAliasResponse, RichMenuAliasListResponse, ChannelAccessTokens, + IssueChannelTokenResponseV2, VerifyChannelTokenResponseV2, ValidAccessTokenKeyIDsResponse +) class LineBotApi(object): @@ -1594,6 +1596,128 @@ def get_followers_ids(self, limit=300, start=None, timeout=None): return UserIds.new_from_json_dict(response.json) + def issue_channel_access_token_v2_1( + self, client_assertion, grant_type='client_credentials', + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + timeout=None): + """Issues a channel access token v2.1. + + https://developers.line.biz/en/reference/messaging-api/#issue-channel-access-token-v2-1 + + :param str client_assertion: Client assertion. + :param str grant_type: `client_credentials` + :param str client_assertion_type: `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`. + :param timeout: (optional) How long to wait for the server + to send data before giving up, as a float, + or a (connect timeout, read timeout) float tuple. + Default is self.http_client.timeout + :type timeout: float | tuple(float, float) + :rtype: :py:class:`linebot.models.responses.IssueChannelTokenResponseV2` + """ + response = self._post( + '/oauth2/v2.1/token', + data={ + 'grant_type': grant_type, + 'client_assertion_type': client_assertion_type, + 'client_assertion': client_assertion, + }, + headers={'Content-Type': 'application/x-www-form-urlencoded'}, + timeout=timeout + ) + + return IssueChannelTokenResponseV2.new_from_json_dict(response.json) + + def revoke_channel_access_token_v2_1( + self, client_id, + client_secret, access_token, + timeout=None): + """Revokes a channel access token v2.1. + + https://developers.line.biz/en/reference/messaging-api/#revoke-channel-access-token-v2-1 + + :param str client_id: Client id. + :param str client_secret: Channel secret. + :param str access_token: Channel access token. + :param timeout: (optional) How long to wait for the server + to send data before giving up, as a float, + or a (connect timeout, read timeout) float tuple. + Default is self.http_client.timeout + :type timeout: float | tuple(float, float) + """ + self._post( + '/oauth2/v2.1/revoke', + data={'client_id': client_id, + 'client_secret': client_secret, + 'access_token': access_token}, + timeout=timeout + ) + + def get_channel_access_tokens_v2_1( + self, client_assertion, + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + timeout=None): + """Get issued channel access tokens v2.1. + + https://developers.line.biz/en/reference/messaging-api/#get-issued-channel-access-tokens-v2-1 + + :param str client_assertion: Client assertion. + :param str client_assertion_type: `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`. + :param timeout: (optional) How long to wait for the server + to send data before giving up, as a float, + or a (connect timeout, read timeout) float tuple. + Default is self.http_client.timeout + :type timeout: float | tuple(float, float) + :rtype: :py:class:`linebot.models.responses.ChannelAccessTokens` + """ + response = self._get( + '/oauth2/v2.1/tokens', + params={'client_assertion': client_assertion, + 'client_assertion_type': client_assertion_type}, + timeout=timeout + ) + return ChannelAccessTokens.new_from_json_dict(response.json) + + def verify_channel_access_token_v2_1(self, access_token, timeout=None): + """Validate channel access token v2.1. + + https://developers.line.biz/en/reference/messaging-api/#verfiy-channel-access-token-v2-1 + + :param str access_token: Channel access token. + :param timeout: (optional) How long to wait for the server + to send data before giving up, as a float, + or a (connect timeout, read timeout) float tuple. + Default is self.http_client.timeout + :type timeout: float | tuple(float, float) + :rtype: :py:class:`linebot.models.responses.VerifyChannelTokenResponseV2` + """ + response = self._get('/oauth2/v2.1/verify', + params={'access_token': access_token}, + timeout=timeout) + return VerifyChannelTokenResponseV2.new_from_json_dict(response.json) + + def get_channel_token_key_ids_v2_1( + self, client_assertion, + client_assertion_type='urn:ietf:params:oauth:client-assertion-type:jwt-bearer', + timeout=None): + """Get all valid channel access token key IDs v2.1. + + https://developers.line.biz/en/reference/messaging-api/#get-all-valid-channel-access-token-key-ids-v2-1 + + :param str client_assertion: Client assertion. + :param str client_assertion_type: `urn:ietf:params:oauth:client-assertion-type:jwt-bearer`. + :param timeout: (optional) How long to wait for the server + to send data before giving up, as a float, + or a (connect timeout, read timeout) float tuple. + Default is self.http_client.timeout + :type timeout: float | tuple(float, float) + :rtype: :py:class:`linebot.models.responses.VerifyChannelTokenResponseV2` + """ + response = self._get('/oauth2/v2.1/tokens/kid', + params={"client_assertion": client_assertion, + "client_assertion_type": client_assertion_type}, + timeout=timeout) + return ValidAccessTokenKeyIDsResponse.new_from_json_dict(response.json) + def _get(self, path, endpoint=None, params=None, headers=None, stream=False, timeout=None): url = (endpoint or self.endpoint) + path diff --git a/linebot/models/responses.py b/linebot/models/responses.py index 4d3e7e40..f05d1b92 100644 --- a/linebot/models/responses.py +++ b/linebot/models/responses.py @@ -14,7 +14,6 @@ """linebot.models.responses module.""" - from .base import Base from .insight import ( SubscriptionPeriodInsight, AppTypeInsight, AgeInsight, @@ -843,3 +842,88 @@ def __init__(self, user_ids=None, next=None, **kwargs): self.user_ids = user_ids self.next = next + + +class IssueChannelTokenResponseV2(Base): + """IssueAccessTokenResponseV2. + + https://developers.line.biz/en/reference/messaging-api/#issue-channel-access-token-v2-1 + """ + + def __init__(self, access_token=None, expires_in=None, token_type=None, key_id=None, **kwargs): + """__init__ method. + + :param str access_token: Short-lived channel access token. + :param int expires_in: Time until channel access token expires in seconds + from time the token is issued. + :param str token_type: Bearer. + :param key_id: Unique key ID for identifying the channel access token. + :param kwargs: + """ + super(IssueChannelTokenResponseV2, self).__init__(**kwargs) + + self.access_token = access_token + self.expires_in = expires_in + self.token_type = token_type + self.key_id = key_id + + +class ChannelAccessTokens(Base): + """ChannelAccessTokens. + + https://developers.line.biz/en/reference/messaging-api/#get-issued-channel-access-tokens-v2-1 + """ + + def __init__(self, access_tokens=None, **kwargs): + """__init__ method. + + :param access_tokens: List of channel access token + :type access_tokens: list[str] + :param kwargs: + + """ + super(ChannelAccessTokens, self).__init__(**kwargs) + + self.access_tokens = access_tokens + + +class VerifyChannelTokenResponseV2(Base): + """VerifyChannelTokenResponseV2. + + https://developers.line.biz/en/reference/messaging-api/#verfiy-channel-access-token-v2-1 + + """ + + def __init__(self, client_id=None, expires_in=None, scope=None, **kwargs): + """__init__ method. + + :param str client_id: The channel ID for which the channel access token was issued. + :param int expires_in: Number of seconds before the channel access token expires. + :param str scope: Permissions granted to the channel access token. + :param kwargs: + + """ + super(VerifyChannelTokenResponseV2, self).__init__(**kwargs) + + self.client_id = client_id + self.expires_in = expires_in + self.scope = scope + + +class ValidAccessTokenKeyIDsResponse(Base): + """ValidAccessTokenKeyIDsResponse. + + https://developers.line.biz/en/reference/messaging-api/#get-all-valid-channel-access-token-key-ids-v2-1 + + """ + + def __init__(self, kids=None, **kwargs): + """__init__ method. + + :param kids: Array of channel access token key IDs. + :type kids: list[str] + :param kwargs: + """ + super(ValidAccessTokenKeyIDsResponse, self).__init__(**kwargs) + + self.kids = kids diff --git a/tests/api/test_channel_access_token_v2_1.py b/tests/api/test_channel_access_token_v2_1.py new file mode 100644 index 00000000..7421efab --- /dev/null +++ b/tests/api/test_channel_access_token_v2_1.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import unicode_literals, absolute_import + +import sys +import unittest + +import responses + +from linebot import ( + LineBotApi +) + +PY3 = sys.version_info[0] == 3 +if PY3: + from urllib import parse +else: + import urlparse as parse + + +class TestLineBotApi(unittest.TestCase): + def setUp(self): + self.tested = LineBotApi('channel_secret') + self.access_token = 'W1TeHCgfH2Liwa.....' + self.expires_in = 2592000 + self.token_type = 'Bearer' + self.client_assertion = 'eyJhbGciOiJSUzI.q....' + self.client_id = 'client_id' + self.client_secret = 'client_secret' + self.client_assertion_type = 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer' + self.key_id = 'sDTOzw5wIfxxxxPEzcmeQA' + self.scope = 'profile chat_message.write' + + @responses.activate + def test_issue_channel_access_token_v2_1(self): + endpoint = LineBotApi.DEFAULT_API_ENDPOINT + '/oauth2/v2.1/token' + responses.add( + responses.POST, + endpoint, + json={ + 'access_token': self.access_token, + 'expires_in': self.expires_in, + 'token_type': self.token_type, + 'key_id': self.key_id + }, + status=200 + ) + + issue_access_token_response = self.tested.issue_channel_access_token_v2_1( + self.client_assertion + ) + + request = responses.calls[0].request + self.assertEqual('POST', request.method) + self.assertEqual(endpoint, request.url) + self.assertEqual('application/x-www-form-urlencoded', request.headers['content-type']) + self.assertEqual(self.access_token, issue_access_token_response.access_token) + self.assertEqual(self.expires_in, issue_access_token_response.expires_in) + self.assertEqual(self.token_type, issue_access_token_response.token_type) + + encoded_body = parse.parse_qs(request.body) + self.assertEqual('client_credentials', encoded_body['grant_type'][0]) + self.assertEqual(self.client_assertion_type, encoded_body['client_assertion_type'][0]) + self.assertEqual(self.client_assertion, encoded_body['client_assertion'][0]) + + @responses.activate + def test_get_channel_access_token_v2_1(self): + + endpoint = LineBotApi.DEFAULT_API_ENDPOINT + '/oauth2/v2.1/tokens' + responses.add( + responses.GET, + endpoint, + json={ + 'access_tokens': [ + 'fgIkeLcl3.....', + 'eyJhbGciO.....', + 'oeLklsSi7.....' + ] + }, + status=200 + ) + channel_access_tokens_response = self.tested.get_channel_access_tokens_v2_1( + self.client_assertion + ) + + request = responses.calls[0].request + self.assertEqual(request.method, 'GET') + self.assertEqual( + parse.unquote(request.url), + parse.unquote('{}?client_assertion={}&client_assertion_type={}'.format( + endpoint, self.client_assertion, self.client_assertion_type + )) + ) + self.assertEqual(channel_access_tokens_response.access_tokens, [ + 'fgIkeLcl3.....', + 'eyJhbGciO.....', + 'oeLklsSi7.....' + ]) + + @responses.activate + def test_revoke_channel_access_token_v2_1(self): + endpoint = LineBotApi.DEFAULT_API_ENDPOINT + '/oauth2/v2.1/revoke' + + responses.add( + responses.POST, + endpoint, + status=200 + ) + + self.tested.revoke_channel_access_token_v2_1( + self.client_id, self.client_secret, self.access_token + ) + + request = responses.calls[0].request + self.assertEqual('POST', request.method) + self.assertEqual(endpoint, request.url) + + encoded_body = parse.parse_qs(request.body) + self.assertEqual(self.client_id, encoded_body['client_id'][0]) + self.assertEqual(self.client_secret, encoded_body['client_secret'][0]) + self.assertEqual(self.access_token, encoded_body['access_token'][0]) + + @responses.activate + def test_verify_channel_access_token_v2_1(self): + endpoint = LineBotApi.DEFAULT_API_ENDPOINT + '/oauth2/v2.1/verify' + + responses.add( + responses.GET, + endpoint, + status=200, + json={ + 'client_id': self.client_id, + 'expires_in': self.expires_in, + 'scope': self.scope, + }, + ) + + self.tested.verify_channel_access_token_v2_1(self.access_token) + + request = responses.calls[0].request + self.assertEqual('GET', request.method) + self.assertEqual(endpoint, request.url) + + encoded_body = parse.parse_qs(request.body) + self.assertEqual(self.client_id, encoded_body['client_id'][0]) + self.assertEqual(self.expires_in, encoded_body['expires_in'][0]) + self.assertEqual(self.scope, encoded_body['scope'][0]) + + @responses.activate + def test_get_channel_token_key_ids_v2_1(self): + endpoint = LineBotApi.DEFAULT_API_ENDPOINT + '/oauth2/v2.1/tokens/kid' + + responses.add( + responses.GET, + endpoint, + status=200, + json={ + 'kids': [self.key_id], + }, + ) + + self.tested.get_channel_token_key_ids_v2_1(self.client_assertion, self.client_assertion_type) + + request = responses.calls[0].request + self.assertEqual('GET', request.method) + self.assertEqual(endpoint, request.url) + + encoded_body = parse.parse_qs(request.body) + self.assertEqual(self.kids, encoded_body['kids'][0]) + +if __name__ == '__main__': + unittest.main()