Skip to content
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
# Licensed under the MIT License.

import json
import os
from typing import Union

from azure.storage.blob import BlobClient as BlobClientSdk
from azurefunctions.extensions.base import Datum, SdkType
from .utils import ConnectionConfig


class BlobClient(SdkType):
def __init__(self, *, data: Union[bytes, Datum]) -> None:

# model_binding_data properties
self._data = data
self._version = None
Expand All @@ -25,7 +24,9 @@ def __init__(self, *, data: Union[bytes, Datum]) -> None:
self._source = data.source
self._content_type = data.content_type
content_json = json.loads(data.content)
self._connection = os.getenv(content_json["Connection"])
self._connection = ConnectionConfig(
connection_string=content_json["Connection"]
)
self._containerName = content_json["ContainerName"]
self._blobName = content_json["BlobName"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class BlobClientConverter(
binding="blob",
trigger="blobTrigger",
):

@classmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
return issubclass(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
# Licensed under the MIT License.

import json
import os
from typing import Union

from azure.storage.blob import ContainerClient as ContainerClientSdk
from azurefunctions.extensions.base import Datum, SdkType
from .utils import ConnectionConfig


class ContainerClient(SdkType):
def __init__(self, *, data: Union[bytes, Datum]) -> None:

# model_binding_data properties
self._data = data
self._version = ""
Expand All @@ -25,7 +24,9 @@ def __init__(self, *, data: Union[bytes, Datum]) -> None:
self._source = data.source
self._content_type = data.content_type
content_json = json.loads(data.content)
self._connection = os.getenv(content_json["Connection"])
self._connection = ConnectionConfig(
connection_string=content_json["Connection"]
)
self._containerName = content_json["ContainerName"]
self._blobName = content_json["BlobName"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
# Licensed under the MIT License.

import json
import os
from typing import Union

from azure.storage.blob import BlobClient as BlobClientSdk
from azurefunctions.extensions.base import Datum, SdkType
from .utils import ConnectionConfig


class StorageStreamDownloader(SdkType):
def __init__(self, *, data: Union[bytes, Datum]) -> None:

# model_binding_data properties
self._data = data or {}
self._version = ""
Expand All @@ -25,7 +24,9 @@ def __init__(self, *, data: Union[bytes, Datum]) -> None:
self._source = data.source
self._content_type = data.content_type
content_json = json.loads(data.content)
self._connection = os.getenv(content_json["Connection"])
self._connection = ConnectionConfig(
connection_string=content_json["Connection"]
)
self._containerName = content_json["ContainerName"]
self._blobName = content_json["BlobName"]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
from pydantic import BaseModel, Field, field_validator


class ConnectionConfig(BaseModel):
connection_string: str = Field(
..., description="Storage account connection string name"
)

@field_validator("connection_string")
@classmethod
def validate_connection_string(cls, cx_connection_string: str) -> str:
"""
Validates the connection string. If the connection string is
not an App Setting, an error will be thrown.
"""
if cx_connection_string is "":
raise ValueError(
"Storage account connection string cannot be empty. "
"Please provide a connection string."
)
elif str.isspace(cx_connection_string):
raise ValueError(
"Storage account connection string cannot contain only whitespace. "
"Please provide a valid connection string."
)
elif not os.getenv(cx_connection_string):
raise ValueError(
"Storage account connection string %s does not exist. "
"Please make sure that it is a defined App Setting.",
cx_connection_string,
)
return os.getenv(cx_connection_string)
3 changes: 2 additions & 1 deletion azurefunctions-extensions-bindings-blob/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ classifiers= [
]
dependencies = [
'azurefunctions-extensions-base',
'azure-storage-blob==12.20.0'
'azure-storage-blob==12.20.0',
'pydantic==2.8.0'
]

[project.optional-dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import unittest
from enum import Enum
from pydantic import ValidationError
from typing import Optional

from azure.storage.blob import BlobClient as BlobClientSdk
Expand Down Expand Up @@ -126,6 +127,103 @@ def test_input_populated(self):
self.assertIsNotNone(sdk_result)
self.assertIsInstance(sdk_result, BlobClientSdk)

def test_invalid_input_populated(self):
content = {
"Connection": "NotARealConnectionString",
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: BlobClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=BlobClient
)
self.assertRegex(
e.exception.__repr__(),
r".*Storage account connection string "
r"NotARealConnectionString does not exist. "
r".*Please make sure that it is a defined App Setting.*",
)

def test_none_input_populated(self):
content = {
"Connection": None,
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: BlobClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=BlobClient
)
self.assertRegex(e.exception.__repr__(), r".*Input should be a valid string.*")

def test_empty_input_populated(self):
content = {
"Connection": "",
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: BlobClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=BlobClient
)
self.assertRegex(
e.exception.__repr__(),
r".*Storage account connection string cannot be empty. "
r"Please provide a connection string.*",
)

def test_whitespace_input_populated(self):
content = {
"Connection": " /t/n",
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: BlobClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=BlobClient
)
self.assertRegex(
e.exception.__repr__(),
r".*Storage account connection string cannot contain"
r" only whitespace. Please provide a valid connection string.*",
)

def test_input_invalid_pytype(self):
content = {
"Connection": "AzureWebJobsStorage",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import json
import unittest
from enum import Enum
from pydantic import ValidationError
from typing import Optional

from azure.storage.blob import ContainerClient as ContainerClientSdk
Expand Down Expand Up @@ -124,6 +125,103 @@ def test_input_populated(self):
self.assertIsNotNone(sdk_result)
self.assertIsInstance(sdk_result, ContainerClientSdk)

def test_invalid_input_populated(self):
content = {
"Connection": "NotARealConnectionString",
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: ContainerClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=ContainerClient
)
self.assertRegex(
e.exception.__repr__(),
r".*Storage account connection string "
r"NotARealConnectionString does not exist. "
r".*Please make sure that it is a defined App Setting.*",
)

def test_none_input_populated(self):
content = {
"Connection": None,
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: ContainerClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=ContainerClient
)
self.assertRegex(e.exception.__repr__(), r".*Input should be a valid string.*")

def test_empty_input_populated(self):
content = {
"Connection": "",
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: ContainerClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=ContainerClient
)
self.assertRegex(
e.exception.__repr__(),
r".*Storage account connection string cannot be empty. "
r"Please provide a connection string.*",
)

def test_whitespace_input_populated(self):
content = {
"Connection": " /t/n",
"ContainerName": "test-blob",
"BlobName": "text.txt",
}

sample_mbd = MockMBD(
version="1.0",
source="AzureStorageBlobs",
content_type="application/json",
content=json.dumps(content),
)

with self.assertRaises(ValidationError) as e:
datum: Datum = Datum(value=sample_mbd, type="model_binding_data")
result: ContainerClient = BlobClientConverter.decode(
data=datum, trigger_metadata=None, pytype=ContainerClient
)
self.assertRegex(
e.exception.__repr__(),
r".*Storage account connection string cannot contain"
r" only whitespace. Please provide a valid connection string.*",
)

def test_input_invalid_pytype(self):
content = {
"Connection": "AzureWebJobsStorage",
Expand Down
Loading