Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Source Sendgrid: Move contacts stream to async declarative component #45191

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
dockerImageTag: 1.0.18
dockerImageTag: 1.1.0
releases:
breakingChanges:
1.0.0:
Expand Down
496 changes: 369 additions & 127 deletions airbyte-integrations/connectors/source-sendgrid/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "1.0.18"
version = "1.1.0"
name = "source-sendgrid"
description = "Source implementation for Sendgrid."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -16,8 +16,8 @@ repository = "https://github.com/airbytehq/airbyte"
include = "source_sendgrid"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
airbyte-cdk = "^0"
python = "^3.10,<3.12"
airbyte_cdk = "^5"
pandas = "^2.1.1"

[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import pendulum
from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.models import AirbyteMessageSerializer
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository
from orjson import orjson

logger = logging.getLogger("airbyte_logger")

Expand Down Expand Up @@ -68,7 +70,8 @@ def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str,

@classmethod
def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
print(create_connector_config_control_message(migrated_config).json(exclude_unset=True))
message = create_connector_config_control_message(migrated_config)
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())

@classmethod
def migrate(cls, args: List[str], source: Source) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: 0.81.1
version: 5.5.0
type: DeclarativeSource
check:
type: CheckStream
Expand Down Expand Up @@ -874,6 +874,62 @@ streams:
page_size: 100
cursor_value: '{{ response.get("_metadata", {}).get("next", {}) }}'
stop_condition: '{{ not response.get("_metadata", {}).get("next", {}) }}'

- type: DeclarativeStream
name: contacts
primary_key:
- contact_id
$parameters:
name: contacts
schema_loader:
type: JsonFileSchemaLoader
file_path: "./source_sendgrid/schemas/{{ parameters['name'] }}.json"
retriever:
type: AsyncRetriever
status_mapping:
running:
- pending
completed:
- ready
failed:
- failed
timeout:
- timeout
status_extractor:
type: DpathExtractor
field_path: ["status"]
urls_extractor:
type: DpathExtractor
field_path: ["urls"]
creation_requester:
type: HttpRequester
http_method: POST
url_base: https://api.sendgrid.com
path: /v3/marketing/contacts/exports
authenticator:
type: BearerAuthenticator
api_token: "{{ config['api_key'] }}"
polling_requester:
type: HttpRequester
http_method: GET
url_base: https://api.sendgrid.com
path: "/v3/marketing/contacts/exports/{{stream_slice['create_job_response'].json()['id'] }}"
authenticator:
type: BearerAuthenticator
api_token: "{{ config['api_key'] }}"
download_requester:
type: HttpRequester
http_method: GET
url_base: ""
path: "{{stream_slice['url']}}"
record_selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: []
transformations:
- type: KeysToLower

spec:
connection_specification:
$schema: http://json-schema.org/draft-07/schema#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, List, Mapping

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator

from .streams import Contacts


# Hybrid Declarative Source
class SourceSendgrid(YamlDeclarativeSource):
def __init__(self):
# this takes care of check and other methods
def __init__(self) -> None:
super().__init__(**{"path_to_yaml": "manifest.yaml"})

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# get all the lowcode streams
streams = super().streams(config)
authenticator = TokenAuthenticator(config["api_key"])
# this stream download a csv file from sendgrid and emits the records
# it's not currently easy to do in lowcode, so we do it in python
streams.append(Contacts(authenticator=authenticator))
return streams

This file was deleted.

Loading
Loading