Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[source-us-census] fix empty fields after sync #45331

Merged
merged 7 commits into from
Sep 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: c4cfaeda-c757-489a-8aba-859fb08b6970
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
dockerRepository: airbyte/source-us-census
githubIssueLabel: source-us-census
icon: uscensus.svg
649 changes: 453 additions & 196 deletions airbyte-integrations/connectors/source-us-census/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "0.2.0"
version = "0.2.1"
name = "source-us-census"
description = "Source implementation for Us Census."
authors = [ "Airbyte <contact@airbyte.io>",]
Original file line number Diff line number Diff line change
@@ -3,20 +3,23 @@
#


from typing import List, Optional, Union
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional, Union

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.requesters.error_handlers import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.types import Record
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
DEFAULT_ERROR_RESOLUTION,
SUCCESS_RESOLUTION,
ErrorResolution,
ResponseAction,
create_fallback_error_resolution,
)
from airbyte_cdk.sources.types import Config


class USCensusRecordExtractor(RecordExtractor):
@@ -132,4 +135,46 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)

return default_response_filter_resolution if default_response_filter_resolution else DEFAULT_ERROR_RESOLUTION
return (
default_response_filter_resolution
if default_response_filter_resolution
else create_fallback_error_resolution(response_or_exception)
)


@dataclass
class USCensusSchema(SchemaLoader):
    """
    Schema loader that derives the stream's JSON schema from the connector config.

    The US Census website hosts many APIs: https://www.census.gov/data/developers/data-sets.html

    These APIs return data in a non-standard format, so the schema cannot be declared
    statically. Instead, the property names are derived from the ``query_params``
    string of the config:

    * ``get=A,B``      -> one property per comma-separated name (``A`` and ``B``)
    * ``for=geo:...``  -> one property named after the part before the first ``:``
    * any other key    -> one property named after the key itself

    In this implementation all properties are of type "string", but this could be
    changed to try and infer the data type based on the values found.
    """

    config: Config

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Return a draft-07 JSON schema whose properties mirror ``query_params``.

        When ``query_params`` is missing or empty, a single fallback property is
        declared instead. Additional properties are always allowed.
        """
        query_params = self.config.get("query_params")
        if query_params:
            column_names = []
            for part in query_params.split("&"):
                # ``partition`` never raises, unlike unpacking ``split("=", 1)``,
                # so a segment without "=" no longer aborts schema discovery.
                key, _, value = part.partition("=")
                if not key:
                    # Empty segment (e.g. a trailing or doubled "&"): nothing to declare.
                    continue
                if key == "get":
                    column_names.extend(value.split(","))
                elif key == "for":
                    column_names.append(value.split(":")[0])
                else:
                    column_names.append(key)
            # Drop empty names (e.g. from "get=" or "get=A,,B") so that no
            # property with an empty key ends up in the schema.
            json_schema = {name: {"type": "string"} for name in column_names if name}
        else:
            json_schema = {"{ @context: https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld": {"type": "string"}}
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "additionalProperties": True,
            "type": "object",
            "properties": json_schema,
        }
Original file line number Diff line number Diff line change
@@ -24,9 +24,8 @@ definitions:
type: CustomRecordExtractor
class_name: source_us_census.components.USCensusRecordExtractor
schema_loader:
type: InlineSchemaLoader
schema:
$ref: "#/schemas/us_census_stream"
type: CustomSchemaLoader
class_name: "source_us_census.components.USCensusSchema"
base_requester:
type: HttpRequester
url_base: https://api.census.gov/
@@ -84,14 +83,3 @@ spec:
order: 2
airbyte_secret: true
additionalProperties: true

metadata:
autoImportSchema:
us_census_stream: true

schemas:
us_census_stream:
type: object
$schema: http://json-schema.org/draft-07/schema#
additionalProperties: true
properties: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from dataclasses import dataclass
from typing import Any, Mapping, Optional
from unittest.mock import Mock

import pytest
from source_us_census.components import USCensusSchema


@dataclass
class MockConfig:
    """Minimal stand-in for the connector config, exposing only ``query_params``."""

    # Raw query string as a user would configure it; ``None`` means "not set".
    query_params: Optional[str] = None

    def get(self, key, default=None):
        """
        Mimic ``Mapping.get`` for the single supported key.

        Returns the stored ``query_params`` for the key ``"query_params"`` and
        ``default`` (``None`` unless given) for every other key.
        """
        if key == "query_params":
            return self.query_params
        return default


@pytest.fixture
def census_schema():
    """Provide a factory that builds a ``USCensusSchema`` for a given ``query_params`` string."""

    def _create_schema(query_params=None):
        # Wrap the raw query string in the mock config the schema loader reads from.
        return USCensusSchema(config=MockConfig(query_params=query_params))

    return _create_schema


def test_get_json_schema_basic_case(census_schema):
    """``get`` columns plus the ``for`` geography each become a string property."""
    result = census_schema(query_params="get=NAME,POP&for=state:*").get_json_schema()

    assert result["properties"] == {
        "NAME": {"type": "string"},
        "POP": {"type": "string"},
        "state": {"type": "string"},
    }
    # The envelope around the properties is fixed regardless of the query.
    assert result["$schema"] == "http://json-schema.org/draft-07/schema#"
    assert result["type"] == "object"
    assert result["additionalProperties"] is True


def test_get_json_schema_with_get_param(census_schema):
    """A lone ``get`` parameter yields one property per comma-separated name."""
    result = census_schema(query_params="get=NAME,AGE,EMPLOYMENT").get_json_schema()

    assert result["properties"] == {
        "NAME": {"type": "string"},
        "AGE": {"type": "string"},
        "EMPLOYMENT": {"type": "string"},
    }


def test_get_json_schema_with_for_param(census_schema):
    """A lone ``for`` parameter yields a property named after the geography before ``:``."""
    result = census_schema(query_params="for=county:1234").get_json_schema()

    assert result["properties"] == {"county": {"type": "string"}}


def test_get_json_schema_with_additional_params(census_schema):
    """Keys other than ``get``/``for`` contribute a property named after the key itself."""
    result = census_schema(query_params="get=NAME&year=2020&for=us:*").get_json_schema()

    assert result["properties"] == {
        "NAME": {"type": "string"},
        "year": {"type": "string"},
        "us": {"type": "string"},
    }


def test_get_json_schema_no_query_params(census_schema):
    """Without ``query_params`` the loader falls back to its single placeholder property."""
    result = census_schema(query_params=None).get_json_schema()

    fallback_key = "{ @context: https://project-open-data.cio.gov/v1.1/schema/catalog.jsonld"
    assert result["properties"] == {fallback_key: {"type": "string"}}
1 change: 1 addition & 0 deletions docs/integrations/sources/us-census.md
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ In addition, to understand how to configure the dataset path and query parameter

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :------------------------------------------------ |
| 0.2.1 | 2024-09-07 | [45331](https://github.com/airbytehq/airbyte/pull/45331) | Fix schema |
| 0.2.0 | 2024-08-10 | [43521](https://github.com/airbytehq/airbyte/pull/43521) | Migrate to Low Code |
| 0.1.16 | 2024-08-10 | [43566](https://github.com/airbytehq/airbyte/pull/43566) | Update dependencies |
| 0.1.15 | 2024-08-03 | [43214](https://github.com/airbytehq/airbyte/pull/43214) | Update dependencies |