Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHub] update arm template with storage conn str #20376

Merged
merged 9 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
import time
import logging
import calendar
import dateutil.parser
from azure.core import MatchConditions
from azure.eventhub import CheckpointStore # type: ignore # pylint: disable=no-name-in-module
from azure.eventhub.exceptions import OwnershipLostError # type: ignore
from azure.core.exceptions import (
ResourceModifiedError,
ResourceExistsError,
ResourceNotFoundError,
)
from ._vendor.data.tables import TableClient, UpdateMode
from ._vendor.data.tables._base_client import parse_connection_str
from ._vendor.data.tables._deserialize import clean_up_dotnet_timestamps
from ._vendor.data.tables._common_conversion import TZ_UTC

logger = logging.getLogger(__name__)

Expand All @@ -39,6 +39,19 @@ def _to_timestamp(date):
timestamp += date.microsecond / 1e6
return timestamp

def _to_datetime(value):
swathipil marked this conversation as resolved.
Show resolved Hide resolved
# Cosmos returns this with a decimal point that throws an error on deserialization
cleaned_value = clean_up_dotnet_timestamps(value)
try:
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(
tzinfo=TZ_UTC
)
except ValueError:
dt_obj = datetime.datetime.strptime(cleaned_value, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=TZ_UTC
)
return dt_obj


class TableCheckpointStore(CheckpointStore):
"""A CheckpointStore that uses Azure Table Storage to store the partition ownership and checkpoint data.
Expand Down Expand Up @@ -113,13 +126,13 @@ def _create_ownership_entity(cls, ownership):
Create a dictionary with the `ownership` attributes.
"""
ownership_entity = {
"PartitionKey": "{} {} {} Ownership".format(
"PartitionKey": u"{} {} {} Ownership".format(
ownership["fully_qualified_namespace"],
ownership["eventhub_name"],
ownership["consumer_group"],
),
"RowKey": ownership["partition_id"],
"ownerid": ownership["owner_id"],
"RowKey": u"{}".format(ownership["partition_id"]),
"ownerid": u"{}".format(ownership["owner_id"]),
}
return ownership_entity

Expand All @@ -129,21 +142,21 @@ def _create_checkpoint_entity(cls, checkpoint):
Create a dictionary with `checkpoint` attributes.
"""
checkpoint_entity = {
"PartitionKey": "{} {} {} Checkpoint".format(
"PartitionKey": u"{} {} {} Checkpoint".format(
checkpoint["fully_qualified_namespace"],
checkpoint["eventhub_name"],
checkpoint["consumer_group"],
),
"RowKey": checkpoint["partition_id"],
"offset": checkpoint["offset"],
"sequencenumber": checkpoint["sequence_number"],
"RowKey": u"{}".format(checkpoint["partition_id"]),
"offset": u"{}".format(checkpoint["offset"]),
"sequencenumber": u"{}".format(checkpoint["sequence_number"]),
}
return checkpoint_entity

def _update_ownership(self, ownership, **kwargs):
"""_update_ownership mutates the passed in ownership."""
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
try:
ownership_entity = TableCheckpointStore._create_ownership_entity(ownership)
metadata = self._table_client.update_entity(
mode=UpdateMode.REPLACE,
entity=ownership_entity,
Expand All @@ -165,16 +178,18 @@ def _update_ownership(self, ownership, **kwargs):
entity=ownership_entity, headers={"Prefer": "return-content"}, **kwargs
)
ownership["etag"] = metadata["etag"]
print('swathi')
print(metadata["content"]["Timestamp"])
swathipil marked this conversation as resolved.
Show resolved Hide resolved
swathipil marked this conversation as resolved.
Show resolved Hide resolved
ownership["last_modified_time"] = _to_timestamp(
dateutil.parser.isoparse(metadata["content"]["Timestamp"])
_to_datetime(metadata["content"]["Timestamp"])
)

def _claim_one_partition(self, ownership, **kwargs):
new_ownership = ownership.copy()
try:
self._update_ownership(new_ownership, **kwargs)
return new_ownership
except (ResourceModifiedError, ResourceExistsError):
swathipil marked this conversation as resolved.
Show resolved Hide resolved
except ResourceExistsError:
logger.info(
"EventProcessor instance %r of namespace %r eventhub %r consumer group %r "
"lost ownership to partition %r",
Expand All @@ -186,6 +201,7 @@ def _claim_one_partition(self, ownership, **kwargs):
)
raise OwnershipLostError()
except Exception as error: # pylint:disable=broad-except
# includes ResourceModifiedError (no matching `etag`)
swathipil marked this conversation as resolved.
Show resolved Hide resolved
logger.warning(
"An exception occurred when EventProcessor instance %r claim_ownership for "
"namespace %r eventhub %r consumer group %r partition %r. "
Expand Down Expand Up @@ -289,7 +305,7 @@ def list_checkpoints(
"eventhub_name": eventhub_name,
"consumer_group": consumer_group,
"partition_id": entity[u"RowKey"],
"sequence_number": entity[u"sequencenumber"],
"sequence_number": int(entity[u"sequencenumber"]),
"offset": str(entity[u"offset"]),
}
checkpoints_list.append(checkpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
from azure.eventhub.extensions.checkpointstoretable import TableCheckpointStore
from azure.eventhub.exceptions import OwnershipLostError

STORAGE_CONN_STR = [
#os.environ.get("AZURE_STORAGE_CONN_STR", "Azure Storage Connection String"),
os.environ.get("AZURE_COSMOS_CONN_STR", "Azure Storage Connection String"),
STORAGE_ENV_KEYS = [
"AZURE_TABLES_CONN_STR",
"AZURE_COSMOS_CONN_STR"
]


def get_live_storage_table_client(storage_connection_str):
def get_live_storage_table_client(conn_str_env_key):
yunhaoling marked this conversation as resolved.
Show resolved Hide resolved
try:
storage_connection_str = os.environ[conn_str_env_key]
table_name = "table{}".format(uuid.uuid4().hex)
table_service_client = TableServiceClient.from_connection_string(
storage_connection_str
Expand Down Expand Up @@ -94,10 +95,12 @@ def _claim_ownership_exception_test(storage_connection_str, table_name):
)
assert result_ownership[0] in list_ownership

single_ownership = [result_ownership_list[0].copy()]
single_ownership[0]["etag"] = "W/\"datetime'2021-08-02T00%3A46%3A51.7645424Z'\""
with pytest.raises(OwnershipLostError) as e_info:
checkpoint_store.claim_ownership(single_ownership)
single_ownership_list = [result_ownership_list[0].copy()]
single_ownership_list[0]["etag"] = "W/\"datetime'2021-08-02T00%3A46%3A51.7645424Z'\""

# `etag` does not match last updated `etag`, so original ownership will be returned
returned_ownership_list = checkpoint_store.claim_ownership(single_ownership_list)
assert returned_ownership_list == single_ownership_list


def _claim_and_list_ownership(storage_connection_str, table_name):
Expand Down Expand Up @@ -176,35 +179,35 @@ def _update_and_list_checkpoint(storage_connection_str, table_name):
assert checkpoint_list[0]["offset"] == "30"


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_claim_ownership_exception(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_ownership_exception(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_claim_ownership_exception_test(storage_connection_str, table_name)
finally:
remove_live_storage_table_client(storage_connection_str, table_name)


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_claim_and_list_ownership(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_claim_and_list_ownership(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_claim_and_list_ownership(storage_connection_str, table_name)
finally:
remove_live_storage_table_client(storage_connection_str, table_name)


@pytest.mark.parametrize("storage_connection_str", STORAGE_CONN_STR)
@pytest.mark.skip("update after adding conn str env var")
def test_update_checkpoint(storage_connection_str):
@pytest.mark.parametrize("conn_str_env_key", STORAGE_ENV_KEYS)
@pytest.mark.liveTest
def test_update_checkpoint(conn_str_env_key):
storage_connection_str, table_name = get_live_storage_table_client(
storage_connection_str
conn_str_env_key
)
try:
_update_and_list_checkpoint(storage_connection_str, table_name)
Expand Down
64 changes: 63 additions & 1 deletion sdk/eventhub/test-resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@
"eventHubsNamespace": "[concat('eh-', parameters('baseName'))]",
"eventHubName": "[concat('eh-', parameters('baseName'), '-hub')]",
"eventHubAuthRuleName": "[concat('eh-', parameters('baseName'), '-hub-auth-rule')]",
"storageAccount": "[concat('blb', parameters('baseName'))]",
"storageAccount": "[concat('storage', parameters('baseName'))]",
"containerName": "your-blob-container-name",
"defaultSASKeyName": "RootManageSharedAccessKey",
"eventHubsAuthRuleResourceId": "[resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespace'), variables('defaultSASKeyName'))]",
"storageAccountId": "[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccount'))]",
"tablesMgmtApiVersion": "2019-04-01",
"tablesAuthorizationApiVersion": "2018-09-01-preview",
"tableDataContributorRoleId": "0a9a7e1f-b9d0-4cc4-a60d-0319b160aaa3"
},
"resources": [
{
Expand Down Expand Up @@ -140,6 +143,48 @@
}
]
},
{
"type": "Microsoft.DocumentDB/databaseAccounts",
"apiVersion": "2020-04-01",
"name": "[variables('storageAccount')]",
"location": "[parameters('location')]",
"tags": {
"defaultExperience": "Azure Table",
"hidden-cosmos-mmspecial": "",
"CosmosAccountType": "Non-Production"
},
"kind": "GlobalDocumentDB",
"properties": {
"publicNetworkAccess": "Enabled",
"enableAutomaticFailover": false,
"enableMultipleWriteLocations": false,
"isVirtualNetworkFilterEnabled": false,
"virtualNetworkRules": [],
"disableKeyBasedMetadataWriteAccess": false,
"enableFreeTier": false,
"enableAnalyticalStorage": false,
"databaseAccountOfferType": "Standard",
"consistencyPolicy": {
"defaultConsistencyLevel": "BoundedStaleness",
"maxIntervalInSeconds": 86400,
"maxStalenessPrefix": 1000000
},
"locations": [
{
"locationName": "[parameters('location')]",
"provisioningState": "Succeeded",
"failoverPriority": 0,
"isZoneRedundant": false
}
],
"capabilities": [
{
"name": "EnableTable"
}
],
"ipRules": []
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "2019-04-01-preview",
Expand All @@ -159,6 +204,15 @@
"principalId": "[parameters('testApplicationOid')]",
"scope": "[resourceGroup().id]"
}
},
{
"type": "Microsoft.Authorization/roleAssignments",
"apiVersion": "[variables('tablesAuthorizationApiVersion')]",
"name": "[guid(concat('tableDataContributorRoleId', resourceGroup().id))]",
"properties": {
"roleDefinitionId": "[resourceId('Microsoft.Authorization/roleDefinitions', variables('tableDataContributorRoleId'))]",
"principalId": "[parameters('testApplicationOid')]"
}
}
],
"outputs": {
Expand Down Expand Up @@ -197,6 +251,14 @@
"AZURE_STORAGE_ACCESS_KEY":{
"type": "string",
"value": "[listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value]"
},
"AZURE_TABLES_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(variables('storageAccountId'), providers('Microsoft.Storage', 'storageAccounts').apiVersions[0]).keys[0].value, ';EndpointSuffix=', parameters('storageEndpointSuffix'))]"
},
"AZURE_COSMOS_CONN_STR": {
"type": "string",
"value": "[concat('DefaultEndpointsProtocol=https;AccountName=', variables('storageAccount'), ';AccountKey=', listKeys(resourceId('Microsoft.DocumentDB/databaseAccounts', variables('storageAccount')), '2020-04-01').primaryMasterKey, ';TableEndpoint=https://', variables('storageAccount'), '.table.cosmos.azure.com:443/')]"
}
}
}