Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle request cancellation in Airlock processor #2584

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1b011ef
add 'previous status' field to 'status changed' message
yuvalyaron Sep 11, 2022
a64bd87
add support for container deletion in azure function 'toDeleteTrigger'
yuvalyaron Sep 11, 2022
c23976b
handle request cancellation in StatusChangedQueueTrigger
yuvalyaron Sep 11, 2022
c4bdaca
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Sep 11, 2022
fb0266b
fix output event names
yuvalyaron Sep 11, 2022
e9c884e
update versions
yuvalyaron Sep 11, 2022
bfff96e
clean code by extracting to methods
yuvalyaron Sep 11, 2022
894e628
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Sep 12, 2022
15bfa87
update changelog
yuvalyaron Sep 12, 2022
fdbe3b4
fix unit tests
yuvalyaron Sep 12, 2022
881dd42
add unit tests
yuvalyaron Sep 12, 2022
89444dd
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Sep 12, 2022
b527e13
use already declared variables instead of request_properties
yuvalyaron Sep 12, 2022
5e55108
update api version
yuvalyaron Sep 12, 2022
0a1daa9
update changelog
yuvalyaron Sep 13, 2022
ceebec4
update log message
yuvalyaron Sep 13, 2022
bcc0e0a
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Sep 13, 2022
24f6936
rename references of toDelete event to dataDeletion event in statusCh…
yuvalyaron Sep 13, 2022
4528e60
change toDelete to DataDeletion
yuvalyaron Sep 13, 2022
a44e3bd
update version and changelog
yuvalyaron Sep 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ FEATURES:

ENHANCEMENTS:

*
* Cancelling an Airlock request triggers deletion of the request container and files ([#2584](https://github.com/microsoft/AzureTRE/pull/2584))

BUG FIXES:

Expand Down
4 changes: 2 additions & 2 deletions airlock_processor/BlobCreatedTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def main(msg: func.ServiceBusMessage,
id=str(uuid.uuid4()),
data={"blob_to_delete": copied_from[-1]}, # last container in copied_from is the one we just copied from
subject=request_id,
event_type="Airlock.ToDelete",
event_type="Airlock.DataDeletion",
event_time=datetime.datetime.utcnow(),
data_version="1.0"
data_version=constants.DATA_DELETION_EVENT_DATA_VERSION
)
)
162 changes: 106 additions & 56 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from typing import Optional

import azure.functions as func
import datetime
Expand All @@ -14,7 +15,8 @@

class RequestProperties(BaseModel):
    """Parsed payload of an Airlock 'status changed' Service Bus message."""
    request_id: str
    new_status: str
    # Status the request held before this transition; may be absent (None).
    previous_status: Optional[str]
    type: str
    workspace_id: str

Expand All @@ -28,50 +30,46 @@ def __init__(self, source_account_name: str, dest_account_name: str):
self.dest_account_name = dest_account_name


def main(msg: func.ServiceBusMessage, outputEvent: func.Out[func.EventGridOutputEvent]):
def main(msg: func.ServiceBusMessage, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent]):
try:
request_properties = extract_properties(msg)
request_files = get_request_files(request_properties) if request_properties.status == constants.STAGE_SUBMITTED else None
handle_status_changed(request_properties, outputEvent, request_files)
request_files = get_request_files(request_properties) if request_properties.new_status == constants.STAGE_SUBMITTED else None
handle_status_changed(request_properties, stepResultEvent, dataDeletionEvent, request_files)

except NoFilesInRequestException:
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
except TooManyFilesInRequestException:
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
except Exception:
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE, request_files=request_files)
set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE, request_files=request_files)


def handle_status_changed(request_properties: RequestProperties, outputEvent: func.Out[func.EventGridOutputEvent], request_files):
new_status = request_properties.status
def handle_status_changed(request_properties: RequestProperties, stepResultEvent: func.Out[func.EventGridOutputEvent], dataDeletionEvent: func.Out[func.EventGridOutputEvent], request_files):
new_status = request_properties.new_status
previous_status = request_properties.previous_status
req_id = request_properties.request_id
ws_id = request_properties.workspace_id
request_type = request_properties.type

logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)

try:
tre_id = os.environ["TRE_ID"]
except KeyError as e:
logging.error(f'Missing environment variable: {e}')
raise

if new_status == constants.STAGE_DRAFT and request_type == constants.IMPORT_TYPE:
account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
if new_status == constants.STAGE_DRAFT:
account_name = get_storage_account(status=constants.STAGE_DRAFT, request_type=request_type, short_workspace_id=ws_id)
blob_operations.create_container(account_name, req_id)
return

if new_status == constants.STAGE_DRAFT and request_type == constants.EXPORT_TYPE:
account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + ws_id
blob_operations.create_container(account_name, req_id)
if new_status == constants.STAGE_CANCELLED:
storage_account_name = get_storage_account(previous_status, request_type, ws_id)
container_to_delete_url = blob_operations.get_blob_url(account_name=storage_account_name, container_name=req_id)
set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url=container_to_delete_url)
return

if new_status == constants.STAGE_SUBMITTED:
set_output_event_to_report_request_files(outputEvent, request_properties, request_files)
set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files)

if (is_require_data_copy(new_status)):
logging.info('Request with id %s. requires data copy between storage accounts', req_id)
containers_metadata = get_source_dest_for_copy(new_status, request_type, ws_id)
containers_metadata = get_source_dest_for_copy(new_status=new_status, previous_status=previous_status, request_type=request_type, short_workspace_id=ws_id)
eladiw marked this conversation as resolved.
Show resolved Hide resolved
blob_operations.create_container(containers_metadata.dest_account_name, req_id)
blob_operations.copy_data(containers_metadata.source_account_name,
containers_metadata.dest_account_name, req_id)
Expand Down Expand Up @@ -104,78 +102,130 @@ def is_require_data_copy(new_status: str):
return False


def get_source_dest_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:
def get_source_dest_for_copy(new_status: str, previous_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:
# sanity
if is_require_data_copy(new_status) is False:
raise Exception("Given new status is not supported")

try:
tre_id = os.environ["TRE_ID"]
except KeyError as e:
logging.error(f'Missing environment variable: {e}')
raise

request_type = request_type.lower()
if request_type != constants.IMPORT_TYPE and request_type != constants.EXPORT_TYPE:
msg = "Airlock request type must be either '{}' or '{}".format(str(constants.IMPORT_TYPE),
str(constants.EXPORT_TYPE))
logging.error(msg)
raise Exception(msg)

source_account_name = get_storage_account(previous_status, request_type, short_workspace_id)
dest_account_name = get_storage_account_destination_for_copy(new_status, request_type, short_workspace_id)
return ContainersCopyMetadata(source_account_name, dest_account_name)


def get_storage_account(status: str, request_type: str, short_workspace_id: str) -> str:
    """Resolve the storage account that holds a request's data for the given status.

    Import accounts are mostly suffixed with the TRE ID and export accounts with the
    short workspace ID (approved imports and rejected/blocked exports are the
    exceptions). Raises when no account is defined for the status/type combination.
    """
    tre_id = _get_tre_id()

    # All of these statuses keep the data in the "in progress" account.
    in_progress_statuses = [constants.STAGE_IN_REVIEW, constants.STAGE_SUBMITTED, constants.STAGE_APPROVAL_INPROGRESS, constants.STAGE_REJECTION_INPROGRESS, constants.STAGE_BLOCKING_INPROGRESS]

    if request_type == constants.IMPORT_TYPE:
        if status == constants.STAGE_DRAFT:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
        if status == constants.STAGE_APPROVED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
        if status == constants.STAGE_REJECTED:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
        if status == constants.STAGE_BLOCKED_BY_SCAN:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id
        if status in in_progress_statuses:
            return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
    elif request_type == constants.EXPORT_TYPE:
        if status == constants.STAGE_DRAFT:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id
        if status == constants.STAGE_APPROVED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
        if status == constants.STAGE_REJECTED:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
        if status == constants.STAGE_BLOCKED_BY_SCAN:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
        if status in in_progress_statuses:
            return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id

    error_message = f"Missing current storage account definition for status '{status}' and request type '{request_type}'."
    logging.error(error_message)
    raise Exception(error_message)


def get_storage_account_destination_for_copy(new_status: str, request_type: str, short_workspace_id: str) -> str:
tre_id = _get_tre_id()

if request_type == constants.IMPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_APPROVED + short_workspace_id
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
return constants.STORAGE_ACCOUNT_NAME_IMPORT_REJECTED + tre_id
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS + tre_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id
else:
return constants.STORAGE_ACCOUNT_NAME_IMPORT_BLOCKED + tre_id

if request_type == constants.EXPORT_TYPE:
if new_status == constants.STAGE_SUBMITTED:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
elif new_status == constants.STAGE_APPROVAL_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_APPROVED + tre_id
elif new_status == constants.STAGE_REJECTION_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_REJECTED + short_workspace_id
elif new_status == constants.STAGE_BLOCKING_INPROGRESS:
source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS + short_workspace_id
dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id
return constants.STORAGE_ACCOUNT_NAME_EXPORT_BLOCKED + short_workspace_id

return ContainersCopyMetadata(source_account_name, dest_account_name)
yuvalyaron marked this conversation as resolved.
Show resolved Hide resolved
error_message = f"Missing copy destination storage account definition for status '{new_status}' and request type '{request_type}'."
logging.error(error_message)
raise Exception(error_message)


def set_output_event_to_report_failure(outputEvent, request_properties, failure_reason, request_files):
def set_output_event_to_report_failure(stepResultEvent, request_properties, failure_reason, request_files):
logging.exception(f"Failed processing Airlock request with ID: '{request_properties.request_id}', changing request status to '{constants.STAGE_FAILED}'.")
outputEvent.set(
stepResultEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"completed_step": request_properties.status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "request_files": request_files, "error_message": failure_reason},
data={"completed_step": request_properties.new_status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "request_files": request_files, "error_message": failure_reason},
subject=request_properties.request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def set_output_event_to_report_request_files(outputEvent, request_properties, request_files):
outputEvent.set(
def set_output_event_to_report_request_files(stepResultEvent, request_properties, request_files):
logging.info(f'Sending file enumeration result for request with ID: {request_properties.request_id} result: {request_files}')
stepResultEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"completed_step": request_properties.status, "request_id": request_properties.request_id, "request_files": request_files},
data={"completed_step": request_properties.new_status, "request_id": request_properties.request_id, "request_files": request_files},
subject=request_properties.request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def get_request_files(request_properties):
containers_metadata = get_source_dest_for_copy(request_properties.status, request_properties.type, request_properties.workspace_id)
storage_account_name = containers_metadata.source_account_name
def set_output_event_to_trigger_container_deletion(dataDeletionEvent, request_properties, container_url):
    """Emit an Airlock.DataDeletion event so the request's container gets deleted downstream."""
    logging.info(f'Sending container deletion event for request ID: {request_properties.request_id}. container URL: {container_url}')
    deletion_event = func.EventGridOutputEvent(
        id=str(uuid.uuid4()),
        data={"blob_to_delete": container_url},
        subject=request_properties.request_id,
        event_type="Airlock.DataDeletion",
        event_time=datetime.datetime.utcnow(),
        data_version=constants.DATA_DELETION_EVENT_DATA_VERSION
    )
    dataDeletionEvent.set(deletion_event)


def get_request_files(request_properties: RequestProperties):
    """Enumerate the files in the request's container, located via its previous status."""
    return blob_operations.get_request_files(
        account_name=get_storage_account(request_properties.previous_status, request_properties.type, request_properties.workspace_id),
        request_id=request_properties.request_id)


def _get_tre_id():
try:
tre_id = os.environ["TRE_ID"]
except KeyError as e:
logging.error(f'Missing environment variable: {e}')
raise
return tre_id
9 changes: 8 additions & 1 deletion airlock_processor/StatusChangedQueueTrigger/function.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
},
{
"type": "eventGrid",
"name": "outputEvent",
"name": "stepResultEvent",
"topicEndpointUri": "EVENT_GRID_STEP_RESULT_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_STEP_RESULT_TOPIC_KEY_SETTING",
"direction": "out"
},
{
"type": "eventGrid",
"name": "dataDeletionEvent",
"topicEndpointUri": "EVENT_GRID_TO_DELETE_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_TO_DELETE_TOPIC_KEY_SETTING",
"direction": "out"
}
]
}
7 changes: 6 additions & 1 deletion airlock_processor/ToDeleteTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def delete_blob_and_container_if_last_blob(blob_url: str):
credential=credential)
container_client = blob_service_client.get_container_client(container_name)

if not blob_name:
logging.info(f'No specific blob specified, deleting the entire container: {container_name}')
container_client.delete_container()
return

# If it's the only blob in the container, we need to delete the container too
# Check how many blobs are in the container (note: this exausts the generator)
blobs_num = sum(1 for _ in container_client.list_blobs())
Expand All @@ -33,7 +38,7 @@ def delete_blob_and_container_if_last_blob(blob_url: str):

def main(msg: func.ServiceBusMessage):
body = msg.get_body().decode('utf-8')
logging.info(f'Python ServiceBus queue trigger processed mesage: {body}')
logging.info(f'Python ServiceBus queue trigger processed message: {body}')
json_body = json.loads(body)

blob_url = json_body["data"]["blob_to_delete"]
Expand Down
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.6"
__version__ = "0.4.7"
4 changes: 4 additions & 0 deletions airlock_processor/shared_code/blob_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ def get_blob_info_from_topic_and_subject(topic: str, subject: str):
def get_blob_info_from_blob_url(blob_url: str) -> Tuple[str, str, str]:
# Example of blob url: https://stalimappws663d.blob.core.windows.net/50866a82-d13a-4fd5-936f-deafdf1022ce/test_blob.txt
return re.search(r'https://(.*?).blob.core.windows.net/(.*?)/(.*?)$', blob_url).groups()


def get_blob_url(account_name: str, container_name: str, blob_name='') -> str:
    """Build the blob's full URL; with the default empty blob_name this yields the container URL (with a trailing slash)."""
    account_url = get_account_url(account_name)
    return f'{account_url}{container_name}/{blob_name}'
1 change: 1 addition & 0 deletions airlock_processor/shared_code/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@

# Event Grid
STEP_RESULT_EVENT_DATA_VERSION = "1.0"
DATA_DELETION_EVENT_DATA_VERSION = "1.0"

NO_THREATS = "No threats found"
17 changes: 16 additions & 1 deletion airlock_processor/tests/shared_code/test_blob_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from unittest import TestCase
from unittest.mock import MagicMock, patch

from shared_code.blob_operations import get_blob_info_from_topic_and_subject, get_blob_info_from_blob_url, copy_data
from shared_code.blob_operations import get_blob_info_from_topic_and_subject, get_blob_info_from_blob_url, copy_data, get_blob_url
from exceptions import TooManyFilesInRequestException, NoFilesInRequestException


Expand Down Expand Up @@ -75,3 +75,18 @@ def test_copy_data_adds_copied_from_metadata(self, _, mock_blob_service_client):

# Check that copied_from field was set correctly in the metadata
dest_blob_client_mock.start_copy_from_url.assert_called_with(f"{source_url}?sas", metadata=dest_metadata)

def test_get_blob_url_should_return_blob_url(self):
    # Supplying a blob name must produce the full blob URL.
    account_name = "account"
    container_name = "container"
    blob_name = "blob"

    expected_url = f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}"
    self.assertEqual(get_blob_url(account_name, container_name, blob_name), expected_url)

def test_get_blob_url_without_blob_name_should_return_container_url(self):
    # Omitting the blob name must yield the container URL with a trailing slash.
    account_name = "account"
    container_name = "container"

    expected_url = f"https://{account_name}.blob.core.windows.net/{container_name}/"
    self.assertEqual(get_blob_url(account_name, container_name), expected_url)
Loading