Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸ› Source Qualaroo: Fix start_date & custom survey_ids #13121

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@
- name: Qualaroo
sourceDefinitionId: b08e4776-d1de-4e80-ab5c-1e51dad934a2
dockerRepository: airbyte/source-qualaroo
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.io/integrations/sources/qualaroo
icon: qualaroo.svg
sourceType: api
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6796,7 +6796,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-qualaroo:0.1.1"
- dockerImage: "airbyte/source-qualaroo:0.1.2"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/qualaroo"
connectionSpecification:
Expand Down Expand Up @@ -6833,7 +6833,7 @@
type: "array"
items:
type: "string"
pattern: "^[0-9a-fA-F]{24}$"
pattern: "^[0-9]{1,8}$"
title: "Qualaroo survey IDs"
description: "IDs of the surveys from which you'd like to replicate data.\
\ If left empty, data from all surveys to which you have access will be\
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-qualaroo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ COPY source_qualaroo ./source_qualaroo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-qualaroo
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,45 @@
"type": ["null", "integer"]
},
"time": {
"type": ["null", "string"],
"type": ["null", "string"],
"format": "date-time"
},
"emitted_at": {
"type": ["null", "string"],
"format": "date-time"
},
"identity": {
"type": ["null", "string"]
},
"page": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"referrer": {
"type": ["null", "string"]
},
"user_agent": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"nudge_id": {
"type": ["null", "integer"]
},
"nudge_name": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"anon_visitor_id": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"ip_address": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"answered_questions": {
"type": "array",
"items": {
"type":["null", "object"]
"type": ["null", "object"]
}
},
"properties": {
"type":["null", "object"]
"type": ["null", "object"]
},
"nps": {
"type": ["null", "object"],
Expand All @@ -48,20 +52,20 @@
"type": ["null", "integer"]
},
"reason": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"respondent_id": {
"type": ["null", "integer"]
},
"time": {
"type": ["null", "string"],
"type": ["null", "string"],
"format": "date-time"
},
"category": {
"type": ["null", "string"]
"type": ["null", "string"]
},
"response_uri": {
"type": ["null", "string"]
"type": ["null", "string"]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,97 +3,16 @@
#


from abc import ABC
from base64 import b64encode
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from typing import Any, List, Mapping, Tuple

import pendulum
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator


class QualarooStream(HttpStream, ABC):
    """Shared base class for the Qualaroo REST API streams.

    The authenticator and ``start_date`` are read from the connector
    ``config`` mapping. Every request carries ``limit`` and ``start_date``
    query parameters. Pagination is not implemented in this class:
    ``next_page_token`` always returns ``None``.
    """

    url_base = "https://api.qualaroo.com/api/v1/"

    # Define primary key as sort key for full_refresh, or very first sync for incremental_refresh
    primary_key = "id"

    # Page size
    limit = 500

    # Extra query parameters merged into every request when a subclass sets them
    extra_params = None

    def __init__(self, config: Mapping[str, Any]):
        """Store the connector config and hand its authenticator to the HTTP layer.

        :param config: mapping that must contain the ``authenticator`` and
            ``start_date`` keys.
        """
        super().__init__(authenticator=config["authenticator"])
        self.start_date = config["start_date"]
        self.config = config

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Report that no further page follows ``response``."""
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Build the query string parameters for one request.

        Starts from ``limit`` and ``start_date``, then layers the page token
        (if any) and finally ``extra_params`` (if any) on top.
        """
        query = {"limit": self.limit, "start_date": self.start_date}
        if next_page_token:
            query.update(**next_page_token)
        overrides = self.extra_params
        if overrides:
            query.update(overrides)
        return query

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield each element of the JSON body of ``response``."""
        yield from response.json()


class ChildStreamMixin:
    """Mixin that slices a child stream by the records of its parent stream.

    The host class is expected to expose ``self.config``; the parent stream
    is instantiated from it and one slice is emitted per parent record.
    """

    # Stream class whose records provide the slice ids; set by concrete child streams
    parent_stream_class: Optional[QualarooStream] = None

    def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield ``{"id": <parent id>}`` for every record read from the parent stream.

        :param sync_mode: sync mode forwarded to the parent's ``read_records``.
        """
        parent = self.parent_stream_class(config=self.config)
        for item in parent.read_records(sync_mode=sync_mode):
            yield {"id": item["id"]}


class Surveys(QualarooStream):
    """Return list of all Surveys.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/
    """

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "nudges"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield survey records, restricted to the configured ``survey_ids`` if any.

        When ``survey_ids`` is missing or empty every record is yielded.
        """
        # Configured ids are strings per the connector spec, while the API "id"
        # appears to be numeric; normalise both sides to str so the filter can match.
        wanted = {str(survey_id) for survey_id in (self.config.get("survey_ids") or [])}
        for record in super().parse_response(response, **kwargs):
            if not wanted or str(record["id"]) in wanted:
                yield record


class Responses(ChildStreamMixin, QualarooStream):
    """Return list of all responses of a survey.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/<id>/responses.json
    """

    parent_stream_class = Surveys
    limit = 500
    extra_params = {}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield response records with ``answered_questions`` flattened to a list.

        All records are rewritten first and only then yielded.
        """
        records = response.json()

        # Replace the answered_questions mapping by the list of its values
        for entry in records:
            if "answered_questions" in entry:
                answers = entry["answered_questions"]
                entry["answered_questions"] = list(answers.values())
        yield from records

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the responses endpoint for the survey id carried by ``stream_slice``."""
        return f"nudges/{stream_slice['id']}/responses.json"
from .streams import QualarooStream, Responses, Surveys


class QualarooAuthenticator(HttpAuthenticator):
Expand All @@ -110,7 +29,6 @@ def __init__(
token_header: str = "oauth_token",
):
self._key = key
self._token = token
self._token = b64encode(b":".join((key.encode("latin1"), token.encode("latin1")))).strip().decode("ascii")
self.auth_header = auth_header
self.key_header = key_header
Expand Down Expand Up @@ -155,5 +73,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
"""
:param config: A Mapping of the user input configuration as defined in the connector spec.
"""
config["authenticator"] = self._get_authenticator(config)
return [Surveys(config), Responses(config)]
args = {}
# convert start_date to epoch time for qualaroo API
args["start_date"] = pendulum.parse(config["start_date"]).strftime("%s")
args["survey_ids"] = config.get("survey_ids", [])
args["authenticator"] = self._get_authenticator(config)
return [Surveys(**args), Responses(**args)]
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"type": "array",
"items": {
"type": "string",
"pattern": "^[0-9a-fA-F]{24}$"
"pattern": "^[0-9]{1,8}$"
},
"title": "Qualaroo survey IDs",
"description": "IDs of the surveys from which you'd like to replicate data. If left empty, data from all surveys to which you have access will be replicated."
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream


class QualarooStream(HttpStream, ABC):
    """Shared base class for the Qualaroo REST API streams.

    Sends ``limit`` and ``start_date`` with every request and pages through
    results with an ``offset`` query parameter that advances by ``limit``
    whenever a full page is returned.
    """

    url_base = "https://api.qualaroo.com/api/v1/"

    # Define primary key as sort key for full_refresh, or very first sync for incremental_refresh
    primary_key = "id"

    # Page size
    limit = 500

    # Extra query parameters merged into every request when a subclass sets them
    extra_params = None

    def __init__(self, start_date: str, survey_ids: Optional[List[str]] = None, **kwargs):
        """Create the stream.

        :param start_date: value sent as the ``start_date`` query parameter
            (the connector passes epoch seconds formatted as a string).
        :param survey_ids: survey ids to restrict the sync to; ``None`` or an
            empty list means no restriction.
        :param kwargs: forwarded to ``HttpStream`` (e.g. ``authenticator``).
        """
        super().__init__(**kwargs)
        self._start_date = start_date
        # Avoid a shared mutable default: every instance gets its own list
        self._survey_ids = survey_ids if survey_ids is not None else []
        self._offset = 0

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Return the offset of the next page, or ``None`` after the last page.

        A page holding exactly ``limit`` records is treated as "more may
        follow". Any other size ends pagination.
        """
        resp_json = response.json()

        if len(resp_json) == self.limit:
            self._offset += self.limit
            return {"offset": self._offset}

        # Last page reached: reset so the next slice/read starts from the beginning
        self._offset = 0
        return None

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        """Build the query string parameters for one request.

        Starts from ``limit`` and ``start_date``, then layers the page token
        (if any) and finally ``extra_params`` (if any) on top.
        """
        params = {"limit": self.limit, "start_date": self._start_date}
        if next_page_token:
            params.update(**next_page_token)
        if self.extra_params:
            params.update(self.extra_params)
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield each element of the JSON body of ``response``."""
        yield from response.json()


class ChildStreamMixin:
    """Mixin that slices a child stream by the records of its parent stream.

    The host class is expected to be a ``QualarooStream`` so that
    ``_start_date``, ``_survey_ids`` and ``authenticator`` are available for
    building the parent stream.
    """

    # Stream class whose records provide the slice ids; set by concrete child streams
    parent_stream_class: Optional[QualarooStream] = None

    def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield ``{"id": <parent id>}`` for every record read from the parent stream.

        :param sync_mode: sync mode forwarded to the parent's ``read_records``.
        """
        # Streams are constructed from start_date / survey_ids / authenticator,
        # not from a config mapping, so build the parent the same way.
        parent = self.parent_stream_class(
            start_date=self._start_date,
            survey_ids=self._survey_ids,
            authenticator=self.authenticator,
        )
        for item in parent.read_records(sync_mode=sync_mode):
            yield {"id": item["id"]}


class Surveys(QualarooStream):
    """Stream of all surveys ("nudges") visible to the account.

    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/
    """

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        return "nudges"

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield survey records, keeping only the configured ids when any are set.

        Record ids are converted to ``str`` before being looked up in the
        configured ``survey_ids``.
        """
        wanted = self._survey_ids
        records = super().parse_response(response=response, **kwargs)
        if not wanted:
            # No filter configured: pass every survey through untouched
            yield from records
            return
        for survey in records:
            if str(survey["id"]) in wanted:
                yield survey


class Responses(ChildStreamMixin, QualarooStream):
    """Return list of all responses of a survey.
    API Docs: https://help.qualaroo.com/hc/en-us/articles/201969438-The-REST-Reporting-API
    Endpoint: https://api.qualaroo.com/api/v1/nudges/<id>/responses.json
    """

    parent_stream_class = Surveys

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        """Return the responses endpoint for the survey named by ``stream_slice["survey_id"]``."""
        survey_id = stream_slice["survey_id"]
        return f"nudges/{survey_id}/responses.json"

    def stream_slices(self, **kwargs):
        """Yield one ``{"survey_id": ...}`` slice per survey returned by ``Surveys``.

        The surveys are always read with a full refresh, using this stream's
        own start date, survey id filter and authenticator.
        """
        survey_stream = Surveys(start_date=self._start_date, survey_ids=self._survey_ids, authenticator=self.authenticator)
        for survey in survey_stream.read_records(sync_mode=SyncMode.full_refresh):
            yield {"survey_id": survey["id"]}

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        """Yield response records with ``answered_questions`` flattened to a list.

        All records are rewritten first and only then yielded.
        """
        response_data = response.json()
        # de-nest the answered_questions object if exists
        for rec in response_data:
            answers = rec.get("answered_questions") if isinstance(rec, dict) else None
            # Only a mapping can be de-nested; leave null or already-flat values untouched
            if isinstance(answers, dict):
                rec["answered_questions"] = list(answers.values())
        yield from response_data
Loading