Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Authentication for retries fixed, schema issues fixed #14

Merged
merged 4 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions meltano.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ plugins:
- name: client_secret
sensitive: true
- name: clinic_id
select:
- opportunity_timeline.*
loaders:
- name: target-jsonl
variant: andyh1203
Expand Down
43 changes: 16 additions & 27 deletions tap_sunwave/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ class SunwaveStream(RESTStream):
"""Sunwave stream class."""

url_base = "https://emr.sunwavehealth.com/SunwaveEMR"
auth_errors = 0
@property
def schema_filepath(self):
return SCHEMAS_DIR / f"{self.name}.json"

def _request(self, prepared_request: requests.PreparedRequest, context: Context | None) -> requests.Response:
"""Auth wasn't getting ran for every retried request, so we need to do it here"""
# Manually run authenticator before sending the request
reauthed_request = self.authenticator.authenticate_request(prepared_request)
# Then call the parent's method with our newly authenticated request
return super()._request(reauthed_request, context)

@cached_property
def authenticator(self) -> Auth:
Expand Down Expand Up @@ -71,17 +82,14 @@ def validate_response(self, response: requests.Response) -> None:
# Check if response is a dict and has error
if isinstance(json_response, dict) and json_response.get("error"):
error_msg = self.response_error_message(response)
# TODO: figure out how to handle this better
if self.name == "opportunity_timeline":
log_msg = f"Skipping 'opportunity_timeline' record due to {error_msg}"
self.logger.info(log_msg)
else:
raise FatalAPIError(error_msg)
raise FatalAPIError(error_msg)
except requests.exceptions.JSONDecodeError as e:
# Their API returns a 200 status code when there's an error
# We detect that by noticing the response isn't valid JSON
msg self.response_error_message(response)
msg = self.response_error_message(response)
raise FatalAPIError(msg)
# Reset auth errors
self.auth_errors = 0

def _cleanup_schema(self, schema_fragment):
if isinstance(schema_fragment, dict):
Expand All @@ -100,23 +108,4 @@ def _cleanup_schema(self, schema_fragment):

elif isinstance(schema_fragment, list):
for item in schema_fragment:
self._cleanup_schema(item)

def _get_swagger_schema(self, ref: str) -> dict:
"""Get schema from packaged swagger file."""
# Use package resources to access swagger.json
with resources.files(__package__).joinpath("docs/swagger.json").open("r", encoding="utf-8") as f:
swagger_doc = json.load(f)

# Example: ref => "#/components/schemas/FormStandardResponse"
# Strip the leading "#/" and split by "/"
path_parts = ref.lstrip("#/").split("/")

# Traverse the swagger doc to get the nested object
data = swagger_doc
for part in path_parts:
data = data[part]

cleaned_schema = self._cleanup_schema(data["properties"])
assert len(data["properties"]) > 1, "No properties found in schema"
return data
self._cleanup_schema(item)
Loading
Loading