Merge branch 'master' into flash1293/refactor-all-the-connectors
Joe Reuter authored Jan 29, 2024
2 parents fb36c80 + dccb2fa commit 4286e42
Showing 45 changed files with 1,324 additions and 29 deletions.
1 change: 1 addition & 0 deletions .github/workflows/airbyte-ci-tests.yml
@@ -132,6 +132,7 @@ jobs:
uses: ./.github/actions/run-airbyte-ci
with:
context: "pull_request"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
@@ -597,6 +597,7 @@ E.G.: running `pytest` on a specific test folder:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- |
| 3.8.1 | [#34607](https://github.com/airbytehq/airbyte/pull/34607) | Improve gradle dependency cache volume protection. |
| 3.8.0 | [#34316](https://github.com/airbytehq/airbyte/pull/34316) | Expose Dagger engine image name in `--ci-requirements` and add `--ci-requirements` to the `airbyte-ci` root command group. |
| 3.7.3 | [#34560](https://github.com/airbytehq/airbyte/pull/34560) | Simplify Gradle task execution framework by removing local maven repo support. |
| 3.7.2 | [#34555](https://github.com/airbytehq/airbyte/pull/34555) | Override secret masking in some very specific special cases. |
@@ -151,6 +151,8 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult:
.with_exec(
sh_dash_c(
[
# Defensively delete the gradle home directory to avoid dirtying the cache volume.
f"rm -rf {self.GRADLE_HOME_PATH}",
# Load from the cache volume.
f"(rsync -a --stats --mkpath {self.GRADLE_DEP_CACHE_PATH}/ {self.GRADLE_HOME_PATH} || true)",
# Resolve all dependencies and write their checksums to './gradle/verification-metadata.dryrun.xml'.
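For context, `sh_dash_c` is an airbyte-ci helper that chains shell commands into a single `sh -c` invocation; a minimal sketch under that assumption (paths in the usage comment are hypothetical, and the real helper may also enable tracing):

    from typing import List

    def sh_dash_c(commands: List[str]) -> List[str]:
        # Chain the commands with "&&" so they run in one container exec,
        # letting the defensive `rm -rf` complete before the cache is rsynced in.
        return ["sh", "-c", " && ".join(commands)]

    # sh_dash_c(["rm -rf /root/.gradle", "rsync -a /cache/ /root/.gradle || true"])
    # -> ["sh", "-c", "rm -rf /root/.gradle && rsync -a /cache/ /root/.gradle || true"]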
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "3.8.0"
version = "3.8.1"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

@@ -1,5 +1,8 @@
# Changelog

## 3.3.2
Fix TestBasicRead.test_read.validate_schema: set `additionalProperties` to False recursively for objects

## 3.3.1
Fix TestSpec.test_oauth_is_default_method to skip connectors that don't have a predicate_key object.

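To illustrate the 3.3.2 entry, a hypothetical schema before and after the recursive enforcement performed by the new validator (shown in the next hunk):

    before = {"type": "object", "properties": {"user": {"type": "object", "properties": {"name": {"type": "string"}}}}}
    after = {
        "type": "object",
        "additionalProperties": False,  # added at the top level...
        "properties": {
            "user": {
                "type": "object",
                "additionalProperties": False,  # ...and on every nested object with declared properties
                "properties": {"name": {"type": "string"}},
            }
        },
    }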
@@ -8,7 +8,6 @@
from collections import defaultdict
from typing import Any, Dict, List, Mapping

import dpath.util
import pendulum
from airbyte_protocol.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog
from jsonschema import Draft7Validator, FormatChecker, FormatError, ValidationError, validators
@@ -26,6 +25,40 @@
Draft7ValidatorWithStrictInteger = validators.extend(Draft7Validator, type_checker=strict_integer_type_checker)


class NoAdditionalPropertiesValidator(Draft7Validator):
def __init__(self, schema, **kwargs):
schema = self._enforce_false_additional_properties(schema)
super().__init__(schema, **kwargs)

@staticmethod
def _enforce_false_additional_properties(json_schema: Dict[str, Any]) -> Dict[str, Any]:
"""Create a copy of the schema in which `additionalProperties` is set to False for all non-null object properties.
This method will override the value of `additionalProperties` if it is set,
or will create the property and set it to False if it does not exist.
"""
new_schema = copy.deepcopy(json_schema)
new_schema["additionalProperties"] = False

def add_properties(properties):
for prop_name, prop_value in properties.items():
if "type" in prop_value and "object" in prop_value["type"] and len(prop_value.get("properties", [])):
prop_value["additionalProperties"] = False
add_properties(prop_value.get("properties", {}))
elif "type" in prop_value and "array" in prop_value["type"]:
if (
prop_value.get("items")
and "object" in prop_value.get("items", {}).get("type")
and len(prop_value.get("items", {}).get("properties", []))
):
prop_value["items"]["additionalProperties"] = False
if prop_value.get("items", {}).get("properties"):
add_properties(prop_value["items"]["properties"])

add_properties(new_schema.get("properties", {}))
return new_schema
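
A minimal usage sketch of the new validator, assuming the standard `jsonschema` `iter_errors` API (schema and record are illustrative):

    schema = {"type": "object", "properties": {"a": {"type": "string"}}}
    validator = NoAdditionalPropertiesValidator(schema)
    # "b" is flagged because additionalProperties is forced to False at every object level.
    errors = list(validator.iter_errors({"a": "ok", "b": "extra"}))
    assert errors and "additional properties" in errors[0].message.lower()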


class CustomFormatChecker(FormatChecker):
@staticmethod
def check_datetime(value: str) -> bool:
@@ -46,17 +79,6 @@ def check(self, instance, format):
return super().check(instance, format)


def _enforce_no_additional_top_level_properties(json_schema: Dict[str, Any]):
"""Create a copy of the schema in which `additionalProperties` is set to False for the dict of top-level properties.
This method will override the value of `additionalProperties` if it is set,
or will create the property and set it to False if it does not exist.
"""
enforced_schema = copy.deepcopy(json_schema)
dpath.util.new(enforced_schema, "additionalProperties", False)
return enforced_schema


def verify_records_schema(
records: List[AirbyteRecordMessage], catalog: ConfiguredAirbyteCatalog, fail_on_extra_columns: bool
) -> Mapping[str, Mapping[str, ValidationError]]:
@@ -66,11 +88,8 @@ def verify_records_schema(
stream_validators = {}
for stream in catalog.streams:
schema_to_validate_against = stream.stream.json_schema
if fail_on_extra_columns:
schema_to_validate_against = _enforce_no_additional_top_level_properties(schema_to_validate_against)
stream_validators[stream.stream.name] = Draft7ValidatorWithStrictInteger(
schema_to_validate_against, format_checker=CustomFormatChecker()
)
validator = NoAdditionalPropertiesValidator if fail_on_extra_columns else Draft7ValidatorWithStrictInteger
stream_validators[stream.stream.name] = validator(schema_to_validate_against, format_checker=CustomFormatChecker())
stream_errors = defaultdict(dict)
for record in records:
validator = stream_validators.get(record.stream)
@@ -87,6 +87,62 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
]


@pytest.mark.parametrize(
"json_schema, record, should_fail",
[
(
{"type": "object", "properties": {"a": {"type": "string"}}},
{"a": "str", "b": "extra_string"},
True
),
(
{"type": "object", "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"]}}},
{"a": "str", "some_obj": {"b": "extra_string"}},
False
),
(
{
"type": "object",
"properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"], "properties": {"a": {"type": "string"}}}},
},
{"a": "str", "some_obj": {"a": "str", "b": "extra_string"}},
True
),
(
{"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object"}}}},
{"a": "str", "b": [{"a": "extra_string"}]},
False
),
(
{
"type": "object",
"properties": {
"a": {"type": "string"},
"b": {"type": "array", "items": {"type": "object", "properties": {"a": {"type": "string"}}}},
}
},
{"a": "str", "b": [{"a": "string", "b": "extra_string"}]},
True
),
],
ids=[
"simple_schema_and_record_with_extra_property",
"schema_with_object_without_properties_and_record_with_object_with_property",
"schema_with_object_with_properties_and_record_with_object_with_extra_property",
"schema_with_array_of_objects_without_properties_and_record_with_array_of_objects_with_property",
"schema_with_array_of_objects_with_properties_and_record_with_array_of_objects_with_extra_property",
],
)
def test_verify_records_schema_with_fail_on_extra_columns(configured_catalog: ConfiguredAirbyteCatalog, json_schema, record, should_fail):
"""Test that fail_on_extra_columns works correctly with nested objects, array of objects"""
configured_catalog.streams[0].stream.json_schema =json_schema
records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)]
streams_with_errors = verify_records_schema(records, configured_catalog, fail_on_extra_columns=True)
errors = [error.message for error in streams_with_errors["my_stream"].values()]
assert errors if should_fail else not errors


@pytest.mark.parametrize(
"record, configured_catalog, valid",
[
@@ -34,5 +34,5 @@ COPY destination_vectara ./destination_vectara
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.name=airbyte/destination-vectara
@@ -147,7 +147,7 @@ def delete_docs_by_id(self, document_ids):
)

def index_document(self, document):
document_section, document_metadata, document_id = document
document_section, document_metadata, document_title, document_id = document
if len(document_section) == 0:
return None # Document is empty, so skip it
document_metadata = self._normalize(document_metadata)
@@ -157,6 +157,7 @@ def index_document(self, document):
"document": {
"documentId": document_id,
"metadataJson": json.dumps(document_metadata),
"title": document_title,
"section": [
{"text": f"{section_key}: {section_value}"}
for section_key, section_value in document_section.items()
@@ -169,7 +170,7 @@

def index_documents(self, documents):
if self.parallelize:
with ThreadPoolExecutor() as executor: ### DEBUG remove max_workers limit
with ThreadPoolExecutor() as executor:
futures = [executor.submit(self.index_document, doc) for doc in documents]
for future in futures:
try:
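For illustration, the per-document body sent to Vectara now carries the title next to the metadata and sections (values are hypothetical, mirroring the integration test further below):

    import json

    document = {
        "documentId": "mystream_42",  # hypothetical id derived from the record's primary key
        "metadataJson": json.dumps({"int_col": 2, "_ab_stream": "None_mystream"}),
        "title": "Cats are nice",  # resolved from the configured title_field
        "section": [{"text": "str_col: Cats are nice"}],  # one section per text field
    }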
@@ -42,6 +42,13 @@ class VectaraConfig(BaseModel):
always_show=True,
examples=["text", "user.name", "users.*.name"],
)
title_field: Optional[str] = Field(
default="",
title="Text field to use as document title with Vectara",
description="A field that will be used to populate the `title` of each document. The field list is applied to all streams in the same way and non-existing fields are ignored. If none are defined, all fields are considered text fields. When specifying text fields, you can access nested fields in the record by using dot notation, e.g. `user.name` will access the `name` field in the `user` object. It's also possible to use wildcards to access all fields in an object, e.g. `users.*.name` will access all `names` fields in all entries of the `users` array.",
always_show=True,
examples=["document_key"],
)
metadata_fields: Optional[List[str]] = Field(
default=[],
title="Fields to store as metadata",
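A hypothetical destination config exercising the new field (keys mirror the unit test further below; `corpus_name` is an assumption and credentials are omitted):

    config = {
        "customer_id": "123456",
        "corpus_name": "my_corpus",  # assumed corpus setting
        "text_fields": ["str_col"],
        "metadata_fields": ["int_col"],
        "title_field": "document_key",  # dot notation (e.g. "user.name") reaches nested fields
    }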
@@ -44,6 +44,7 @@ def write(
writer = VectaraWriter(
client=VectaraClient(config_model),
text_fields=config_model.text_fields,
title_field=config_model.title_field,
metadata_fields=config_model.metadata_fields,
catalog=configured_catalog,
)
@@ -23,11 +23,13 @@ def __init__(
self,
client: VectaraClient,
text_fields: Optional[List[str]],
title_field: Optional[str],
metadata_fields: Optional[List[str]],
catalog: ConfiguredAirbyteCatalog,
):
self.client = client
self.text_fields = text_fields
self.title_field = title_field
self.metadata_fields = metadata_fields
self.streams = {f"{stream.stream.namespace}_{stream.stream.name}": stream for stream in catalog.streams}
self.ids_to_delete: List[str] = []
@@ -51,6 +53,7 @@ def queue_write_operation(self, record: AirbyteRecordMessage) -> None:
stream_identifier = self._get_stream_id(record=record)
document_section = self._get_document_section(record=record)
document_metadata = self._get_document_metadata(record=record)
document_title = self._get_document_title(record=record)
primary_key = self._get_record_primary_key(record=record)

if primary_key:
Expand All @@ -60,7 +63,7 @@ def queue_write_operation(self, record: AirbyteRecordMessage) -> None:
else:
document_id = str(uuid.uuid4().int)

self.write_buffer.append((document_section, document_metadata, document_id))
self.write_buffer.append((document_section, document_metadata, document_title, document_id))
if len(self.write_buffer) == self.flush_interval:
self.flush()

@@ -99,6 +102,12 @@ def _get_document_metadata(self, record: AirbyteRecordMessage) -> Dict[str, Any]:
document_metadata[METADATA_STREAM_FIELD] = self._get_stream_id(record)
return document_metadata

def _get_document_title(self, record: AirbyteRecordMessage) -> str:
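# Default to "Untitled"; otherwise resolve the configured title_field from the record via dpath.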
title = "Untitled"
if self.title_field:
title = dpath.util.get(record.data, self.title_field)
return title

def _get_stream_id(self, record: AirbyteRecordMessage) -> str:
return f"{record.namespace}_{record.stream}"

@@ -70,6 +70,7 @@ def test_check_invalid_config(self):
"customer_id": "123456",
"text_fields": [],
"metadata_fields": [],
"title_field": "",
},
)
assert outcome.status == Status.FAILED
@@ -120,6 +121,7 @@ def test_write(self):
"metadata": [
{"name": "int_col", "value": "2"},
{"name": "_ab_stream", "value": "None_mystream"},
{"name": "title", "value": "Cats are nice"},
],
}
]
@@ -11,7 +11,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 102900e7-a236-4c94-83e4-a4189b99adc2
dockerImageTag: 0.1.0
dockerImageTag: 0.2.0
dockerRepository: airbyte/destination-vectara
githubIssueLabel: destination-vectara
icon: vectara.svg
@@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: ef69ef6e-aa7f-4af1-a01d-ef775033524e
dockerImageTag: 1.5.6
dockerImageTag: 1.5.7
dockerRepository: airbyte/source-github
documentationUrl: https://docs.airbyte.com/integrations/sources/github
githubIssueLabel: source-github
@@ -215,7 +215,9 @@ def read_records(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iter

self.logger.warning(error_msg)
except GitHubAPILimitException:
self.logger.warning("Limits for all provided tokens are reached, please try again later")
self.logger.warning(
f"Stream: `{self.name}`, slice: `{stream_slice}`. Limits for all provided tokens are reached, please try again later"
)


class GithubStream(GithubStreamABC):
@@ -143,7 +143,7 @@ def process_token(self, current_token, count_attr, reset_attr):
elif all(getattr(x, count_attr) == 0 for x in self._tokens.values()):
min_time_to_wait = min((getattr(x, reset_attr) - pendulum.now()).in_seconds() for x in self._tokens.values())
if min_time_to_wait < self.max_time:
time.sleep(min_time_to_wait)
time.sleep(min_time_to_wait if min_time_to_wait > 0 else 0)
self.check_all_tokens()
else:
raise GitHubAPILimitException(f"Rate limits for all tokens ({count_attr}) were reached")
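The clamp matters because `time.sleep` raises `ValueError` for negative durations, and `min_time_to_wait` can go negative once a token's reset timestamp is already in the past:

    import time

    min_time_to_wait = -3  # reset time already passed
    time.sleep(max(min_time_to_wait, 0))  # a raw time.sleep(-3) would raise ValueError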
@@ -94,7 +94,7 @@ def request_callback_orgs(request):

list(read_full_refresh(stream))
assert [(x.count_rest, x.count_graphql) for x in authenticator._tokens.values()] == [(0, 500), (0, 500), (0, 500)]
assert "Limits for all provided tokens are reached, please try again later" in caplog.messages
assert "Stream: `organizations`, slice: `{'organization': 'org1'}`. Limits for all provided tokens are reached, please try again later" in caplog.messages


@freeze_time("2021-01-01 12:00:00")
@@ -0,0 +1,7 @@
*
!Dockerfile
!Dockerfile.test
!main.py
!source_google_analytics_v4_service_account_only
!setup.py
!secrets