diff --git a/libraries/botbuilder-core/botbuilder/core/teams/teams_info.py b/libraries/botbuilder-core/botbuilder/core/teams/teams_info.py index 4afa50c05..cd17f004e 100644 --- a/libraries/botbuilder-core/botbuilder/core/teams/teams_info.py +++ b/libraries/botbuilder-core/botbuilder/core/teams/teams_info.py @@ -27,6 +27,9 @@ TeamsMeetingParticipant, MeetingNotificationBase, MeetingNotificationResponse, + BatchFailedEntriesResponse, + BatchOperationState, + TeamMember, ) @@ -137,6 +140,111 @@ async def _create_conversation_callback( ) return (conversation_reference, new_activity_id) + @staticmethod + async def send_message_to_list_of_users( + turn_context: TurnContext, + activity: Activity, + teams_members: List["TeamMember"], + tenant_id: str, + ) -> str: + """Sends a message to the provided list of Teams members.""" + if activity is None: + raise ValueError("activity is required.") + if not teams_members: + raise ValueError("teamsMembers is required.") + if not tenant_id: + raise ValueError("tenantId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + return await connector_client.teams.send_message_to_list_of_users( + activity, teams_members, tenant_id + ) + + @staticmethod + async def send_message_to_all_users_in_tenant( + turn_context: TurnContext, activity: Activity, tenant_id: str + ) -> str: + """Sends a message to all the users in a tenant.""" + if activity is None: + raise ValueError("activity is required.") + if not tenant_id: + raise ValueError("tenantId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + return await connector_client.teams.send_message_to_all_users_in_tenant( + activity, tenant_id + ) + + @staticmethod + async def send_message_to_all_users_in_team( + turn_context: TurnContext, activity: Activity, team_id: str, tenant_id: str + ) -> str: + """Sends a message to all the users in a team.""" + if activity is None: + raise ValueError("activity is required.") + if not team_id: + raise ValueError("teamId is required.") + if not tenant_id: + raise ValueError("tenantId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + return await connector_client.teams.send_message_to_all_users_in_team( + activity, team_id, tenant_id + ) + + @staticmethod + async def send_message_to_list_of_channels( + turn_context: TurnContext, + activity: Activity, + channels_members: List["TeamMember"], + tenant_id: str, + ) -> str: + """Sends a message to the provided list of Teams channels.""" + if activity is None: + raise ValueError("activity is required.") + if not channels_members: + raise ValueError("channelsMembers is required.") + if not tenant_id: + raise ValueError("tenantId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + return await connector_client.teams.send_message_to_list_of_channels( + activity, channels_members, tenant_id + ) + + @staticmethod + async def get_operation_state( + turn_context: TurnContext, operation_id: str + ) -> BatchOperationState: + """Gets the state of an operation.""" + if not operation_id: + raise ValueError("operationId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + return await connector_client.teams.get_operation_state(operation_id) + + @staticmethod + async def get_paged_failed_entries( + turn_context: TurnContext, operation_id: str, continuation_token: str = None + ) -> BatchFailedEntriesResponse: + """Gets the failed entries of a batch operation.""" + if not operation_id: + raise ValueError("operationId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + return await connector_client.teams.get_paged_failed_entries( + operation_id, continuation_token + ) + + @staticmethod + async def cancel_operation(turn_context: TurnContext, operation_id: str) -> None: + """Cancels a batch operation by its id.""" + if not operation_id: + raise ValueError("operationId is required.") + + connector_client = await TeamsInfo.get_teams_connector_client(turn_context) + await connector_client.teams.cancel_operation(operation_id) + @staticmethod async def get_team_details( turn_context: TurnContext, team_id: str = "" diff --git a/libraries/botbuilder-core/tests/teams/test_teams_info.py b/libraries/botbuilder-core/tests/teams/test_teams_info.py index 00f4ad8a4..be5129e69 100644 --- a/libraries/botbuilder-core/tests/teams/test_teams_info.py +++ b/libraries/botbuilder-core/tests/teams/test_teams_info.py @@ -4,6 +4,9 @@ import json import aiounittest from botbuilder.schema.teams._models_py3 import ( + BatchFailedEntriesResponse, + BatchFailedEntry, + BatchOperationState, ContentType, MeetingNotificationChannelData, MeetingStageSurface, @@ -15,6 +18,7 @@ TaskModuleTaskInfo, ) from botframework.connector import Channels +from azure.core.exceptions import HttpResponseError from botbuilder.core import TurnContext, MessageFactory from botbuilder.core.teams import TeamsInfo, TeamsActivityHandler @@ -293,6 +297,139 @@ async def test_send_meeting_notificationt(self): handler = TeamsActivityHandler() await handler.on_turn(turn_context) + async def test_send_message_to_list_of_users(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["201", "400", "403", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-send_message_to_list_of_users", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + + async def test_send_message_to_all_users_in_tenant(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["201", "400", "403", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-send_message_to_all_users_in_tenant", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + + async def test_send_message_to_all_users_in_team(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["201", "400", "403", "404", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-send_message_to_all_users_in_team", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + + async def test_send_message_to_list_of_channels(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["201", "400", "403", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-send_message_to_list_of_channels", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + + async def test_get_operation_state(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["200", "400", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-get_operation_state", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + + async def test_get_paged_failed_entries(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["200", "400", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-get_paged_failed_entries", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + + async def test_cancel_operation(self): + adapter = SimpleAdapterWithCreateConversation() + + status_codes = ["200", "400", "429"] + + for status_code in status_codes: + activity = Activity( + type="message", + text="test-cancel_operation", + channel_id=Channels.ms_teams, + from_property=ChannelAccount(id="participantId-1", name=status_code), + service_url="https://test.coffee", + conversation=ConversationAccount(id="conversation-id"), + ) + + turn_context = TurnContext(adapter, activity) + handler = TeamsActivityHandler() + await handler.on_turn(turn_context) + class TestTeamsActivityHandler(TeamsActivityHandler): async def on_turn(self, turn_context: TurnContext): @@ -302,6 +439,20 @@ async def on_turn(self, turn_context: TurnContext): await self.call_send_message_to_teams(turn_context) elif turn_context.activity.text == "test_send_meeting_notification": await self.call_send_meeting_notification(turn_context) + elif turn_context.activity.text == "test-send_message_to_list_of_users": + await self.call_send_message_to_list_of_users(turn_context) + elif turn_context.activity.text == "test-send_message_to_all_users_in_tenant": + await self.call_send_message_to_all_users_in_tenant(turn_context) + elif turn_context.activity.text == "test-send_message_to_all_users_in_team": + await self.call_send_message_to_all_users_in_team(turn_context) + elif turn_context.activity.text == "test-send_message_to_list_of_channels": + await self.call_send_message_to_list_of_channels(turn_context) + elif turn_context.activity.text == "test-get_operation_state": + await self.call_get_operation_state(turn_context) + elif turn_context.activity.text == "test-get_paged_failed_entries": + await self.call_get_paged_failed_entries(turn_context) + elif turn_context.activity.text == "test-cancel_operation": + await self.call_cancel_operation(turn_context) async def call_send_message_to_teams(self, turn_context: TurnContext): msg = MessageFactory.text("call_send_message_to_teams") @@ -353,6 +504,272 @@ async def call_send_meeting_notification(self, turn_context: TurnContext): f"Expected HttpOperationException with response status code {from_property.name}." ) + async def call_send_message_to_list_of_users(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + teams_members = [ + {"id": "member-1"}, + {"id": "member-2"}, + {"id": "member-3"}, + ] + tenant_id = "tenantId123" + + try: + result = await TeamsInfo.send_message_to_list_of_users( + turn_context, turn_context.activity, teams_members, tenant_id + ) + + # Handle based on the 'from_property.name' + if from_property.name == "201": + assert result == "operation-1" + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "403": + assert error_response["error"]["code"] == "Forbidden" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + async def call_send_message_to_all_users_in_tenant(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + tenant_id = "tenantId123" + + try: + result = await TeamsInfo.send_message_to_all_users_in_tenant( + turn_context, turn_context.activity, tenant_id + ) + + # Handle based on the 'from_property.name' + if from_property.name == "201": + assert result == "operation-1" + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "403": + assert error_response["error"]["code"] == "Forbidden" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + async def call_send_message_to_all_users_in_team(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + team_id = "teamId123" + tenant_id = "tenantId123" + + try: + result = await TeamsInfo.send_message_to_all_users_in_team( + turn_context, turn_context.activity, team_id, tenant_id + ) + + # Handle based on the 'from_property.name' + if from_property.name == "201": + assert result == "operation-1" + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "403": + assert error_response["error"]["code"] == "Forbidden" + elif from_property.name == "404": + assert error_response["error"]["code"] == "NotFound" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + async def call_send_message_to_list_of_channels(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + members = [ + {"id": "channel-1"}, + {"id": "channel-2"}, + {"id": "channel-3"}, + ] + tenant_id = "tenantId123" + + try: + result = await TeamsInfo.send_message_to_list_of_channels( + turn_context, turn_context.activity, members, tenant_id + ) + + # Handle based on the 'from_property.name' + if from_property.name == "201": + assert result == "operation-1" + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "403": + assert error_response["error"]["code"] == "Forbidden" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + async def call_get_operation_state(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + operation_id = "operation-id*" + response = BatchOperationState(state="state-1", total_entries_count=1) + response.status_map[400] = 1 + + try: + operation_response = await TeamsInfo.get_operation_state( + turn_context, operation_id + from_property.name + ) + + # Handle based on the 'from_property.name' + if from_property.name == "200": + assert str(response) == str(operation_response) + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + async def call_get_paged_failed_entries(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + operation_id = "operation-id*" + response = BatchFailedEntriesResponse(continuation_token="continuation-token") + response.failed_entries.append( + BatchFailedEntry(entry_id="entry-1", error="400 User not found") + ) + + try: + operation_response = await TeamsInfo.get_paged_failed_entries( + turn_context, operation_id + from_property.name + ) + + # Handle based on the 'from_property.name' + if from_property.name == "200": + assert str(response) == str(operation_response) + assert operation_response.failed_entries[0].id == "entry-1" + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + + async def call_cancel_operation(self, turn_context: TurnContext): + from_property = turn_context.activity.from_property + operation_id = "operation-id*" + exception = None + + try: + await TeamsInfo.cancel_operation( + turn_context, operation_id + from_property.name + ) + + if from_property.name == "200": + assert exception is None + else: + raise ValueError( + f"Expected HttpResponseError with response status code {from_property.name}." + ) + + except HttpResponseError as ex: + # Assert that the response status code matches the from_property.name + assert from_property.name == str(int(ex.response.status_code)) + + # Deserialize the error response content to an ErrorResponse object + error_response = json.loads(ex.response.content) + + # Handle based on error codes + if from_property.name == "400": + assert error_response["error"]["code"] == "BadSyntax" + elif from_property.name == "429": + assert len(ex.response.reason_phrase) == 11 + else: + raise TypeError( + f"Expected HttpOperationException with response status code {from_property.name}." + ) + def get_targeted_meeting_notification(self, from_account: ChannelAccount): recipients = [from_account.id] diff --git a/libraries/botbuilder-schema/botbuilder/schema/teams/__init__.py b/libraries/botbuilder-schema/botbuilder/schema/teams/__init__.py index be9aa11ce..e41017d42 100644 --- a/libraries/botbuilder-schema/botbuilder/schema/teams/__init__.py +++ b/libraries/botbuilder-schema/botbuilder/schema/teams/__init__.py @@ -89,6 +89,10 @@ from ._models_py3 import MeetingNotificationBase from ._models_py3 import MeetingNotificationResponse from ._models_py3 import OnBehalfOf +from ._models_py3 import TeamMember +from ._models_py3 import BatchFailedEntry +from ._models_py3 import BatchFailedEntriesResponse +from ._models_py3 import BatchOperationState __all__ = [ "AppBasedLinkQuery", @@ -179,4 +183,8 @@ "MeetingNotificationBase", "MeetingNotificationResponse", "OnBehalfOf", + "TeamMember", + "BatchFailedEntry", + "BatchFailedEntriesResponse", + "BatchOperationState", ] diff --git a/libraries/botbuilder-schema/botbuilder/schema/teams/_models_py3.py b/libraries/botbuilder-schema/botbuilder/schema/teams/_models_py3.py index 0b6e0e899..629164808 100644 --- a/libraries/botbuilder-schema/botbuilder/schema/teams/_models_py3.py +++ b/libraries/botbuilder-schema/botbuilder/schema/teams/_models_py3.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. +import datetime from enum import Enum from typing import List from msrest.serialization import Model @@ -3029,3 +3030,101 @@ def __init__( ): super(TargetedMeetingNotification, self).__init__(value=value, **kwargs) self.channel_data = channel_data + + +class BatchFailedEntry(Model): + """Specifies the failed entry with its id and error. + + :param entry_id: The id of the failed entry. + :type entry_id: str + :param error: The error of the failed entry. + :type error: str + """ + + _attribute_map = { + "entry_id": {"key": "entryId", "type": "str"}, + "error": {"key": "error", "type": "str"}, + } + + def __init__(self, *, entry_id: str = None, error: str = None, **kwargs): + super(BatchFailedEntry, self).__init__(**kwargs) + self.entry_id = entry_id + self.error = error + + +class BatchOperationState(Model): + """Object representing operation state. + + :param state: The operation state. + :type state: str + :param retry_after: The datetime value to retry the operation. + :type retry_after: datetime or None + :param total_entries_count: The number of entries. + :type total_entries_count: int + """ + + _attribute_map = { + "state": {"key": "state", "type": "str"}, + "status_map": {"key": "statusMap", "type": "{int, int}"}, + "retry_after": {"key": "retryAfter", "type": "iso-8601"}, + "total_entries_count": {"key": "totalEntriesCount", "type": "int"}, + } + + def __init__( + self, + *, + state: str = None, + status_map: dict = None, + retry_after: datetime = None, + total_entries_count: int = 0, + **kwargs + ): + super(BatchOperationState, self).__init__(**kwargs) + self.state = state + self.status_map = status_map or {} + self.retry_after = retry_after + self.total_entries_count = total_entries_count + + +class BatchFailedEntriesResponse(Model): + """Specifies the failed entries response. + + Contains a list of BatchFailedEntry. + + :param continuation_token: The continuation token for paginated results. + :type continuation_token: str + :param failed_entries: The list of failed entries result of a batch operation. + :type failed_entries: list[~botframework.connector.teams.models.BatchFailedEntry] + """ + + _attribute_map = { + "continuation_token": {"key": "continuationToken", "type": "str"}, + "failed_entries": {"key": "failedEntryResponses", "type": "[BatchFailedEntry]"}, + } + + def __init__( + self, + *, + continuation_token: str = None, + failed_entries: List["BatchFailedEntry"] = None, + **kwargs + ): + super(BatchFailedEntriesResponse, self).__init__(**kwargs) + self.continuation_token = continuation_token + self.failed_entries = failed_entries or [] + + +class TeamMember(Model): + """Describes a member. + + :param id: Unique identifier representing a member (user or channel). + :type id: str + """ + + _attribute_map = { + "id": {"key": "id", "type": "str"}, + } + + def __init__(self, *, id: str = None, **kwargs) -> None: + super(TeamMember, self).__init__(**kwargs) + self.id = id diff --git a/libraries/botframework-connector/botframework/connector/__init__.py b/libraries/botframework-connector/botframework/connector/__init__.py index e167a32ad..5b737decc 100644 --- a/libraries/botframework-connector/botframework/connector/__init__.py +++ b/libraries/botframework-connector/botframework/connector/__init__.py @@ -16,6 +16,7 @@ from .http_client_factory import HttpClientFactory from .http_request import HttpRequest from .http_response_base import HttpResponseBase +from .retry_action import RetryAction __all__ = [ "AsyncBfPipeline", @@ -27,6 +28,7 @@ "HttpClientFactory", "HttpRequest", "HttpResponseBase", + "RetryAction", ] __version__ = VERSION diff --git a/libraries/botframework-connector/botframework/connector/retry_action.py b/libraries/botframework-connector/botframework/connector/retry_action.py new file mode 100644 index 000000000..b7f962162 --- /dev/null +++ b/libraries/botframework-connector/botframework/connector/retry_action.py @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import random + + +class RetryAction: + @staticmethod + async def run_async( + task, max_retries=10, initial_delay=500, retry_exception_handler=None + ): + delay = initial_delay + current_retry_count = 1 + errors = [] + while current_retry_count <= max_retries: + try: + return await task(current_retry_count) + except Exception as ex: + errors.append(ex) + if ( + retry_exception_handler + and retry_exception_handler(ex, current_retry_count) == 429 + ): + await RetryAction._wait_with_jitter(delay) + delay *= 2 # Exponential backoff + current_retry_count += 1 + raise Exception(f"Failed after {max_retries} retries", errors) + + @staticmethod + async def _wait_with_jitter(delay): + jitter = random.uniform(0.8, 1.2) + await asyncio.sleep(jitter * (delay / 1000.0)) diff --git a/libraries/botframework-connector/botframework/connector/teams/operations/teams_operations.py b/libraries/botframework-connector/botframework/connector/teams/operations/teams_operations.py index 6e453ae23..56c2258bb 100644 --- a/libraries/botframework-connector/botframework/connector/teams/operations/teams_operations.py +++ b/libraries/botframework-connector/botframework/connector/teams/operations/teams_operations.py @@ -5,9 +5,14 @@ # license information. # -------------------------------------------------------------------------- +from aiohttp.web import HTTPException +from typing import List from msrest.pipeline import ClientRawResponse from msrest.exceptions import HttpOperationError +from botbuilder.schema import Activity +from botframework.connector.retry_action import RetryAction + from ... import models @@ -148,7 +153,7 @@ def fetch_participant( tenant_id: str, custom_headers=None, raw=False, - **operation_config + **operation_config, ): """Fetches Teams meeting participant details. @@ -273,7 +278,7 @@ def send_meeting_notification( notification: models.MeetingNotificationBase, custom_headers=None, raw=False, - **operation_config + **operation_config, ): """Send a teams meeting notification. @@ -339,3 +344,426 @@ def send_meeting_notification( send_meeting_notification.metadata = { "url": "/v1/meetings/{meetingId}/notification" } + + async def send_message_to_list_of_users( + self, + activity: Activity, + teams_members: List[models.TeamMember], + tenant_id: str, + custom_headers=None, + raw=False, + **operation_config, + ): + """ + Send a message to a list of Teams members. + + :param activity: The activity to send. + :type activity: ~botframework.connector.models.Activity + :param teams_members: The tenant ID. + :type teams_members: list[~botframework.connector.teams.models.TeamMember]. + :param tenant_id: The tenant ID. + :type tenant_id: str + :param dict custom_headers: headers that will be added to the request + :param bool raw: returns the direct response alongside the + deserialized response + :param operation_config: :ref:`Operation configuration + overrides`. + :return: A response object containing the operation id. + :rtype: str or ~msrest.pipeline.ClientRawResponse + :raises: + :class:`HttpOperationError` + """ + + if activity is None: + raise ValueError(f"{activity} is required") + if not teams_members: + raise ValueError(f"{teams_members} is required") + if not tenant_id: + raise ValueError(f"{tenant_id} is required") + + content = { + "members": teams_members, + "activity": activity, + "tenant_id": tenant_id, + } + + async def task(_): + api_url = "v3/batch/conversation/users/" + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + header_parameters["Content-Type"] = "application/json; charset=utf-8" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct body + body_content = self._serialize.body(content, "content") + + # Construct and send request + request = self._client.post( + api_url, query_parameters, header_parameters, body_content + ) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200, 201, 202]: + raise models.ErrorResponseException(self._deserialize, response) + + deserialized = None + if response.status_code in {200, 201, 202}: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) + + async def send_message_to_all_users_in_tenant( + self, + activity: Activity, + tenant_id: str, + custom_headers=None, + raw=False, + **operation_config, + ): + if activity is None: + raise ValueError(f"{activity} is required") + if not tenant_id: + raise ValueError(f"{tenant_id} is required") + + content = {"activity": activity, "tenant_id": tenant_id} + + async def task(_): + api_url = "v3/batch/conversation/tenant/" + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + header_parameters["Content-Type"] = "application/json; charset=utf-8" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct body + body_content = self._serialize.body(content, "content") + + # Construct and send request + request = self._client.post( + api_url, query_parameters, header_parameters, body_content + ) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200, 201, 202]: + raise models.ErrorResponseException(self._deserialize, response) + + deserialized = None + if response.status_code in [200, 201, 202]: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) + + async def send_message_to_all_users_in_team( + self, + activity: Activity, + team_id: str, + tenant_id: str, + custom_headers=None, + raw=False, + **operation_config, + ): + if activity is None: + raise ValueError(f"{activity} is required") + if not team_id: + raise ValueError(f"{team_id} is required") + if not tenant_id: + raise ValueError(f"{tenant_id} is required") + + content = {"activity": activity, "team_id": team_id, "tenant_id": tenant_id} + + async def task(_): + api_url = "v3/batch/conversation/team/" + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + header_parameters["Content-Type"] = "application/json; charset=utf-8" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct body + body_content = self._serialize.body(content, "content") + + # Construct and send request + request = self._client.post( + api_url, query_parameters, header_parameters, body_content + ) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200, 201, 202]: + raise models.ErrorResponseException(self._deserialize, response) + + deserialized = None + if response.status_code in [200, 201, 202]: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) + + async def send_message_to_list_of_channels( + self, + activity: Activity, + channels_members: List[models.TeamMember], + tenant_id: str, + custom_headers=None, + raw=False, + **operation_config, + ): + if activity is None: + raise ValueError(f"{activity} is required") + if not channels_members: + raise ValueError(f"{channels_members} is required") + if not tenant_id: + raise ValueError(f"{tenant_id} is required") + + content = { + "activity": activity, + "members": channels_members, + "tenant_id": tenant_id, + } + + async def task(_): + api_url = "v3/batch/conversation/channels/" + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + header_parameters["Content-Type"] = "application/json; charset=utf-8" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct body + body_content = self._serialize.body(content, "content") + + # Construct and send request + request = self._client.post( + api_url, query_parameters, header_parameters, body_content + ) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200, 201, 202]: + raise models.ErrorResponseException(self._deserialize, response) + + deserialized = None + if response.status_code in [200, 201, 202]: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) + + async def get_operation_state( + self, operation_id: str, custom_headers=None, raw=False, **operation_config + ): + if not operation_id: + raise ValueError(f"{operation_id} is required") + + async def task(_): + api_url = "v3/batch/conversation/{operationId}" + + path_format_arguments = { + "operationId": self._serialize.url("operation_id", operation_id, "str"), + } + api_url = self._client.format_url(api_url, **path_format_arguments) + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct and send request + request = self._client.get(api_url, query_parameters, header_parameters) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200]: + raise HttpOperationError(self._deserialize, response) + + deserialized = None + + if response.status_code == 200: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) + + async def get_paged_failed_entries( + self, operation_id: str, custom_headers=None, raw=False, **operation_config + ): + if not operation_id: + raise ValueError(f"{operation_id} is required") + + async def task(_): + api_url = "v3/batch/conversation/failedentries/{operationId}" + + path_format_arguments = { + "operationId": self._serialize.url("operation_id", operation_id, "str"), + } + api_url = self._client.format_url(api_url, **path_format_arguments) + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct and send request + request = self._client.get(api_url, query_parameters, header_parameters) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200]: + raise HttpOperationError(self._deserialize, response) + + deserialized = None + + if response.status_code == 200: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) + + async def cancel_operation( + self, operation_id: str, custom_headers=None, raw=False, **operation_config + ): + if not operation_id: + raise ValueError(f"{operation_id} is required") + + async def task(_): + api_url = "v3/batch/conversation/{operationId}" + + path_format_arguments = { + "operationId": self._serialize.url("operation_id", operation_id, "str"), + } + api_url = self._client.format_url(api_url, **path_format_arguments) + + # Construct parameters + query_parameters = {} + + # Construct headers + header_parameters = {} + header_parameters["Accept"] = "application/json" + if custom_headers: + header_parameters.update(custom_headers) + + # Construct and send request + request = self._client.delete(api_url, query_parameters, header_parameters) + response = self._client.send(request, stream=False, **operation_config) + + if response.status_code not in [200]: + raise HttpOperationError(self._deserialize, response) + + deserialized = None + + if response.status_code == 200: + deserialized = self._deserialize(response) + + if raw: + client_raw_response = ClientRawResponse(deserialized, response) + return client_raw_response + + return deserialized + + def retry_exception_handler(exception, _): + if isinstance(exception, HTTPException) and exception.status_code == 429: + return 429 + return None + + return await RetryAction.run_async( + task, retry_exception_handler=retry_exception_handler + ) diff --git a/libraries/botframework-connector/tests/test_retry_action.py b/libraries/botframework-connector/tests/test_retry_action.py new file mode 100644 index 000000000..c35376ab6 --- /dev/null +++ b/libraries/botframework-connector/tests/test_retry_action.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. + +import asyncio +import aiounittest +from botframework.connector import RetryAction + + +class TestRetryAction(aiounittest.AsyncTestCase): + def setUp(self): + self.loop = asyncio.get_event_loop() + + def test_retry_action_fails_after_max_retries(self): + async def failing_task(retry_count): + raise Exception(f"error {retry_count}") + + with self.assertRaises(Exception) as context: + self.loop.run_until_complete( + RetryAction.run_async(failing_task, max_retries=3) + ) + self.assertEqual(context.exception.args[0], "Failed after 3 retries") + self.assertEqual(len(context.exception.args[1]), 3) + + def test_retry_action_retries_and_succeeds(self): + async def task(retry_count): + if retry_count < 3: + raise Exception(f"error {retry_count}") + return "success" + + result = self.loop.run_until_complete( + RetryAction.run_async(task, max_retries=3) + ) + self.assertEqual(result, "success") + + def test_retry_action_with_jitter_delay(self): + async def task(retry_count): + if retry_count < 2: + raise Exception("retry error") + return "success" + + async def mock_sleep(duration): + pass + + original_sleep = asyncio.sleep + asyncio.sleep = mock_sleep + + try: + result = self.loop.run_until_complete( + RetryAction.run_async(task, max_retries=3, initial_delay=100) + ) + self.assertEqual(result, "success") + finally: + asyncio.sleep = original_sleep