Skip to content

Commit

Permalink
Move failed airlock requests to "failed" state (#2395)
Browse files Browse the repository at this point in the history
* add failed status to airlock

* handle exceptions by sending a failed event to event grid

* fix typo

* do not raise the error in the catch, if the function's status is failed the output binding is not called

* log the exception

* fix "Error exception must derive from BaseException" error thrown

* add error message to failed airlock requests

* add airlock exceptions for more specific cases

* fix test

* handle specific exceptions instead of handling all exceptions the same way

* update versions and changelog

* add error_message to sample airlock request in test

* fix airlock tests

* move strings to constants.py and rename variable
  • Loading branch information
yuvalyaron authored Aug 4, 2022
1 parent 5c9ff90 commit ae8c251
Show file tree
Hide file tree
Showing 21 changed files with 127 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ENHANCEMENTS:

* Guacamole logs are sent to Application Insights ([#2376](https://github.com/microsoft/AzureTRE/pull/2376))
* `make tre-start/stop` run in parallel which saves ~5 minutes ([#2394](https://github.com/microsoft/AzureTRE/pull/2394))
* Airlock requests that fail move to status "Failed" ([#2395](https://github.com/microsoft/AzureTRE/pull/2395))

BUG FIXES:

Expand Down
2 changes: 1 addition & 1 deletion airlock_processor/BlobCreatedTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ def main(msg: func.ServiceBusMessage,
subject=request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version="1.0"))
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))
2 changes: 1 addition & 1 deletion airlock_processor/ScanResultTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ def main(msg: func.ServiceBusMessage,
subject=request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version="1.0"))
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))
51 changes: 37 additions & 14 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging

import azure.functions as func
import datetime
import os
import uuid
import json

from exceptions.NoFilesInRequestException import NoFilesInRequestException
from exceptions.TooManyFilesInRequestException import TooManyFilesInRequestException

from shared_code import blob_operations, constants
from pydantic import BaseModel, parse_obj_as

Expand All @@ -24,22 +29,26 @@ def __init__(self, source_account_name: str, dest_account_name: str):
self.dest_account_name = dest_account_name


def main(msg: func.ServiceBusMessage, outputEvent: func.Out[func.EventGridOutputEvent]):
    """Handle an airlock "status changed" message from the Service Bus queue.

    The message is parsed into request properties and handed to
    ``handle_status_changed``. If handling fails, a step-result event moving
    the request to the failed status is written to ``outputEvent`` instead of
    re-raising, so that the output binding is still delivered.

    Args:
        msg: The Service Bus message that triggered the function.
        outputEvent: Event Grid output binding used to publish the
            "Airlock.StepResult" failure event.

    Raises:
        Exception: Whatever ``extract_properties`` raised when the message
            could not be parsed. Without the parsed properties there is no
            request id to report a failure against, so the error is
            propagated rather than reported.
    """
    # Parse in a separate step: report_failure needs request_properties, and
    # that name would be unbound if parsing failed inside the block below.
    try:
        request_properties = extract_properties(msg)
    except Exception:
        logging.exception('Failed processing Airlock request - the status changed message could not be parsed.')
        raise

    try:
        handle_status_changed(request_properties)
    except NoFilesInRequestException:
        report_failure(outputEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE)
    except TooManyFilesInRequestException:
        report_failure(outputEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE)
    except Exception:
        # Deliberately not re-raised: the failure is reported through the
        # output event so the request can be moved to the failed status.
        report_failure(outputEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE)


def handle_status_changed(request_properties: RequestProperties):
new_status = request_properties.status
req_id = request_properties.request_id
ws_id = request_properties.workspace_id
request_type = request_properties.type

logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)

try:
tre_id = os.environ["TRE_ID"]
Expand Down Expand Up @@ -68,8 +77,10 @@ def main(msg: func.ServiceBusMessage):
# Other statuses which do not require data copy are dismissed as we don't need to do anything...


def extract_properties(body: str) -> RequestProperties:
def extract_properties(msg: func.ServiceBusMessage) -> RequestProperties:
try:
body = msg.get_body().decode('utf-8')
logging.info('Python ServiceBus queue trigger processed message: %s', body)
json_body = json.loads(body)
result = parse_obj_as(RequestProperties, json_body["data"])
if not result:
Expand Down Expand Up @@ -136,3 +147,15 @@ def get_source_dest_for_copy(new_status: str, request_type: str, short_workspace
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id

return ContainersCopyMetadata(source_account_name, dest_account_name)


def report_failure(outputEvent, request_properties, failure_reason):
    """Log the failure and publish a step result that marks the request as failed.

    Uses ``logging.exception``, so it is meant to be called while an
    exception is being handled.

    Args:
        outputEvent: Event Grid output binding that receives the event.
        request_properties: Parsed request; its ``request_id`` and ``status``
            are read.
        failure_reason: Text placed in the event's ``error_message`` field.
    """
    request_id = request_properties.request_id
    logging.exception(f"Failed processing Airlock request with ID: '{request_id}', changing request status to '{constants.STAGE_FAILED}'.")

    # The step that was in progress is reported as completed_step, with the
    # failed stage as the new status.
    step_result = {
        "completed_step": request_properties.status,
        "new_status": constants.STAGE_FAILED,
        "request_id": request_id,
        "error_message": failure_reason,
    }
    failed_event = func.EventGridOutputEvent(
        id=str(uuid.uuid4()),
        data=step_result,
        subject=request_id,
        event_type="Airlock.StepResult",
        event_time=datetime.datetime.utcnow(),
        data_version=constants.STEP_RESULT_EVENT_DATA_VERSION,
    )
    outputEvent.set(failed_event)
7 changes: 7 additions & 0 deletions airlock_processor/StatusChangedQueueTrigger/function.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
"direction": "in",
"queueName": "%AIRLOCK_STATUS_CHANGED_QUEUE_NAME%",
"connection": "SB_CONNECTION_STRING"
},
{
"type": "eventGrid",
"name": "outputEvent",
"topicEndpointUri": "EVENT_GRID_STEP_RESULT_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_STEP_RESULT_TOPIC_KEY_SETTING",
"direction": "out"
}
]
}
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.1"
__version__ = "0.4.2"

This file was deleted.

2 changes: 2 additions & 0 deletions airlock_processor/exceptions/NoFilesInRequestException.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class NoFilesInRequestException(Exception):
    """Raised when an airlock request contains no files to copy."""
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class TooManyFilesInRequestException(Exception):
    """Raised when an airlock request contains more than one file."""
11 changes: 7 additions & 4 deletions airlock_processor/shared_code/blob_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from azure.identity import DefaultAzureCredential
from azure.storage.blob import ContainerSasPermissions, generate_container_sas, BlobServiceClient

from exceptions.AirlockInvalidContainerException import AirlockInvalidContainerException
from exceptions.TooManyFilesInRequestException import TooManyFilesInRequestException
from exceptions.NoFilesInRequestException import NoFilesInRequestException


def create_container(account_name: str, request_id: str):
Expand Down Expand Up @@ -35,16 +36,18 @@ def copy_data(source_account_name: str, destination_account_name: str, request_i
if found_blobs > 0:
msg = "Request with id {} contains more than 1 file. flow aborted.".format(request_id)
logging.error(msg)
raise AirlockInvalidContainerException(msg)
raise TooManyFilesInRequestException(msg)
blob_name = blob.name
found_blobs += 1

if found_blobs == 0:
logging.info('Request with id %s did not contain any files. flow aborted.', request_id)
msg = "Request with id {} did not contain any files. flow aborted.".format(request_id)
logging.error(msg)
raise NoFilesInRequestException(msg)

except Exception:
logging.error('Request with id %s failed.', request_id)
raise ()
raise

udk = source_blob_service_client.get_user_delegation_key(datetime.datetime.utcnow() - datetime.timedelta(hours=1),
datetime.datetime.utcnow() + datetime.timedelta(hours=1))
Expand Down
9 changes: 9 additions & 0 deletions airlock_processor/shared_code/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,14 @@
STAGE_CANCELLED = "cancelled"
STAGE_BLOCKING_INPROGRESS = "blocking_in_progress"
STAGE_BLOCKED_BY_SCAN = "blocked_by_scan"
STAGE_FAILED = "failed"

# Messages
NO_FILES_IN_REQUEST_MESSAGE = "Request did not contain any files."
TOO_MANY_FILES_IN_REQUEST_MESSAGE = "Request contained more than 1 file."
UNKNOWN_REASON_MESSAGE = "Unknown reason."

# Event Grid
STEP_RESULT_EVENT_DATA_VERSION = "1.0"

NO_THREATS = "No threats found"
38 changes: 26 additions & 12 deletions airlock_processor/tests/test_copy_data.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,42 @@
from json import JSONDecodeError
import unittest

from pydantic import ValidationError
from StatusChangedQueueTrigger import extract_properties, get_source_dest_for_copy, is_require_data_copy
from azure.functions.servicebus import ServiceBusMessage


class TestPropertiesExtraction(unittest.TestCase):
    """Tests for extract_properties on Service Bus messages."""

    def test_extract_prop_valid_body_return_all_values(self):
        raw_body = '{ "data": { "request_id":"123","status":"456" , "type":"789", "workspace_id":"ws1" }}'
        properties = extract_properties(_mock_service_bus_message(body=raw_body))
        self.assertEqual(properties.request_id, "123")
        self.assertEqual(properties.status, "456")
        self.assertEqual(properties.type, "789")
        self.assertEqual(properties.workspace_id, "ws1")

    def test_extract_prop_missing_arg_throws(self):
        # Each body omits exactly one field: request_id, status, type and
        # workspace_id, in that order.
        incomplete_bodies = (
            '{ "data": { "status":"456" , "type":"789", "workspace_id":"ws1" }}',
            '{ "data": { "request_id":"123", "type":"789", "workspace_id":"ws1" }}',
            '{ "data": { "request_id":"123","status":"456" , "workspace_id":"ws1" }}',
            '{ "data": { "request_id":"123","status":"456" , "type":"789" }}',
        )
        for raw_body in incomplete_bodies:
            service_bus_message = _mock_service_bus_message(body=raw_body)
            with self.assertRaises(ValidationError):
                extract_properties(service_bus_message)

    def test_extract_prop_invalid_json_throws(self):
        service_bus_message = _mock_service_bus_message(body="Hi")
        with self.assertRaises(JSONDecodeError):
            extract_properties(service_bus_message)


class TestDataCopyProperties(unittest.TestCase):
Expand All @@ -52,3 +60,9 @@ def test_wrong_status_raises_when_getting_storage_account_properties(self):

def test_wrong_type_raises_when_getting_storage_account_properties(self):
self.assertRaises(Exception, get_source_dest_for_copy, "accepted", "somethingelse")


def _mock_service_bus_message(body: str):
    """Build a ServiceBusMessage whose payload is ``body`` encoded as UTF-8."""
    return ServiceBusMessage(body=body.encode("utf-8"), message_id="123", user_properties={})
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.6"
__version__ = "0.4.7"
4 changes: 2 additions & 2 deletions api_app/api/routes/airlock_resource_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ async def save_and_publish_event_airlock_request(airlock_request: AirlockRequest
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.EVENT_GRID_GENERAL_ERROR_MESSAGE)


async def update_status_and_publish_event_airlock_request(airlock_request: AirlockRequest, airlock_request_repo: AirlockRequestRepository, user: User, new_status: AirlockRequestStatus, workspace: Workspace):
async def update_status_and_publish_event_airlock_request(airlock_request: AirlockRequest, airlock_request_repo: AirlockRequestRepository, user: User, new_status: AirlockRequestStatus, workspace: Workspace, error_message: str = None):
try:
logging.debug(f"Updating airlock request item: {airlock_request.id}")
updated_airlock_request = airlock_request_repo.update_airlock_request_status(airlock_request, new_status, user)
updated_airlock_request = airlock_request_repo.update_airlock_request_status(airlock_request, new_status, user, error_message)
except Exception as e:
logging.error(f'Failed updating airlock_request item {airlock_request}: {e}')
# If the validation failed, the error was not related to the saving itself
Expand Down
13 changes: 11 additions & 2 deletions api_app/db/repositories/airlock_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,15 @@ def _validate_status_update(self, current_status: AirlockRequestStatus, new_stat
in_review_condition = current_status == AirlockRequestStatus.InReview and (new_status == AirlockRequestStatus.ApprovalInProgress or new_status == AirlockRequestStatus.RejectionInProgress)
# Cancel is allowed only if the request is not actively changing, i.e. it is currently in draft or in review
cancel_condition = (current_status == AirlockRequestStatus.Draft or current_status == AirlockRequestStatus.InReview) and new_status == AirlockRequestStatus.Cancelled
# Failed is allowed from any non-final status
failed_condition = (current_status == AirlockRequestStatus.Draft
or current_status == AirlockRequestStatus.Submitted
or current_status == AirlockRequestStatus.InReview
or current_status == AirlockRequestStatus.ApprovalInProgress
or current_status == AirlockRequestStatus.RejectionInProgress
or current_status == AirlockRequestStatus.BlockingInProgress) and new_status == AirlockRequestStatus.Failed

return approved_condition and rejected_condition and blocked_condition and (approved_in_progress_condition or rejected_in_progress_condition or blocking_in_progress_condition or draft_condition or submit_condition or in_review_condition or cancel_condition)
return approved_condition and rejected_condition and blocked_condition and (approved_in_progress_condition or rejected_in_progress_condition or blocking_in_progress_condition or draft_condition or submit_condition or in_review_condition or cancel_condition or failed_condition)

def create_airlock_request_item(self, airlock_request_input: AirlockRequestInCreate, workspace_id: str) -> AirlockRequest:
full_airlock_request_id = str(uuid.uuid4())
Expand Down Expand Up @@ -77,11 +84,13 @@ def get_airlock_request_by_id(self, airlock_request_id: UUID4) -> AirlockRequest
raise EntityDoesNotExist
return parse_obj_as(AirlockRequest, airlock_requests)

def update_airlock_request_status(self, airlock_request: AirlockRequest, new_status: AirlockRequestStatus, user: User) -> AirlockRequest:
def update_airlock_request_status(self, airlock_request: AirlockRequest, new_status: AirlockRequestStatus, user: User, error_message: str = None) -> AirlockRequest:
current_status = airlock_request.status
if self._validate_status_update(current_status, new_status):
updated_request = copy.deepcopy(airlock_request)
updated_request.status = new_status
if new_status == AirlockRequestStatus.Failed:
updated_request.errorMessage = error_message
return self.update_airlock_resource_item(airlock_request, updated_request, user, {"previousStatus": current_status})
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=strings.AIRLOCK_REQUEST_ILLEGAL_STATUS_CHANGE)
Expand Down
2 changes: 2 additions & 0 deletions api_app/models/domain/airlock_operations.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from pydantic import Field
from pydantic.types import UUID4
from pydantic.schema import Optional
from models.domain.azuretremodel import AzureTREModel


class EventGridMessageData(AzureTREModel):
completed_step: str = Field(title="", description="")
new_status: str = Field(title="", description="")
request_id: str = Field(title="", description="")
error_message: Optional[str] = Field(title="", description="")


class StepResultStatusUpdateMessage(AzureTREModel):
Expand Down
3 changes: 3 additions & 0 deletions api_app/models/domain/airlock_request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import List
from enum import Enum
from pydantic import Field
from pydantic.schema import Optional
from resources import strings
from models.domain.airlock_resource import AirlockResource, AirlockResourceType

Expand All @@ -20,6 +21,7 @@ class AirlockRequestStatus(str, Enum):

Blocked = strings.AIRLOCK_RESOURCE_STATUS_BLOCKED
BlockingInProgress = strings.AIRLOCK_RESOURCE_STATUS_BLOCKING_INPROGRESS
Failed = strings.AIRLOCK_RESOURCE_STATUS_FAILED


class AirlockRequestType(str, Enum):
Expand All @@ -37,3 +39,4 @@ class AirlockRequest(AirlockResource):
files: List[str] = Field([], title="Files of the request")
businessJustification: str = Field("Business Justifications", title="Explanation that will be provided to the request reviewer")
status = AirlockRequestStatus.Draft
errorMessage: Optional[str] = Field(title="Present only if the request have failed, provides the reason of the failure.")
1 change: 1 addition & 0 deletions api_app/resources/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@
AIRLOCK_RESOURCE_STATUS_CANCELLED = "cancelled"
AIRLOCK_RESOURCE_STATUS_BLOCKING_INPROGRESS = "blocking_in_progress"
AIRLOCK_RESOURCE_STATUS_BLOCKED = "blocked_by_scan"
AIRLOCK_RESOURCE_STATUS_FAILED = "failed"

# Airlock Request Types
AIRLOCK_REQUEST_TYPE_IMPORT = "import"
Expand Down
3 changes: 2 additions & 1 deletion api_app/service_bus/airlock_request_status_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ async def update_status_in_database(airlock_request_repo: AirlockRequestReposito
airlock_request_id = step_result_data.request_id
current_status = step_result_data.completed_step
new_status = AirlockRequestStatus(step_result_data.new_status)
error_message = step_result_data.error_message
# Find the airlock request by id
airlock_request = await get_airlock_request_by_id_from_path(airlock_request_id=airlock_request_id, airlock_request_repo=airlock_request_repo)
# Validate that the airlock request status is the same as current status
if airlock_request.status == current_status:
workspace = workspace_repo.get_workspace_by_id(airlock_request.workspaceId)
# update to new status and send to event grid
await update_status_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, user=airlock_request.user, new_status=new_status, workspace=workspace)
await update_status_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=airlock_request_repo, user=airlock_request.user, new_status=new_status, workspace=workspace, error_message=error_message)
result = True
else:
error_string = strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,22 @@
CANCELLED = AirlockRequestStatus.Cancelled
BLOCKING_IN_PROGRESS = AirlockRequestStatus.BlockingInProgress
BLOCKED = AirlockRequestStatus.Blocked
FAILED = AirlockRequestStatus.Failed

ALL_STATUSES = [enum.value for enum in AirlockRequestStatus]

ALLOWED_STATUS_CHANGES = {
DRAFT: [SUBMITTED, CANCELLED],
SUBMITTED: [IN_REVIEW, BLOCKING_IN_PROGRESS],
IN_REVIEW: [APPROVED_IN_PROGRESS, REJECTION_IN_PROGRESS, CANCELLED],
APPROVED_IN_PROGRESS: [APPROVED],
DRAFT: [SUBMITTED, CANCELLED, FAILED],
SUBMITTED: [IN_REVIEW, BLOCKING_IN_PROGRESS, FAILED],
IN_REVIEW: [APPROVED_IN_PROGRESS, REJECTION_IN_PROGRESS, CANCELLED, FAILED],
APPROVED_IN_PROGRESS: [APPROVED, FAILED],
APPROVED: [],
REJECTION_IN_PROGRESS: [REJECTED],
REJECTION_IN_PROGRESS: [REJECTED, FAILED],
REJECTED: [],
CANCELLED: [],
BLOCKING_IN_PROGRESS: [BLOCKED],
BLOCKING_IN_PROGRESS: [BLOCKED, FAILED],
BLOCKED: [],
FAILED: [],
}


Expand Down
Loading

0 comments on commit ae8c251

Please sign in to comment.