Skip to content

Commit

Permalink
fix query errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sehnem committed Sep 11, 2023
1 parent efdcce5 commit 47d31e2
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 31 deletions.
37 changes: 37 additions & 0 deletions tap_shopify/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
from functools import cached_property
from inspect import stack
from typing import Any, Optional
import requests

from singer_sdk.exceptions import FatalAPIError, RetriableAPIError

from http import HTTPStatus

from singer_sdk import typing as th
from singer_sdk.pagination import SinglePagePaginator
Expand Down Expand Up @@ -212,3 +217,35 @@ def denest_schema(schema):
return output

return denest_schema(catalog)


def validate_response(self, response: requests.Response) -> None:
"""Validate HTTP response."""

if (
response.status_code in self.extra_retry_statuses
or HTTPStatus.INTERNAL_SERVER_ERROR
<= response.status_code
<= max(HTTPStatus)
):
msg = self.response_error_message(response)
raise RetriableAPIError(msg, response)

json_resp = response.json()

if errors:=json_resp.get("errors"):
if len(errors)==1:
error = errors[0]
code = error.get("extensions", {}).get("code")
if code in ["THROTTLED", "MAX_COST_EXCEEDED"]:
raise RetriableAPIError(error.get("message", ""), response)
raise FatalAPIError(error.get("message", ""))
raise RetriableAPIError(json_resp["errors"], response)

if (
HTTPStatus.BAD_REQUEST
<= response.status_code
< HTTPStatus.INTERNAL_SERVER_ERROR
):
msg = self.response_error_message(response)
raise FatalAPIError(msg)
21 changes: 9 additions & 12 deletions tap_shopify/client_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

class shopifyBulkStream(ShopifyStream):
"""shopify stream class."""

# @cached_property
def query(self) -> str:
"""Set or return the GraphQL query string."""
if self.name == "shop":
Expand All @@ -27,16 +25,11 @@ def query(self) -> str:

query = base_query.replace("__query_name__", self.query_name)
query = query.replace("__selected_fields__", self.gql_selected_fields)
query = query.replace("__filters__", self.filters)
filters = f"({self.filters})" if self.filters else ""
query = query.replace("__filters__", filters)

return query

# def get_next_page_token(
# self, response: requests.Response, previous_token: Optional[Any]
# ) -> Any:
# """Return token identifying next page or None if all records have been read."""
# return None

@property
def filters(self):
"""Return a dictionary of values to be used in URL parameterization."""
Expand Down Expand Up @@ -97,14 +90,18 @@ def check_status(self, operation_id, sleep_time=10, timeout=1800):
def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result rows."""
operation_id_jsonpath = "$.data.bulkOperationRunQuery.bulkOperation.id"
request_response = response.json()
error_jsonpath = "$.data.bulkOperationRunQuery.userErrors"
json_resp = response.json()
errors = next(extract_jsonpath(error_jsonpath, json_resp), None)
if errors:
raise InvalidOperation(simplejson.dumps(errors))
operation_id = next(
extract_jsonpath(operation_id_jsonpath, input=request_response)
extract_jsonpath(operation_id_jsonpath, json_resp)
)

url = self.check_status(operation_id)

output = requests.get(url, stream=True)
output = requests.get(url, stream=True, timeout=30)

for line in output.iter_lines():
yield simplejson.loads(line)
7 changes: 2 additions & 5 deletions tap_shopify/client_gql.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]:
json_path = f"$.data.{self.query_name}.edges[*].node"
else:
json_path = f"$.data.{self.query_name}"
response = response.json()
json_resp = response.json()

if response.get("errors"):
raise Exception(response["errors"])

yield from extract_jsonpath(json_path, input=response)
yield from extract_jsonpath(json_path, json_resp)
2 changes: 1 addition & 1 deletion tap_shopify/gql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
bulkOperationRunQuery(
query:"""
{
__query_name__(__filters__) {
__query_name____filters__ {
edges {
node {
__selected_fields__
Expand Down
2 changes: 1 addition & 1 deletion tap_shopify/paginator.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def page_size(self) -> int:
elif self.query_cost and pages > 5:
if self.query_cost * pages >= 1000:
pages = math.floor(1000 / self.query_cost)
return 250 if pages > 250 else pages
return 250 if pages > 250 else int(pages)

def query_name(self, response_json) -> str:
"""Set or return the GraphQL query name."""
Expand Down
31 changes: 19 additions & 12 deletions tap_shopify/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from functools import cached_property
from tap_shopify.gql_queries import schema_query, queries_query
from typing import Any, Iterable
from singer_sdk.helpers.jsonpath import extract_jsonpath as jp

import requests
import inflection
Expand Down Expand Up @@ -58,7 +59,7 @@ class TapShopify(Tap):
th.Property(
"api_version",
th.StringType,
default="2023-04",
default="2023-10",
description="The version of the API to use.",
),
th.Property(
Expand Down Expand Up @@ -96,6 +97,7 @@ def request_gql(self, query: str) -> requests.Response:
url=url,
headers=headers,
json=request_data,
timeout=30,
)

resp.raise_for_status()
Expand All @@ -106,7 +108,9 @@ def request_gql(self, query: str) -> requests.Response:
def schema_gql(self) -> dict:
"""Return the schema for the stream."""
resp = self.request_gql(schema_query)
return resp.json()["data"]["__schema"]["types"]
json_resp = resp.json()
jsonpath = "$.data.__schema.types[*]"
return list(jp(jsonpath, json_resp))

def filter_queries(self, query):
args = [a["name"] for a in query["args"]]
Expand All @@ -117,26 +121,29 @@ def queries_gql(self) -> dict:
"""Return the schema for the stream."""

resp = self.request_gql(queries_query)
jresp = resp.json()
queries = jresp["data"]["__schema"]["queryType"]["fields"]
json_resp = resp.json()
jsonpath = "$.data.__schema.queryType.fields[*]"
queries = jp(jsonpath, json_resp)
return [q for q in queries if self.filter_queries(q)]

def extract_gql_node(self, query: dict) -> dict:
query_fields = query["type"]["ofType"]["fields"]
jsonpath = "$.type.ofType.fields[*]"
query_fields = jp(jsonpath, query)
return next((f for f in query_fields if f["name"] == "nodes"), None)

def get_gql_query_type(self, node: dict) -> str:
return node["type"]["ofType"]["ofType"]["ofType"]["name"]
jsonpath = "$.type.ofType.ofType.ofType.name"
return next(jp(jsonpath, node), None)

def get_type_fields(self, gql_type: str) -> list[dict]:
type_def = next(s for s in self.schema_gql if s["name"] == gql_type)

filtered_fields = [
f
for f in type_def["fields"]
if f["type"]["kind"] == "NON_NULL"
and f["type"]["ofType"]["kind"] == "SCALAR"
]
filtered_fields = []
for field in type_def["fields"]:
type_kind = next(jp("type.kind", field), None)
field_kind = next(jp("type.ofType.kind", field), None)
if type_kind == "NON_NULL" and field_kind == "SCALAR":
filtered_fields.append(field)

return {f["name"]: f["type"]["ofType"] for f in filtered_fields}

Expand Down

0 comments on commit 47d31e2

Please sign in to comment.