Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHub] update arm template with storage conn str #20376

Merged
merged 9 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import time
import logging
import calendar
import dateutil.parser
from azure.core import MatchConditions
from azure.eventhub import CheckpointStore # type: ignore # pylint: disable=no-name-in-module
from azure.eventhub.exceptions import OwnershipLostError # type: ignore
Expand All @@ -18,6 +17,8 @@
)
from ._vendor.data.tables import TableClient, UpdateMode
from ._vendor.data.tables._base_client import parse_connection_str
from ._vendor.data.tables._deserialize import clean_up_dotnet_timestamps
from ._vendor.data.tables._common_conversion import TZ_UTC

logger = logging.getLogger(__name__)

Expand All @@ -39,6 +40,19 @@ def _to_timestamp(date):
timestamp += date.microsecond / 1e6
return timestamp

def _timestamp_to_datetime(value):
# Cosmos returns this with a decimal point that throws an error on deserialization
cleaned_value = clean_up_dotnet_timestamps(value)
try:
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
tzinfo=TZ_UTC
)
except ValueError:
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=TZ_UTC
)
return dt_obj


class TableCheckpointStore(CheckpointStore):
"""A CheckpointStore that uses Azure Table Storage to store the partition ownership and checkpoint data.
Expand Down Expand Up @@ -113,13 +127,13 @@ def _create_ownership_entity(cls, ownership):
Create a dictionary with the `ownership` attributes.
"""
ownership_entity = {
"PartitionKey": "{} {} {} Ownership".format(
"PartitionKey": u"{} {} {} Ownership".format(
ownership["fully_qualified_namespace"],
ownership["eventhub_name"],
ownership["consumer_group"],
),
"RowKey": ownership["partition_id"],
"ownerid": ownership["owner_id"],
"RowKey": u"{}".format(ownership["partition_id"]),
"ownerid": u"{}".format(ownership["owner_id"]),
}
return ownership_entity

Expand All @@ -129,21 +143,21 @@ def _create_checkpoint_entity(cls, checkpoint):
Create a dictionary with `checkpoint` attributes.
"""
checkpoint_entity = {
"PartitionKey": "{} {} {} Checkpoint".format(
"PartitionKey": u"{} {} {} Checkpoint".format(
checkpoint["fully_qualified_namespace"],
checkpoint["eventhub_name"],
checkpoint["consumer_group"],
),
"RowKey": checkpoint["partition_id"],
"offset": checkpoint["offset"],
"sequencenumber": checkpoint["sequence_number"],
"RowKey": u"{}".format(checkpoint["partition_id"]),
"offset": u"{}".format(checkpoint["offset"]),
"sequencenumber": u"{}".format(checkpoint["sequence_number"]),
}
return checkpoint_entity

def _update_ownership(self, ownership, **kwargs):
"""_update_ownership mutates the passed in ownership."""
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
try:
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
metadata = self._table_client.update_entity(
mode=UpdateMode.REPLACE,
entity=ownership_entity,
Expand All @@ -166,7 +180,7 @@ def _update_ownership(self, ownership, **kwargs):
)
ownership["etag"] = metadata["etag"]
ownership["last_modified_time"] = _to_timestamp(
dateutil.parser.isoparse(metadata["content"]["Timestamp"])
_timestamp_to_datetime(metadata["content"]["Timestamp"])
)

def _claim_one_partition(self, ownership, **kwargs):
Expand Down Expand Up @@ -289,7 +303,7 @@ def list_checkpoints(
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": entity[u"RowKey"],
"sequence_number": entity[u"sequencenumber"],
"sequence_number": int(entity[u"sequencenumber"]),
"offset": str(entity[u"offset"]),
}
checkpoints_list.append(checkpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
from azure.eventhub.extensions.checkpointstoretable import TableCheckpointStore
from azure.eventhub.exceptions import OwnershipLostError

STORAGE_CONN_STR = [
#os.environ.get("AZURE_STORAGE_CONN_STR", "Azure Storage Connection String"),
os.environ.get("AZURE_COSMOS_CONN_STR", "Azure Storage Connection String"),
STORAGE_ENV_KEYS = [
"AZURE_TABLES_CONN_STR",
"AZURE_COSMOS_CONN_STR"
]


def get_live_storage_table_client(storage_connection_str):
def get_live_storage_table_client(conn_str_env_key):
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
try:
storage_connection_str = os.environ[conn_str_env_key]
table_name = "table{}".format(uuid.uuid4().hex)
table_service_client = TableServiceClient.from_connection_string(
storage_connection_str
Expand Down Expand Up @@ -176,35 +177,35 @@ def _update_and_list_checkpoint(storage_connection_str, table_name):
assert checkpoint_list[0]["offset"] == "30"


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_claim_ownership_exception(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_ownership_exception(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_claim_ownership_exception_test(storage_connection_str, table_name)
finally:
remove_live_storage_table_client(storage_connection_str, table_name)


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_claim_and_list_ownership(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_and_list_ownership(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_claim_and_list_ownership(storage_connection_str, table_name)
finally:
remove_live_storage_table_client(storage_connection_str, table_name)


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_update_checkpoint(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_update_checkpoint(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_update_and_list_checkpoint(storage_connection_str, table_name)
Expand Down
64 changes: 63 additions & 1 deletion sdk/eventhub/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@
"eventHubsNamespace": "[concat('eh-', parameters('baseName'))]",
"eventHubName": "[concat('eh-', parameters('baseName'), '-hub')]",
"eventHubAuthRuleName": "[concat('eh-', parameters('baseName'), '-hub-auth-rule')]",
"storageAccount": "[concat('blb', parameters('baseName'))]",
"storageAccount": "[concat('storage', parameters('baseName'))]",
"containerName": "your-blob-container-name",
"defaultSASKeyName": "RootManageSharedAccessKey",
"eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]",
"storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]",
"tablesMgmtApiVersion": "2019-04-01",
"tablesAuthorizationApiVersion": "2018-09-01-preview",
"tableDataContributorRoleId": "0a9a7e1f-b9d0-4cc4-a60d-0319b160aaa3"
},
"resources": [
{
Expand Down Expand Up @@ -140,6 +143,48 @@
}
]
},
{
"type": "Microsoft.DocumentDB/databaseAccounts",
"apiVersion": "2020-04-01",
"name": "[variables('storageAccount')]",
"location": "[parameters('location')]",
"tags": {
"defaultExperience": "Azure Table",
"hidden-cosmos-mmspecial": "",
"CosmosAccountType": "Non-Production"
},
"kind": "GlobalDocumentDB",
"properties": {
"publicNetworkAccess": "Enabled",
"enableAutomaticFailover": false,
"enableMultipleWriteLocations": false,
"isVirtualNetworkFilterEnabled": false,
"virtualNetworkRules": [],
"disableKeyBasedMetadataWriteAccess": false,
"enableFreeTier": false,
"enableAnalyticalStorage": false,
"databaseAccountOfferType": "Standard",
"consistencyPolicy": {
"defaultConsistencyLevel": "BoundedStaleness",
"maxIntervalInSeconds": 86400,
"maxStalenessPrefix": 1000000
},
"locations": [
{
"locationName": "[parameters('location')]",
"provisioningState": "Succeeded",
"failoverPriority": 0,
"isZoneRedundant": false
}
],
"capabilities": [
{
"name": "EnableTable"
}
],
"ipRules": []
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2019-04-01-preview",
Expand All @@ -159,6 +204,15 @@
"principalId": "[parameters('testApplicationOid')]",
"scope": "[resourceGroup().id]"
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "[variables('tablesAuthorizationApiVersion')]",
"name": "[guid(concat('tableDataContributorRoleId', resourceGroup().id))]",
"properties": {
"roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('tableDataContributorRoleId'))]",
"principalId": "[parameters('testApplicationOid')]"
}
}
],
"outputs": {
Expand Down Expand Up @@ -197,6 +251,14 @@
"AZURE_STORAGE_ACCESS_KEY":{
"type": "string",
"value": "[listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value]"
},
"AZURE_TABLES_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
},
"AZURE_COSMOS_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(resourceId('Microsoft.DocumentDB/databaseAccounts', variables('storageAccount')), '2020-04-01').primaryMasterKey, ';TableEndpoint=https://', variables('storageAccount'), '.table.cosmos.azure.com:443/')]"
}
}
}