diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 016600a1de701..dcd1656a2359e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -213,7 +213,7 @@ type: "string" path_in_connector_config: - "client_secret" -- dockerImage: "airbyte/source-amazon-seller-partner:0.2.18" +- dockerImage: "airbyte/source-amazon-seller-partner:0.2.21" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner" changelogUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner" @@ -221,6 +221,56 @@ title: "Amazon Seller Partner Spec" type: "object" properties: + app_id: + title: "App Id *" + description: "Your Amazon App ID" + airbyte_secret: true + order: 0 + type: "string" + auth_type: + title: "Auth Type" + const: "oauth2.0" + order: 1 + type: "string" + lwa_app_id: + title: "LWA Client Id" + description: "Your Login with Amazon Client ID." + order: 2 + type: "string" + lwa_client_secret: + title: "LWA Client Secret" + description: "Your Login with Amazon Client Secret." + airbyte_secret: true + order: 3 + type: "string" + refresh_token: + title: "Refresh Token" + description: "The Refresh Token obtained via OAuth flow authorization." + airbyte_secret: true + order: 4 + type: "string" + aws_access_key: + title: "AWS Access Key" + description: "Specifies the AWS access key used as part of the credentials\ + \ to authenticate the user." + airbyte_secret: true + order: 5 + type: "string" + aws_secret_key: + title: "AWS Secret Access Key" + description: "Specifies the AWS secret key used as part of the credentials\ + \ to authenticate the user." + airbyte_secret: true + order: 6 + type: "string" + role_arn: + title: "Role ARN" + description: "Specifies the Amazon Resource Name (ARN) of an IAM role that\ + \ you want to use to perform operations requested using this profile.\ + \ (Needs permission to 'Assume Role' STS)." + airbyte_secret: true + order: 7 + type: "string" replication_start_date: title: "Start Date" description: "UTC date and time in the format 2017-01-25T00:00:00Z. Any\ @@ -229,6 +279,14 @@ examples: - "2017-01-25T00:00:00Z" type: "string" + replication_end_date: + title: "End Date" + description: "UTC date and time in the format 2017-01-25T00:00:00Z. Any\ + \ data after this date will not be replicated." + pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$|^$" + examples: + - "2017-01-25T00:00:00Z" + type: "string" period_in_days: title: "Period In Days" description: "Will be used for stream slicing for initial full_refresh sync\ @@ -257,40 +315,6 @@ - "500" - "1980" type: "integer" - refresh_token: - title: "Refresh Token" - description: "The Refresh Token obtained via OAuth flow authorization." - airbyte_secret: true - type: "string" - lwa_app_id: - title: "LwA App Id" - description: "Your Login with Amazon App ID" - airbyte_secret: true - type: "string" - lwa_client_secret: - title: "LwA Client Secret" - description: "Your Login with Amazon Client Secret." - airbyte_secret: true - type: "string" - aws_access_key: - title: "AWS Access Key" - description: "Specifies the AWS access key used as part of the credentials\ - \ to authenticate the user." - airbyte_secret: true - type: "string" - aws_secret_key: - title: "AWS Secret Access Key" - description: "Specifies the AWS secret key used as part of the credentials\ - \ to authenticate the user." - airbyte_secret: true - type: "string" - role_arn: - title: "Role ARN" - description: "Specifies the Amazon Resource Name (ARN) of an IAM role that\ - \ you want to use to perform operations requested using this profile.\ - \ (Needs permission to 'Assume Role' STS)." - airbyte_secret: true - type: "string" aws_environment: title: "AWSEnvironment" description: "An enumeration." @@ -303,37 +327,38 @@ description: "An enumeration." enum: - "AE" + - "AU" + - "BR" + - "CA" - "DE" - - "PL" - "EG" - "ES" - "FR" + - "GB" - "IN" - "IT" + - "JP" + - "MX" - "NL" + - "PL" - "SA" - "SE" + - "SG" - "TR" - "UK" - - "AU" - - "JP" - - "SG" - "US" - - "BR" - - "CA" - - "MX" - - "GB" type: "string" required: - - "replication_start_date" - - "refresh_token" - "lwa_app_id" - "lwa_client_secret" + - "refresh_token" - "aws_access_key" - "aws_secret_key" - "role_arn" + - "replication_start_date" - "aws_environment" - "region" + additionalProperties: true definitions: AWSEnvironment: title: "AWSEnvironment" @@ -347,30 +372,72 @@ description: "An enumeration." enum: - "AE" + - "AU" + - "BR" + - "CA" - "DE" - - "PL" - "EG" - "ES" - "FR" + - "GB" - "IN" - "IT" + - "JP" + - "MX" - "NL" + - "PL" - "SA" - "SE" + - "SG" - "TR" - "UK" - - "AU" - - "JP" - - "SG" - "US" - - "BR" - - "CA" - - "MX" - - "GB" type: "string" supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] + advanced_auth: + auth_flow_type: "oauth2.0" + predicate_key: + - "auth_type" + predicate_value: "oauth2.0" + oauth_config_specification: + oauth_user_input_from_connector_config_specification: + type: "object" + additionalProperties: false + properties: + app_id: + type: "string" + path_in_connector_config: + - "app_id" + complete_oauth_output_specification: + type: "object" + additionalProperties: false + properties: + refresh_token: + type: "string" + path_in_connector_config: + - "refresh_token" + complete_oauth_server_input_specification: + type: "object" + additionalProperties: false + properties: + lwa_app_id: + type: "string" + lwa_client_secret: + type: "string" + complete_oauth_server_output_specification: + type: "object" + additionalProperties: false + properties: + lwa_app_id: + type: "string" + path_in_connector_config: + - "lwa_app_id" + lwa_client_secret: + type: "string" + path_in_connector_config: + - "lwa_client_secret" - dockerImage: "airbyte/source-amazon-sqs:0.1.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/amazon-sqs" @@ -480,7 +547,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-amplitude:0.1.7" +- dockerImage: "airbyte/source-amplitude:0.1.8" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude" connectionSpecification: @@ -1771,7 +1838,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.49" +- dockerImage: "airbyte/source-facebook-marketing:0.2.50" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" @@ -2074,6 +2141,14 @@ - "2017-01-26T00:00:00Z" type: "string" format: "date-time" + insights_lookback_window: + title: "Custom Insights Lookback Window" + description: "The attribution window" + default: 28 + maximum: 28 + mininum: 1 + exclusiveMinimum: 0 + type: "integer" required: - "name" page_size: @@ -2086,6 +2161,15 @@ order: 7 exclusiveMinimum: 0 type: "integer" + insights_lookback_window: + title: "Insights Lookback Window" + description: "The attribution window" + default: 28 + order: 8 + maximum: 28 + mininum: 1 + exclusiveMinimum: 0 + type: "integer" required: - "account_id" - "start_date" @@ -2133,7 +2217,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-faker:0.1.1" +- dockerImage: "airbyte/source-faker:0.1.4" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/faker" connectionSpecification: @@ -2146,10 +2230,11 @@ properties: count: title: "Count" - description: "How many fake records should be generated" + description: "How many users should be generated in total. This setting\ + \ does not apply to the purchases or products stream." type: "integer" minimum: 1 - default: 100 + default: 1000 order: 0 seed: title: "Seed" @@ -2158,6 +2243,23 @@ type: "integer" default: -1 order: 1 + records_per_sync: + title: "Records Per Sync" + description: "How many fake records will be returned for each sync, for\ + \ each stream? By default, it will take 2 syncs to create the requested\ + \ 1000 records." + type: "integer" + minimum: 1 + default: 500 + order: 2 + records_per_slice: + title: "Records Per Stream Slice" + description: "How many fake records will be in each page (stream slice),\ + \ before a state message is emitted?" + type: "integer" + minimum: 1 + default: 100 + order: 3 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] @@ -2387,7 +2489,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-freshdesk:0.2.11" +- dockerImage: "airbyte/source-freshdesk:0.3.1" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/freshdesk" connectionSpecification: @@ -2493,7 +2595,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-github:0.2.31" +- dockerImage: "airbyte/source-github:0.2.32" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/github" connectionSpecification: @@ -2678,7 +2780,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-google-ads:0.1.39" +- dockerImage: "airbyte/source-google-ads:0.1.40" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/google-ads" connectionSpecification: @@ -3188,7 +3290,7 @@ oauthFlowOutputParameters: - - "access_token" - - "refresh_token" -- dockerImage: "airbyte/source-google-sheets:0.2.14" +- dockerImage: "airbyte/source-google-sheets:0.2.15" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/google-sheets" connectionSpecification: @@ -3531,7 +3633,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-hubspot:0.1.59" +- dockerImage: "airbyte/source-hubspot:0.1.67" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/hubspot" connectionSpecification: @@ -3872,7 +3974,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-jira:0.2.19" +- dockerImage: "airbyte/source-jira:0.2.20" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/jira" connectionSpecification: @@ -3951,7 +4053,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-kafka:0.1.5" +- dockerImage: "airbyte/source-kafka:0.1.6" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/kafka" connectionSpecification: @@ -4197,7 +4299,7 @@ supported_destination_sync_modes: [] supported_source_sync_modes: - "append" -- dockerImage: "airbyte/source-klaviyo:0.1.3" +- dockerImage: "airbyte/source-klaviyo:0.1.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/klaviyo" changelogUrl: "https://docs.airbyte.io/integrations/sources/klaviyo" @@ -4373,7 +4475,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-linkedin-ads:0.1.7" +- dockerImage: "airbyte/source-linkedin-ads:0.1.8" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/linkedin-ads" connectionSpecification: @@ -4718,7 +4820,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:0.3.22" +- dockerImage: "airbyte/source-mssql:0.4.2" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql" connectionSpecification: @@ -4730,7 +4832,7 @@ - "port" - "database" - "username" - additionalProperties: false + additionalProperties: true properties: host: description: "The hostname of the database." @@ -4823,8 +4925,8 @@ description: "Specifies the host name of the server. The value of\ \ this property must match the subject property of the certificate." order: 7 - replication_method: - type: "string" + replication: + type: "object" title: "Replication Method" description: "The replication method used for extracting data from the database.\ \ STANDARD replication requires no setup on the DB side but will not be\ @@ -4832,10 +4934,62 @@ \ inserts, updates, and deletes. This needs to be configured on the source\ \ database itself." default: "STANDARD" - enum: - - "STANDARD" - - "CDC" + additionalProperties: true order: 8 + oneOf: + - title: "Standard" + additionalProperties: false + description: "Standard replication requires no setup on the DB side but\ + \ will not be able to represent deletions incrementally." + required: + - "replication_type" + properties: + replication_type: + type: "string" + const: "STANDARD" + enum: + - "STANDARD" + default: "STANDARD" + order: 0 + - title: "Logical Replication (CDC)" + additionalProperties: false + description: "CDC uses {TBC} to detect inserts, updates, and deletes.\ + \ This needs to be configured on the source database itself." + required: + - "replication_type" + properties: + replication_type: + type: "string" + const: "CDC" + enum: + - "CDC" + default: "CDC" + order: 0 + data_to_sync: + title: "Data to Sync" + type: "string" + default: "Existing and New" + enum: + - "Existing and New" + - "New Changes Only" + description: "What data should be synced under the CDC. \"Existing\ + \ and New\" will read existing data as a snapshot, and sync new\ + \ changes through CDC. \"New Changes Only\" will skip the initial\ + \ snapshot, and only sync new changes through CDC." + order: 1 + snapshot_isolation: + title: "Initial Snapshot Isolation Level" + type: "string" + default: "Snapshot" + enum: + - "Snapshot" + - "Read Committed" + description: "Existing data in the database are synced through an\ + \ initial snapshot. This parameter controls the isolation level\ + \ that will be used during the initial snapshotting. If you choose\ + \ the \"Snapshot\" level, you must enable the snapshot isolation mode on the database." + order: 2 tunnel_method: type: "object" title: "SSH Tunnel Method" @@ -5087,7 +5241,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-mixpanel:0.1.15" +- dockerImage: "airbyte/source-mixpanel:0.1.17" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel" connectionSpecification: @@ -5099,12 +5253,14 @@ additionalProperties: true properties: api_secret: + order: 0 title: "Project Token" type: "string" description: "Mixpanel project token. See the docs for more information on how to obtain this." airbyte_secret: true attribution_window: + order: 1 title: "Attribution Window" type: "integer" description: " A period of time for attributing results to ads and the lookback\ @@ -5112,6 +5268,7 @@ \ Default attribution window is 5 days." default: 5 project_timezone: + order: 2 title: "Project Timezone" type: "string" description: "Time zone in which integer date times are stored. The project\ @@ -5122,6 +5279,7 @@ - "US/Pacific" - "UTC" select_properties_by_default: + order: 3 title: "Select Properties By Default" type: "boolean" description: "Setting this config parameter to TRUE ensures that new properties\ @@ -5129,6 +5287,7 @@ \ will be ignored." default: true start_date: + order: 4 title: "Start Date" type: "string" description: "UTC date and time in the format 2017-01-25T00:00:00Z. Any\ @@ -5137,7 +5296,18 @@ examples: - "2021-11-16" pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$" + end_date: + order: 5 + title: "End Date" + type: "string" + description: "UTC date and time in the format 2017-01-25T00:00:00Z. Any\ + \ data after this date will not be replicated. Left empty to always sync\ + \ to most recent date" + examples: + - "2021-11-16" + pattern: "^[0-9]{4}-[0-9]{2}-[0-9]{2}(T[0-9]{2}:[0-9]{2}:[0-9]{2}Z)?$" region: + order: 6 title: "Region" description: "The region of mixpanel domain instance either US or EU." type: "string" @@ -5145,6 +5315,14 @@ - "US" - "EU" default: "US" + date_window_size: + order: 7 + title: "Date slicing window" + description: "Defines window size in days, that used to slice through data.\ + \ You can reduce it, if amount of data in each window is too big for your\ + \ environment." + type: "integer" + default: 30 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] @@ -6537,7 +6715,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.17" +- dockerImage: "airbyte/source-postgres:0.4.21" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: @@ -7148,7 +7326,27 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-s3:0.1.14" +- dockerImage: "airbyte/source-rki-covid:0.1.1" + spec: + documentationUrl: "https://docs.airbyte.com/integrations/sources/rki-covid" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "RKI Covid Spec" + type: "object" + required: + - "start_date" + additionalProperties: false + properties: + start_date: + type: "string" + title: "Start Date" + description: "UTC date in the format 2017-01-25. Any data before this date\ + \ will not be replicated." + order: 1 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-s3:0.1.15" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/s3" changelogUrl: "https://docs.airbyte.io/integrations/sources/s3" @@ -8494,7 +8692,7 @@ type: "string" path_in_connector_config: - "client_secret" -- dockerImage: "airbyte/source-stripe:0.1.32" +- dockerImage: "airbyte/source-stripe:0.1.33" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/stripe" connectionSpecification: @@ -9476,7 +9674,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-zendesk-support:0.2.8" +- dockerImage: "airbyte/source-zendesk-support:0.2.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/zendesk-support" connectionSpecification: @@ -9916,3 +10114,107 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] +- dockerImage: "airbyte/source-sftp:0.1.1" + spec: + documentationUrl: "https://docs.airbyte.io/integrations/source/sftp" + connectionSpecification: + $schema: "http://json-schema.org/draft-07/schema#" + title: "SFTP Source Spec" + type: "object" + required: + - "user" + - "host" + - "port" + additionalProperties: true + properties: + user: + title: "User Name" + description: "The server user" + type: "string" + order: 0 + host: + title: "Host Address" + description: "The server host address" + type: "string" + examples: + - "www.host.com" + - "192.0.2.1" + order: 1 + port: + title: "Port" + description: "The server port" + type: "integer" + default: 22 + examples: + - "22" + order: 2 + credentials: + type: "object" + title: "Authentication *" + description: "The server authentication method" + order: 3 + oneOf: + - title: "Password Authentication" + required: + - "auth_method" + - "auth_user_password" + properties: + auth_method: + description: "Connect through password authentication" + type: "string" + const: "SSH_PASSWORD_AUTH" + order: 0 + auth_user_password: + title: "Password" + description: "OS-level password for logging into the jump server host" + type: "string" + airbyte_secret: true + order: 1 + - title: "SSH Key Authentication" + required: + - "auth_method" + - "auth_ssh_key" + properties: + auth_method: + description: "Connect through ssh key" + type: "string" + const: "SSH_KEY_AUTH" + order: 0 + auth_ssh_key: + title: "SSH Private Key" + description: "OS-level user account ssh key credentials in RSA PEM\ + \ format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )" + type: "string" + airbyte_secret: true + multiline: true + order: 1 + file_types: + title: "File types" + description: "Coma separated file types. Currently only 'csv' and 'json'\ + \ types are supported." + type: "string" + default: "csv,json" + order: 4 + examples: + - "csv,json" + - "csv" + folder_path: + title: "Folder Path (Optional)" + description: "The directory to search files for sync" + type: "string" + default: "" + examples: + - "/logs/2022" + order: 5 + file_pattern: + title: "File Pattern (Optional)" + description: "The regular expression to specify files for sync in a chosen\ + \ Folder Path" + type: "string" + default: "" + examples: + - "log-([0-9]{4})([0-9]{2})([0-9]{2}) - This will filter files which `log-yearmmdd`" + order: 6 + supportsNormalization: false + supportsDBT: false + supported_destination_sync_modes: [] diff --git a/airbyte-integrations/connectors/source-twilio/Dockerfile b/airbyte-integrations/connectors/source-twilio/Dockerfile index 7b7dc90951e08..f3e9ec7aea9d4 100644 --- a/airbyte-integrations/connectors/source-twilio/Dockerfile +++ b/airbyte-integrations/connectors/source-twilio/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-twilio diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/source.py b/airbyte-integrations/connectors/source-twilio/source_twilio/source.py index 7ec8a481e7f1b..9c6d23a6d960e 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/source.py +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/source.py @@ -61,7 +61,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ), ) full_refresh_stream_kwargs = {"authenticator": auth} - incremental_stream_kwargs = {"authenticator": auth, "start_date": config["start_date"]} + incremental_stream_kwargs = { + "authenticator": auth, + "start_date": config["start_date"], + "lookback_window": config["lookback_window"], + } streams = [ Accounts(**full_refresh_stream_kwargs), diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/spec.json b/airbyte-integrations/connectors/source-twilio/source_twilio/spec.json index f5809c27fc36a..182977df2d161 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/spec.json +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/spec.json @@ -4,30 +4,51 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Twilio Spec", "type": "object", - "required": ["account_sid", "auth_token", "start_date"], + "required": [ + "account_sid", + "auth_token", + "start_date" + ], "additionalProperties": false, "properties": { "account_sid": { "title": "Account ID", "description": "Twilio account SID", "airbyte_secret": true, - "type": "string" + "type": "string", + "order": 1 }, "auth_token": { "title": "Auth Token", "description": "Twilio Auth Token.", "airbyte_secret": true, - "type": "string" + "type": "string", + "order": 2 }, "start_date": { "title": "Replication Start Date", "description": "UTC date and time in the format 2020-10-01T00:00:00Z. Any data before this date will not be replicated.", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", - "examples": ["2020-10-01T00:00:00Z"], - "type": "string" + "examples": [ + "2020-10-01T00:00:00Z" + ], + "type": "string", + "order": 3 + }, + "lookback_window": { + "title": "Lookback window", + "description": "How far into the past to look for records. (in minutes)", + "examples": [ + 60 + ], + "default": 0, + "type": "integer", + "order": 4 } } }, "supportsIncremental": true, - "supported_destination_sync_modes": ["append"] + "supported_destination_sync_modes": [ + "append" + ] } diff --git a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py index 813055eb4345f..e497c37c11224 100644 --- a/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py +++ b/airbyte-integrations/connectors/source-twilio/source_twilio/streams.py @@ -9,6 +9,7 @@ import pendulum import requests from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import IncrementalMixin from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer @@ -20,9 +21,12 @@ class TwilioStream(HttpStream, ABC): url_base = TWILIO_API_URL_BASE primary_key = "sid" - page_size = 100 + page_size = 1000 transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization) + def __init__(self, **kwargs): + super().__init__(**kwargs) + @property def data_field(self): return self.name @@ -92,13 +96,15 @@ def custom_transform_function(original_value: Any, field_schema: Mapping[str, An return original_value -class IncrementalTwilioStream(TwilioStream, ABC): +class IncrementalTwilioStream(TwilioStream, IncrementalMixin): cursor_field = "date_updated" time_filter_template = "%Y-%m-%dT%H:%M:%SZ" - def __init__(self, start_date: str = None, **kwargs): + def __init__(self, start_date: str = None, lookback_window: int = 0, **kwargs): super().__init__(**kwargs) self._start_date = start_date + self._lookback_window = lookback_window + self._cursor_value = None @property @abstractmethod @@ -107,28 +113,28 @@ def incremental_filter_field(self) -> str: return: date filter query parameter name """ - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object - and returning an updated state object. - """ - latest_benchmark = pendulum.parse(latest_record[self.cursor_field], strict=False).strftime(self.time_filter_template) - if current_stream_state.get(self.cursor_field): - return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])} - return {self.cursor_field: latest_benchmark} + @property + def state(self) -> Mapping[str, Any]: + if self._cursor_value: + return {self.cursor_field: self._cursor_value} + else: + return {self.cursor_field: self._start_date} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._cursor_value = value[self.cursor_field] def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: params = super().request_params(stream_state=stream_state, **kwargs) - start_date = stream_state.get(self.cursor_field) or self._start_date + start_date = stream_state[self.cursor_field] if stream_state.get(self.cursor_field) else self._start_date if start_date: - params.update({self.incremental_filter_field: pendulum.parse(start_date, strict=False).strftime(self.time_filter_template)}) + start_date_with_lookback_window = pendulum.parse(start_date, strict=False) - pendulum.duration(minutes=self._lookback_window) + params[self.incremental_filter_field] = start_date_with_lookback_window.strftime(self.time_filter_template) return params - def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs): - stream_state = stream_state or {} - records = super().read_records(stream_state=stream_state, **kwargs) - for record in records: - record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).strftime(self.time_filter_template) + def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]: + for record in super().read_records(*args, **kwargs): + self._cursor_value = pendulum.parse(record[self.cursor_field], strict=False) yield record @@ -345,6 +351,24 @@ class MessageMedia(TwilioNestedStream, IncrementalTwilioStream): incremental_filter_field = "DateCreated>" cursor_field = "date_created" + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + stream_instance = self.parent_stream( + authenticator=self.authenticator, start_date=self._start_date, lookback_window=self._lookback_window + ) + stream_slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=stream_instance.cursor_field) + for stream_slice in stream_slices: + for item in stream_instance.read_records( + sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, cursor_field=stream_instance.cursor_field + ): + if item.get("subresource_uris", {}).get(self.subresource_uri_key): + validated = True + for key, value in self.media_exist_validation.items(): + validated = item.get(key) and item.get(key) != value + if not validated: + break + if validated: + yield {"subresource_uri": item["subresource_uris"][self.subresource_uri_key]} + class UsageNestedStream(TwilioNestedStream): url_base = TWILIO_API_URL_BASE_VERSIONED diff --git a/docs/integrations/sources/twilio.md b/docs/integrations/sources/twilio.md index 2b1051007b4be..a743e1035f765 100644 --- a/docs/integrations/sources/twilio.md +++ b/docs/integrations/sources/twilio.md @@ -66,6 +66,8 @@ See [docs](https://www.twilio.com/docs/iam/api) for more details. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.5 | 2022-05-17 | [12555](https://github.com/airbytehq/airbyte/pull/12555) | Add page size and lookback window parameters to fetch messages with a rolling window and catch status updates | +| 0.1.4 | 2022-04-22 | [12157](https://github.com/airbytehq/airbyte/pull/12157) | Use Retry-After header for backoff | | 0.1.3 | 2022-04-20 | [12183](https://github.com/airbytehq/airbyte/pull/12183) | Add new subresource on the call stream + declare a valid primary key for conference_participants stream | | 0.1.2 | 2021-12-23 | [9092](https://github.com/airbytehq/airbyte/pull/9092) | Correct specification doc URL | | 0.1.1 | 2021-10-18 | [7034](https://github.com/airbytehq/airbyte/pull/7034) | Update schemas and transform data types according to the API schema |