Skip to content

Commit

Permalink
Source Azure Table Storage: CDK Update (#34576)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jan 31, 2024
1 parent 1aaf9dc commit 78a6047
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_azure_table ./source_azure_table
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-azure-table
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-azure-table:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_azure_table/spec.json"
tests:
- spec_path: "source_azure_table/spec.json"
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
validate_schema: False
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: []
validate_schema: False
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state:
future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
"AirbyteTest": ["record"]
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"Test": {
"PartitionKey": "abcd"
[
{
"type": "STREAM",
"stream": {
"stream_state": { "PartitionKey": "999" },
"stream_descriptor": { "name": "pokemon" }
}
}
}
]

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@
"streams": [
{
"stream": {
"name": "Test",
"json_schema": {
"properties": {
"PartitionKey": {
"type": "string"
}
}
},
"name": "pokemon",
"json_schema": {},
"source_defined_cursor": true,
"supported_sync_modes": ["full_refresh", "incremental"]
},
"source_defined_cursor": true,
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["PartitionKey"]
},
{
"stream": {
"name": "campaigns",
"json_schema": {},
"source_defined_cursor": true,
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 798ae795-5189-42b6-b64e-3cb91db93338
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
dockerRepository: airbyte/source-azure-table
githubIssueLabel: source-azure-table
icon: azureblobstorage.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def get_typed_schema(self) -> object:
return {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {"PartitionKey": {"type": "string"}},
}

Expand All @@ -50,7 +51,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn
next(tables_iterator)
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except StopIteration:
logger.log("No tables found, but credentials are correct.")
logger.info("The credentials you provided are valid, but no tables were found in the Storage Account.")
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}")
Expand All @@ -70,8 +71,7 @@ def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteC
default_cursor_field=["PartitionKey"],
)
streams.append(stream)
logger.info(f"Total {streams.count} streams found.")

logger.info(f"Total {len(streams)} streams found.")
return AirbyteCatalog(streams=streams)

def streams(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> List[Stream]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def name(self):
return self.stream_name

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
return {self.cursor_field[0]: latest_record.record.data.get(self.cursor_field[0])}
return {self.cursor_field[0]: latest_record.data.get(self.cursor_field[0])}

def _update_state(self, latest_cursor):
self._state = latest_cursor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from unittest import mock

import pytest
from source_azure_table.azure_table import AzureTableReader
from source_azure_table.source import SourceAzureTable


# Fixtures
@pytest.fixture
def config():
return {"storage_account_name": "dummy-value", "storage_access_key": "dummy-value", "storage_endpoint_suffix": "dummy-value"}


@pytest.fixture
def tables():
table1 = mock.Mock()
table1.name = "AzureTable1"
table2 = mock.Mock()
table2.name = "AzureTable2"

tables = mock.MagicMock()
tables.__iter__.return_value = [table1, table2]
return tables


@pytest.fixture
def source():
return SourceAzureTable()


@pytest.fixture
def logger():
return logging.getLogger("airbyte")


@pytest.fixture
def reader(config, logger):
return AzureTableReader(logger, config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pytest


def test_get_table_service_client_return(mocker, reader):
"""
Test that the get_table_service_client method returns the expected Table Service Client.
"""
mock_client = "dummy-client"
mocker.patch(
"source_azure_table.azure_table.TableServiceClient.from_connection_string",
return_value=mock_client,
)

client = reader.get_table_service_client()
assert client == mock_client


def test_get_table_service_client_handles_exception(mocker, reader):
"""
Test that get_table_service_client method handles exceptions correctly.
"""
mocker.patch(
"source_azure_table.azure_table.TableServiceClient.from_connection_string",
side_effect=Exception("Connection error")
)

with pytest.raises(Exception) as exc_info:
reader.get_table_service_client()

assert "Connection error" in str(exc_info.value)


def test_get_table_client_return(mocker, reader):
"""
Test that the get_table_client method returns the expected Table Client.
"""
mock_client = "dummy-client"
mocker.patch(
"source_azure_table.azure_table.TableClient.from_connection_string",
return_value=mock_client,
)

table = reader.get_table_client("dummy-table")
assert table == mock_client


def test_get_table_client_handles_exception(mocker, reader):
"""
Test that get_table_client method handles exceptions correctly.
"""

# The method throws its own exception for empty table names
with pytest.raises(Exception) as exc_info:
reader.get_table_client("")
assert "table name is not valid." in str(exc_info.value)

mocker.patch(
"source_azure_table.azure_table.TableClient.from_connection_string",
side_effect=Exception("Connection error")
)

with pytest.raises(Exception) as exc_info:
reader.get_table_client("valid_table_name")
assert "Connection error" in str(exc_info.value)


def test_get_tables_return(mocker, reader, tables):
"""
Test that the get_tables method returns the expected tables.
"""
mock_client = mocker.MagicMock()
mock_client.list_tables.return_value = tables.__iter__()
mocker.patch(
"azure.data.tables.TableServiceClient.from_connection_string",
return_value=mock_client
)

result = reader.get_tables()
result_table_names = [table.name for table in result]

expected_table_names = ["AzureTable1", "AzureTable2"]
assert result_table_names == expected_table_names


def test_get_tables_handles_exception(mocker, reader):
"""
Test that get_tables method handles exceptions correctly.
"""
mock_client = mocker.MagicMock()
mock_client.list_tables.side_effect = Exception("Failed to list tables")
mocker.patch(
"azure.data.tables.TableServiceClient.from_connection_string",
return_value=mock_client
)

with pytest.raises(Exception) as exc_info:
reader.get_tables()

assert "Failed to list tables" in str(exc_info.value)
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,12 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import logging
from unittest import mock

import pytest
from airbyte_cdk.models import AirbyteCatalog, SyncMode
from source_azure_table.source import SourceAzureTable
from source_azure_table.streams import AzureTableStream

source = SourceAzureTable()
logger = logging.getLogger()


# Fixtures
@pytest.fixture
def config():
return {"storage_account_name": "dummy-value", "storage_access_key": "dummy-value", "storage_endpoint_suffix": "dummy-value"}


@pytest.fixture
def tables():
table1 = mock.Mock()
table1.name = "AzureTable1"
table2 = mock.Mock()
table2.name = "AzureTable2"

tables = mock.MagicMock()
tables.__iter__.return_value = [table1, table2]
return tables


# Tests
def test_discover(mocker, config, tables):
def test_discover(mocker, config, tables, source, logger):
mocker.patch(
"source_azure_table.azure_table.AzureTableReader.get_tables",
return_value=tables,
Expand All @@ -47,14 +21,15 @@ def test_discover(mocker, config, tables):
assert stream.json_schema == {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": {"PartitionKey": {"type": "string"}},
}
assert stream.supported_sync_modes == [SyncMode.full_refresh, SyncMode.incremental]
assert stream.source_defined_cursor is True
assert stream.default_cursor_field == ["PartitionKey"]


def test_streams(mocker, config, tables):
def test_streams(mocker, config, tables, source, logger):
mocker.patch(
"source_azure_table.azure_table.AzureTableReader.get_tables",
return_value=tables,
Expand Down
3 changes: 1 addition & 2 deletions docs/integrations/sources/azure-table.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ Visit the [Azure Portal](https://portal.azure.com). Go to your storage account,

We recommend creating a restricted key specifically for Airbyte access. This will allow you to control which resources Airbyte should be able to access. However, shared access key authentication is not supported by this connector yet.


## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------ |
| 0.1.4 | 2024-01-26 | [34576](https://github.com/airbytehq/airbyte/pull/34576) | Migrate to per-stream/global state |
| 0.1.3 | 2022-08-12 | [15591](https://github.com/airbytehq/airbyte/pull/15591) | Clean instantiation of AirbyteStream |
| 0.1.2 | 2021-12-23 | [14212](https://github.com/airbytehq/airbyte/pull/14212) | Adding incremental load capability |
| 0.1.1 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |

0 comments on commit 78a6047

Please sign in to comment.