Skip to content

Commit

Permalink
Migrate sendgrid to config-based (#15257)
Browse files Browse the repository at this point in the history
* fix spec

* read records from lists stream

* campaigns

* contacts

* stats_automations

* segments

* single_sends

* templates

* suppressions_global

* suppression groups

* suppression group memebers

* blocks

* bounces

* invalid emails and spam reports

* bump cdk version

* fix paths

* bump cdk version

* only define cursor field in one place

* move to definitions

* move bounces inside the streams array

* move all streams within the streams array

* update sendgrid config

* fix

* derp

* rename field

* fix parse

* Revert "fix parse"

This reverts commit 3c76c5a.

* fix parse timestamp

* extract datetime parser

* remove print

* use parser

* top level docstring

* rename variable

* Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid"

This reverts commit 99caa58, reversing
changes made to 028bdfb.

* Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid""

This reverts commit 8d55afa.

* Revert "Revert "Revert "Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsendgrid"""

This reverts commit 9b70a3b.

* do not use timestamp()

* Revert "do not use timestamp()"

This reverts commit 016cb69.

* Handle extracting no records from root

* bump cdk version

* handle empty record

* update unit test

* messages stream needs a different slicer

* handle missing keys

* Update unit test

* record extractor interface

* dpath extractor

* docstring

* use dpath

* Revert "Merge branch 'alex/selectNoRecords' into alex/configbasedsendgrid"

This reverts commit ac92374, reversing
changes made to e10d6b9.

* bump cdk version

* use dpath

* missing cursor field

* start DRYing the config

* delete more cruff

* DRY

* get start time from config

* delete custom streams

* step=30days

* bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
girarda and octavia-squidington-iii authored Aug 12, 2022
1 parent 330a196 commit 8497d78
Show file tree
Hide file tree
Showing 10 changed files with 306 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@
- name: Sendgrid
sourceDefinitionId: fbb5fbe2-16ad-4cf4-af7d-ff9d9c316c87
dockerRepository: airbyte/source-sendgrid
dockerImageTag: 0.2.8
dockerImageTag: 0.2.9
documentationUrl: https://docs.airbyte.io/integrations/sources/sendgrid
icon: sendgrid.svg
sourceType: api
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8644,7 +8644,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-sendgrid:0.2.8"
- dockerImage: "airbyte/source-sendgrid:0.2.9"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/sendgrid"
connectionSpecification:
Expand All @@ -8653,7 +8653,7 @@
type: "object"
required:
- "apikey"
additionalProperties: false
additionalProperties: true
properties:
apikey:
title: "Sendgrid API key"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-sendgrid/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.8
LABEL io.airbyte.version=0.2.9
LABEL io.airbyte.name=airbyte/source-sendgrid
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-sendgrid/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
author="Airbyte",
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=["airbyte-cdk~=0.1", "backoff", "requests", "pytest==6.1.2", "pytest-mock"],
install_requires=["airbyte-cdk>=0.1.74", "backoff", "requests", "pytest==6.1.2", "pytest-mock"],
package_data={"": ["*.json", "schemas/*.json"]},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
definitions:
page_size: 50
step: "30d"

schema_loader:
type: JsonSchema
file_path: "./source_sendgrid/schemas/{{ options.name }}.json"

requester:
type: HttpRequester
name: "{{ options['name'] }}"
url_base: "https://api.sendgrid.com"
http_method: "GET"
authenticator:
type: "BearerAuthenticator"
api_token: "{{ config.apikey }}"
cursor_paginator:
type: LimitPaginator
url_base: "*ref(definitions.requester.url_base)"
page_size: "*ref(definitions.page_size)"
limit_option:
inject_into: "request_parameter"
field_name: "page_size"
page_token_option:
inject_into: "path"
pagination_strategy:
type: "CursorPagination"
cursor_value: "{{ response._metadata.next }}"
offset_paginator:
type: LimitPaginator
$options:
url_base: "*ref(definitions.requester.url_base)"
page_size: "*ref(definitions.page_size)"
limit_option:
inject_into: "request_parameter"
field_name: "limit"
page_token_option:
inject_into: "request_parameter"
field_name: "offset"
pagination_strategy:
type: "OffsetIncrement"
retriever:
type: SimpleRetriever
name: "{{ options['name'] }}"
primary_key: "{{ options['primary_key'] }}"
stream_slicer:
type: "DatetimeStreamSlicer"
start_datetime:
datetime: "{{ config['start_time'] }}"
datetime_format: "%s"
end_datetime:
datetime: "{{ now_utc() }}"
datetime_format: "%Y-%m-%d %H:%M:%S.%f%z"
step: "*ref(definitions.step)"
cursor_field: "{{ options.stream_cursor_field }}"
start_time_option:
field_name: "start_time"
inject_into: "request_parameter"
end_time_option:
field_name: "end_time"
inject_into: "request_parameter"
datetime_format: "%s"
messages_stream_slicer:
type: "DatetimeStreamSlicer"
start_datetime:
datetime: "{{ config['start_time'] }}"
datetime_format: "%s"
end_datetime:
datetime: "{{ now_utc() }}}"
datetime_format: "%Y-%m-%d %H:%M:%S.%f%z"
step: "*ref(definitions.step)"
cursor_field: "{{ options.stream_cursor_field }}"
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"

base_stream:
type: DeclarativeStream
schema_loader:
$ref: "*ref(definitions.schema_loader)"
retriever:
$ref: "*ref(definitions.retriever)"
record_selector:
extractor:
field_pointer: []
requester:
$ref: "*ref(definitions.requester)"
paginator:
type: NoPagination
streams:
- $ref: "*ref(definitions.base_stream)"
$options:
name: "lists"
primary_key: "id"
path: "/v3/marketing/lists"
field_pointer: ["result"]
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "campaigns"
primary_key: "id"
path: "/v3/marketing/campaigns"
field_pointer: ["result"]
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "contacts"
primary_key: "id"
path: "/v3/marketing/contacts"
field_pointer: ["result"]
- $ref: "*ref(definitions.base_stream)"
$options:
name: "stats_automations"
primary_key: "id"
path: "/v3/marketing/stats/automations"
field_pointer: ["results"]
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "segments"
primary_key: "id"
path: "/v3/marketing/segments"
field_pointer: ["results"]
- $ref: "*ref(definitions.base_stream)"
$options:
name: "single_sends"
primary_key: "id"
path: "/v3/marketing/stats/singlesends"
field_pointer: ["results"]
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "templates"
primary_key: "id"
path: "/v3/templates"
field_pointer: ["result"]
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
requester:
$ref: "*ref(definitions.base_stream.retriever.requester)"
request_options_provider:
request_parameters:
generations: "legacy,dynamic"
paginator:
$ref: "*ref(definitions.cursor_paginator)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "bounces"
primary_key: "email"
stream_cursor_field: "created"
path: "/v3/suppression/bounces"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "global_suppressions"
primary_key: "email"
stream_cursor_field: "created"
path: "/v3/suppression/unsubscribes"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "blocks"
primary_key: "email"
stream_cursor_field: "created"
path: "/v3/suppression/blocks"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "suppression_groups"
primary_key: "id"
path: "/v3/asm/groups"
field_pointer: []
- $ref: "*ref(definitions.base_stream)"
$options:
name: "suppression_group_members"
primary_key: "group_id"
path: "/v3/asm/suppressions"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.offset_paginator)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "invalid_emails"
primary_key: "email"
stream_cursor_field: "created"
path: "/v3/suppression/invalid_emails"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "spam_reports"
primary_key: "email"
stream_cursor_field: "created"
path: "/v3/suppression/spam_reports"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
paginator:
$ref: "*ref(definitions.offset_paginator)"
stream_slicer:
$ref: "*ref(definitions.stream_slicer)"
- $ref: "*ref(definitions.base_stream)"
$options:
name: "messages"
primary_key: "msg_id"
stream_cursor_field: "last_event_time"
path: "/v3/messages"
field_pointer: []
retriever:
$ref: "*ref(definitions.base_stream.retriever)"
requester:
$ref: "*ref(definitions.requester)"
request_options_provider:
request_parameters:
limit: 1000
query: 'last_event_time BETWEEN TIMESTAMP "{{stream_slice.start_time}}" AND TIMESTAMP "{{stream_slice.end_time}}"'
stream_slicer:
$ref: "*ref(definitions.messages_stream_slicer)"
check:
type: CheckStream
stream_names: ["lists"]
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,17 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

from typing import Any, List, Mapping, Tuple
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
source connector.
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
WARNING: Do not modify this file.
"""

from .streams import (
Blocks,
Bounces,
Campaigns,
Contacts,
GlobalSuppressions,
InvalidEmails,
Lists,
Messages,
Scopes,
Segments,
SingleSends,
SpamReports,
StatsAutomations,
SuppressionGroupMembers,
SuppressionGroups,
Templates,
)


class SourceSendgrid(AbstractSource):
def check_connection(self, logger, config) -> Tuple[bool, any]:
try:
authenticator = TokenAuthenticator(config["apikey"])
scopes_gen = Scopes(authenticator=authenticator).read_records(sync_mode=SyncMode.full_refresh)
next(scopes_gen)
return True, None
except Exception as error:
return False, f"Unable to connect to Sendgrid API with the provided credentials - {error}"

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = TokenAuthenticator(config["apikey"])

streams = [
Lists(authenticator=authenticator),
Campaigns(authenticator=authenticator),
Contacts(authenticator=authenticator),
StatsAutomations(authenticator=authenticator),
Segments(authenticator=authenticator),
SingleSends(authenticator=authenticator),
Templates(authenticator=authenticator),
Messages(authenticator=authenticator, start_time=config["start_time"]),
GlobalSuppressions(authenticator=authenticator, start_time=config["start_time"]),
SuppressionGroups(authenticator=authenticator),
SuppressionGroupMembers(authenticator=authenticator),
Blocks(authenticator=authenticator, start_time=config["start_time"]),
Bounces(authenticator=authenticator, start_time=config["start_time"]),
InvalidEmails(authenticator=authenticator, start_time=config["start_time"]),
SpamReports(authenticator=authenticator, start_time=config["start_time"]),
]

return streams
# Declarative Source
class SourceSendgrid(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "./source_sendgrid/sendgrid.yaml"})
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "Sendgrid Spec",
"type": "object",
"required": ["apikey"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"apikey": {
"title": "Sendgrid API key",
Expand Down
Loading

0 comments on commit 8497d78

Please sign in to comment.