Airlock processor handles request Submission #1978

Merged · 12 commits · Jun 14, 2022

Changes from all commits
3 changes: 2 additions & 1 deletion .devcontainer/devcontainer.json
@@ -237,7 +237,8 @@
"mikestead.dotenv",
"humao.rest-client",
"timonwong.shellcheck",
"ms-azuretools.vscode-bicep"
"ms-azuretools.vscode-bicep",
"ms-azuretools.vscode-azurefunctions"
],
"forwardPorts": [
8000
1 change: 1 addition & 0 deletions airlock_processor/.funcignore
@@ -0,0 +1 @@
.venv
35 changes: 35 additions & 0 deletions airlock_processor/BlobCreatedTrigger/__init__.py
@@ -0,0 +1,35 @@
import logging

import azure.functions as func
import datetime
import uuid
import json
import re


def main(msg: func.ServiceBusMessage,
         outputEvent: func.Out[func.EventGridOutputEvent]):

    logging.info("Python ServiceBus topic trigger processed message - a new blob was created.")
    body = msg.get_body().decode('utf-8')
    logging.info('Python ServiceBus queue trigger processed message: %s', body)

    json_body = json.loads(body)
    # the message is due to a blob being created in an import 'in-progress' storage account
    if "stalimip" in json_body["topic"]:
        completed_step = "submitted"
        new_status = "in-progress"
        request_id = re.search(r'/blobServices/default/containers/(.*?)/blobs', json_body["subject"]).group(1)

        # Todo: delete the old container here
        # https://github.com/microsoft/AzureTRE/issues/1963

        # reply with a step completed event
        outputEvent.set(
            func.EventGridOutputEvent(
                id=str(uuid.uuid4()),
                data={"completed_step": completed_step, "new_status": new_status, "request_id": request_id},
                subject=request_id,
                event_type="Airlock.StepResult",
                event_time=datetime.datetime.utcnow(),
                data_version="1.0"))
21 changes: 21 additions & 0 deletions airlock_processor/BlobCreatedTrigger/function.json
@@ -0,0 +1,21 @@
{
  "scriptFile": "__init__.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "%BLOB_CREATED_TOPIC_NAME%",
      "subscriptionName": "%TOPIC_SUBSCRIPTION_NAME%",
      "connection": "SB_CONNECTION_STRING"
    },
    {
      "type": "eventGrid",
      "name": "outputEvent",
      "topicEndpointUri": "EVENT_GRID_TOPIC_URI_SETTING",
      "topicKeySetting": "EVENT_GRID_TOPIC_KEY_SETTING",
      "direction": "out"
    }
  ]
}
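A note on these bindings: values wrapped in % are resolved from app settings at runtime (locally, from the Values map in local.settings.json), while topicEndpointUri and topicKeySetting name the app settings that hold the Event Grid endpoint and key. A rough sketch of the %...% resolution, assuming the setting is present:

import os

# illustrative setting, matching local.settings.json-sample further down
os.environ["BLOB_CREATED_TOPIC_NAME"] = "airlock-blob-created"

binding_value = "%BLOB_CREATED_TOPIC_NAME%"
resolved = os.environ[binding_value.strip("%")]  # conceptually what the Functions host does
print(resolved)  # airlock-blob-created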
147 changes: 135 additions & 12 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
@@ -1,18 +1,141 @@
import logging

import azure.functions as func
import datetime
import os
import json
from shared_code import blob_operations, constants
from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient
from pydantic import BaseModel, parse_obj_as


def main(msg: func.ServiceBusMessage,
         outputEvent: func.Out[func.EventGridOutputEvent]):
class RequestProperties(BaseModel):
    request_id: str
    status: str
    type: str
    workspace_id: str

    logging.info('Python ServiceBus queue trigger processed message: %s', msg.get_body().decode('utf-8'))
    outputEvent.set(
        func.EventGridOutputEvent(
            id="step-result-id",
            data={"tag1": "value1", "tag2": "value2"},
            subject="test-subject",
            event_type="test-event-1",
            event_time=datetime.datetime.utcnow(),
            data_version="1.0"))


class ContainersCopyMetadata:
    source_account_name: str
    source_account_key: str
    sa_source_connection_string: str
    sa_dest_connection_string: str

    def __init__(self, source_account_name: str, source_account_key: str, sa_source_connection_string: str, sa_dest_connection_string: str):
        self.source_account_name = source_account_name
        self.source_account_key = source_account_key
        self.sa_source_connection_string = sa_source_connection_string
        self.sa_dest_connection_string = sa_dest_connection_string


def main(msg: func.ServiceBusMessage):

    body = msg.get_body().decode('utf-8')
    logging.info('Python ServiceBus queue trigger processed message: %s', body)

    try:
        request_properties = extract_properties(body)

        new_status = request_properties.status
        req_id = request_properties.request_id
        ws_id = request_properties.workspace_id
        request_type = request_properties.type
    except Exception as e:
        logging.error(f'Failed processing request - invalid message: {body}, exc: {e}')
        raise

    logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)

    if is_require_data_copy(new_status):
        logging.info('Request with id %s requires data copy between storage accounts', req_id)
        containers_metadata = get_source_dest_env_vars(new_status, request_type, ws_id)
        blob_operations.copy_data(containers_metadata.source_account_name, containers_metadata.source_account_key, containers_metadata.sa_source_connection_string, containers_metadata.sa_dest_connection_string, req_id)
        return

    # Todo: handle other cases...


def extract_properties(body: str) -> RequestProperties:
    try:
        json_body = json.loads(body)
        result = parse_obj_as(RequestProperties, json_body["data"])
        if not result:
            raise Exception("Failed parsing request properties")
    except json.decoder.JSONDecodeError:
        logging.error(f'Error decoding object: {body}')
        raise
    except Exception as e:
        logging.error(f'Error extracting properties: {e}')
        raise

    return result


def is_require_data_copy(new_status: str):
    if new_status.lower() in [constants.STAGE_SUBMITTED, constants.STAGE_APPROVED, constants.STAGE_REJECTED, constants.STAGE_BLOCKED]:
        return True
    return False


def get_source_dest_env_vars(new_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:

    # sanity
    if not is_require_data_copy(new_status):
        raise Exception("Given new status is not supported")

    try:
        tre_id = os.environ["TRE_ID"]
        subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
    except KeyError as e:
        logging.error(f'Missing environment variable: {e}')
        raise

    request_type = request_type.lower()
    if request_type not in ("import", "export"):
        raise Exception("Request type must be either import or export")

    if new_status == 'submitted' and request_type == 'import':
        source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL.format(tre_id)
        dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS.format(tre_id)
        source_account_rg = constants.CORE_RG_NAME.format(tre_id)
        dest_account_rg = source_account_rg
        logging.info("source account [%s rg: %s]. dest account [%s rg: %s]", source_account_name, source_account_rg, dest_account_name, dest_account_rg)
    elif new_status == 'submitted' and request_type == 'export':
        source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL.format(short_workspace_id)
        dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS.format(short_workspace_id)
        source_account_rg = constants.WS_RG_NAME.format(tre_id, short_workspace_id)
        dest_account_rg = source_account_rg
        logging.info("source account [%s rg: %s]. dest account [%s rg: %s]", source_account_name, source_account_rg, dest_account_name, dest_account_rg)
    elif new_status == 'approved' and request_type == 'import':
        # https://github.com/microsoft/AzureTRE/issues/1841
        pass
    elif new_status == 'approved' and request_type == 'export':
        # https://github.com/microsoft/AzureTRE/issues/1841
        pass
    elif new_status == 'rejected' and request_type == 'import':
        # https://github.com/microsoft/AzureTRE/issues/1842
        pass
    elif new_status == 'rejected' and request_type == 'export':
        # https://github.com/microsoft/AzureTRE/issues/1842
        pass

    managed_identity = os.environ.get("MANAGED_IDENTITY_CLIENT_ID")
    if managed_identity:
        logging.info("using the Airlock processor's managed identity to build the storage management client")
    credential = DefaultAzureCredential(managed_identity_client_id=managed_identity, exclude_shared_token_cache_credential=True) if managed_identity else DefaultAzureCredential()

    storage_client = StorageManagementClient(credential, subscription_id)
    source_storage_keys = storage_client.storage_accounts.list_keys(source_account_rg, source_account_name)
    source_storage_keys = {v.key_name: v.value for v in source_storage_keys.keys}

    dest_storage_keys = storage_client.storage_accounts.list_keys(dest_account_rg, dest_account_name)
    dest_storage_keys = {v.key_name: v.value for v in dest_storage_keys.keys}

    conn_string_base = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName={};AccountKey={}"
    source_account_key = source_storage_keys['key1']
    sa_source_connection_string = conn_string_base.format(source_account_name, source_account_key)
    dest_account_key = dest_storage_keys['key1']
    sa_dest_connection_string = conn_string_base.format(dest_account_name, dest_account_key)

    return ContainersCopyMetadata(source_account_name, source_account_key, sa_source_connection_string, sa_dest_connection_string)
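For reference, a minimal sketch of the status-changed message these helpers consume; the field values are illustrative:

from StatusChangedQueueTrigger import extract_properties, is_require_data_copy

# illustrative message body; in practice it is produced on the airlock status-changed queue
sample = '{"data": {"request_id": "0a1b2c3d", "status": "submitted", "type": "import", "workspace_id": "ab12"}}'

props = extract_properties(sample)         # validated into RequestProperties by pydantic
print(is_require_data_copy(props.status))  # True - 'submitted' is one of the copy-triggering stages
print(is_require_data_copy("draft"))       # False - no copy needed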
7 changes: 0 additions & 7 deletions airlock_processor/StatusChangedQueueTrigger/function.json
@@ -7,13 +7,6 @@
"direction": "in",
"queueName": "%AIRLOCK_STATUS_CHANGED_QUEUE_NAME%",
"connection": "SB_CONNECTION_STRING"
},
{
"type": "eventGrid",
"name": "outputEvent",
"topicEndpointUri": "EVENT_GRID_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_TOPIC_KEY_SETTING",
"direction": "out"
}
]
}
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
@@ -1 +1 @@
__version__ = "0.0.2"
__version__ = "0.0.3"
2 changes: 2 additions & 0 deletions airlock_processor/exceptions/AirlockInvalidContainerException.py
@@ -0,0 +1,2 @@
class AirlockInvalidContainerException(Exception):
    pass
12 changes: 12 additions & 0 deletions airlock_processor/local.settings.json-sample
@@ -0,0 +1,12 @@
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AIRLOCK_STATUS_CHANGED_QUEUE_NAME": "status_changed",
    "SB_CONNECTION_STRING": "Endpoint=sb://XXXX.servicebus.windows.net/;SharedAccessKeyName=.....",
    "AZURE_SUBSCRIPTION_ID": "",
    "BLOB_CREATED_TOPIC_NAME": "",
    "TOPIC_SUBSCRIPTION_NAME": "",
    "TRE_ID": ""
  }
}
7 changes: 6 additions & 1 deletion airlock_processor/requirements.txt
@@ -1,3 +1,8 @@
# Do not include azure-functions-worker as it may conflict with the Azure Functions platform

azure-functions
azure-functions
azure-storage-blob
azure-identity
azure-mgmt-storage
azure-mgmt-resource
pydantic
55 changes: 55 additions & 0 deletions airlock_processor/shared_code/blob_operations.py
@@ -0,0 +1,55 @@
import logging

import datetime
from azure.storage.blob import ContainerSasPermissions, generate_container_sas, BlobServiceClient

from exceptions.AirlockInvalidContainerException import AirlockInvalidContainerException


def copy_data(source_account_name: str, source_account_key: str, sa_source_connection_string: str, sa_dest_connection_string: str, request_id: str):
    container_name = request_id

    # token generation with an expiry of 1 hour. since it's not shared, we can leave it to expire (no need to track/delete)
    # Remove sas token if not needed: https://github.com/microsoft/AzureTRE/issues/2034
    sas_token = generate_container_sas(account_name=source_account_name,
                                       container_name=container_name,
                                       account_key=source_account_key,
                                       permission=ContainerSasPermissions(read=True),
                                       expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1))

    # Copy files
    source_blob_service_client = BlobServiceClient.from_connection_string(sa_source_connection_string)
    dest_blob_service_client = BlobServiceClient.from_connection_string(sa_dest_connection_string)

    source_container_client = source_blob_service_client.get_container_client(container_name)

    try:
        found_blobs = 0
        blob_name = ""
        for blob in source_container_client.list_blobs():
            if found_blobs > 0:
                msg = "Request with id {} contains more than 1 file. flow aborted.".format(request_id)
                logging.error(msg)
                raise AirlockInvalidContainerException(msg)
            blob_name = blob.name
            found_blobs += 1

        if found_blobs == 0:
            logging.info('Request with id %s did not contain any files. flow aborted.', request_id)

    except Exception:
        logging.error('Request with id %s failed.', request_id)
        raise

    source_blob = source_container_client.get_blob_client(blob_name)

    source_url = f'{source_blob.url}?{sas_token}'
    # source_url = source_blob.url

    copied_blob = dest_blob_service_client.get_blob_client(container_name, source_blob.blob_name)
    copy = copied_blob.start_copy_from_url(source_url)

    try:
        logging.info("Copy operation returned 'copy_id': '%s', 'copy_status': '%s'", copy["copy_id"], copy["copy_status"])
    except KeyError as e:
        logging.error(f"Failed getting operation id and status {e}")
17 changes: 17 additions & 0 deletions airlock_processor/shared_code/constants.py
@@ -0,0 +1,17 @@
# RG
CORE_RG_NAME = "rg-{}"
WS_RG_NAME = "rg-{}-ws-{}"

# Import
STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL = "stalimex{}"
STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS = "stalimip{}"

# Export
STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL = "stalexintws{}"
STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS = "stalexipws{}"

# Stages
STAGE_SUBMITTED = "submitted"
STAGE_APPROVED = "approved"
STAGE_REJECTED = "rejected"
STAGE_BLOCKED = "blocked"
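These are str.format templates; expanding them with an illustrative TRE id and workspace id also shows where the 'stalimip' substring checked by BlobCreatedTrigger comes from:

from shared_code import constants

# illustrative ids, not real deployments
print(constants.CORE_RG_NAME.format("mytre"))                            # rg-mytre
print(constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS.format("mytre"))  # stalimipmytre
print(constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL.format("ab12"))     # stalexintwsab12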
Empty file.
51 changes: 51 additions & 0 deletions airlock_processor/tests/test_copy_data.py
@@ -0,0 +1,51 @@
from json import JSONDecodeError
import unittest

from StatusChangedQueueTrigger import extract_properties, get_source_dest_env_vars, is_require_data_copy


class TestPropertiesExtraction(unittest.TestCase):
    def test_extract_prop_valid_body_return_all_values(self):
        msg = "{ \"data\": { \"request_id\":\"123\", \"status\":\"456\", \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
        req_prop = extract_properties(msg)
        self.assertEqual(req_prop.request_id, "123")
        self.assertEqual(req_prop.status, "456")
        self.assertEqual(req_prop.type, "789")
        self.assertEqual(req_prop.workspace_id, "ws1")

    def test_extract_prop_missing_arg_throws(self):
        msg = "{ \"data\": { \"status\":\"456\", \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
        self.assertRaises(Exception, extract_properties, msg)

        msg = "{ \"data\": { \"request_id\":\"123\", \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
        self.assertRaises(Exception, extract_properties, msg)

        msg = "{ \"data\": { \"request_id\":\"123\", \"status\":\"456\", \"workspace_id\":\"ws1\" }}"
        self.assertRaises(Exception, extract_properties, msg)

        msg = "{ \"data\": { \"request_id\":\"123\", \"status\":\"456\", \"type\":\"789\" }}"
        self.assertRaises(Exception, extract_properties, msg)

    def test_extract_prop_invalid_json_throws(self):
        msg = "Hi"
        self.assertRaises(JSONDecodeError, extract_properties, msg)


class TestDataCopyProperties(unittest.TestCase):
    def test_only_specific_status_are_triggering_copy(self):
        self.assertEqual(is_require_data_copy("Mitzi"), False)
        self.assertEqual(is_require_data_copy(""), False)
        self.assertEqual(is_require_data_copy("submit"), False)

        # Testing all values that should return true
        self.assertEqual(is_require_data_copy("submITted"), True)
        self.assertEqual(is_require_data_copy("submitted"), True)
        self.assertEqual(is_require_data_copy("approved"), True)
        self.assertEqual(is_require_data_copy("REJected"), True)
        self.assertEqual(is_require_data_copy("blocked"), True)

    def test_wrong_status_raises_when_getting_storage_account_properties(self):
        self.assertRaises(Exception, get_source_dest_env_vars, "Miaow", "import", "ws1")

    def test_wrong_type_raises_when_getting_storage_account_properties(self):
        self.assertRaises(Exception, get_source_dest_env_vars, "accepted", "somethingelse", "ws1")