Airbyte CDK: Improve error for returning non-iterable from connectors parse_response #17047

Closed
Gitznik opened this issue Sep 22, 2022 · 6 comments · Fixed by #17626
Labels
autoteam CDK Connector Development Kit community Stale team/tse Technical Support Engineers team/use type/bug Something isn't working

Comments

@Gitznik
Contributor

Gitznik commented Sep 22, 2022

Environment

  • Airbyte version: 0.40.7
  • OS Version / Instance: macOS
  • Deployment: Docker
  • Source Connector and version: Custom python HTTP source
  • Step where error happened: Running python3 main.py read while developing the connector

Current Behavior

If the parse_response method of the stream does not return an iterator, this leads to a fairly cryptic pydantic error message instead of one specifying that an iterator is expected. Because response.json() is type-hinted as Any, my IDE did not catch this type violation, and I was quite confused.

Expected Behavior

The error should tell me that the iterator returned from my parse_response function is incompatible with the expected data. If there is no use case for returning a dict as an iterator, it could even raise a warning if the parse_response returns a dict.
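
A note on why the error mentions a dict at all: a dict is itself iterable, so `yield from` over it produces its keys, and each key (a string) then gets treated as a record. Below is a minimal sketch of that behaviour, assuming a simplified stand-in for the CDK's read loop. `read_records` and both `parse_response_*` functions are hypothetical names for illustration, not CDK code.

```python
from typing import Any, Callable, Iterable, Iterator, Mapping


def parse_response_wrong(payload: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    # Mistake: returns the decoded JSON object itself instead of an iterable of records.
    return payload


def parse_response_right(payload: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
    # Yields the object as a single record.
    yield payload


def read_records(
    parse: Callable[[Mapping[str, Any]], Iterable[Any]],
    payload: Mapping[str, Any],
) -> Iterator[Any]:
    # Simplified stand-in for the `yield from self.parse_response(...)` call.
    yield from parse(payload)


payload = {"id": 1, "name": "campaign-a"}
wrong_records = list(read_records(parse_response_wrong, payload))
right_records = list(read_records(parse_response_right, payload))

print(wrong_records)  # the dict's keys, i.e. strings rather than records
print(right_records)  # a single dict record
```

If this reading is right, the string keys are what reach AirbyteRecordMessage as data, which would explain the "value is not a valid dict" message.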

Logs

{"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceNativex"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: campaigns "}}
{"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream campaigns\nTraceback (most recent call last):\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 282, in _read_full_refresh\n    yield self._as_airbyte_record(configured_stream.stream.name, record)\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 317, in _as_airbyte_record\n    message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)\n  File \"pydantic/main.py\", line 341, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage\ndata\n  value is not a valid dict (type=type_error.dict)"}}
{"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing campaigns"}}
{"type": "LOG", "log": {"level": "INFO", "message": "SourceNativex runtimes:\nSyncing stream campaigns 0:00:01.377093"}}
{"type": "LOG", "log": {"level": "FATAL", "message": "1 validation error for AirbyteRecordMessage\ndata\n  value is not a valid dict (type=type_error.dict)\nTraceback (most recent call last):\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n    for message in generator:\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n    raise e\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 282, in _read_full_refresh\n    yield self._as_airbyte_record(configured_stream.stream.name, record)\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 317, in _as_airbyte_record\n    message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)\n  File \"pydantic/main.py\", line 341, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage\ndata\n  value is not a valid dict (type=type_error.dict)"}}
{"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1663859942510.5889, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "1 validation error for AirbyteRecordMessage\ndata\n  value is not a valid dict (type=type_error.dict)", "stack_trace": "Traceback (most recent call last):\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/main.py\", line 13, in <module>\n    launch(source, sys.argv[1:])\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 123, in launch\n    for message in source_entrypoint.run(parsed_args):\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 114, in run\n    for message in generator:\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 127, in read\n    raise e\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 113, in read\n    yield from self._read_stream(\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 182, in _read_stream\n    for record in record_iterator:\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 282, in _read_full_refresh\n    yield self._as_airbyte_record(configured_stream.stream.name, record)\n  File \"/Users/.../airbyte_trial/airbyte-integrations/connectors/source-nativex/.venv/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 317, in _as_airbyte_record\n    message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)\n  File \"pydantic/main.py\", line 341, in pydantic.main.BaseModel.__init__\npydantic.error_wrappers.ValidationError: 1 validation error for AirbyteRecordMessage\ndata\n  value is not a valid dict (type=type_error.dict)\n", "failure_type": "system_error"}}}

Steps to Reproduce

  1. Return a dict from a connector's parse_response method
  2. Call python3 main.py read --config secrets/config.json --catalog catalog.json on it

Are you willing to submit a PR?

Sure. From what I can see, the method is called from:

yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

Would it make sense to do a type assert here?

@Gitznik Gitznik added needs-triage type/bug Something isn't working labels Sep 22, 2022
@marcosmarxm marcosmarxm added CDK Connector Development Kit and removed needs-triage team/tse Technical Support Engineers autoteam labels Sep 23, 2022
@marcosmarxm marcosmarxm changed the title Improve error for returning non-iterable from connectors parse_response Airbyte CDK: Improve error for returning non-iterable from connectors parse_response Sep 23, 2022
@marcosmarxm
Member

Thanks for suggesting this; you are welcome to submit the change. The question is: how will you assert the output of the yield?
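
One possible answer to that question, as a sketch only: since the values come out of a generator one at a time, the check can be done per record in a thin wrapper around the iterable rather than on the return value as a whole. The helper name `checked_records` is hypothetical and not part of the CDK.

```python
from typing import Any, Iterable, Iterator, Mapping


def checked_records(records: Iterable[Any], stream_name: str) -> Iterator[Mapping[str, Any]]:
    """Re-yield each record, failing early with a descriptive error on non-mappings."""
    for record in records:
        if not isinstance(record, Mapping):
            raise TypeError(
                f"Stream {stream_name!r}: parse_response should yield dict records, "
                f"got {type(record).__name__}: {record!r}"
            )
        yield record


# Usage: wrap whatever parse_response produced before iterating over it.
good = list(checked_records([{"id": 1}], "campaigns"))

try:
    # A bare dict iterates over its keys, so the first item is the string "id".
    list(checked_records({"id": 1}, "campaigns"))
except TypeError as err:
    error_message = str(err)
```

The trade-off is an extra isinstance check per record, which may or may not be acceptable on a hot path.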

@octavia-squidington-iii octavia-squidington-iii added autoteam team/tse Technical Support Engineers labels Sep 23, 2022
@Gitznik
Contributor Author

Gitznik commented Sep 23, 2022

Good question. It feels quite tricky to do this robustly here.
Going down the stack trace, we could catch the pydantic.error_wrappers.ValidationError raised in

message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
when anything that is not a dict is passed, and raise a custom error with some additional information, e.g. this likely cause.
I imagine something like:

try:
    message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
except pydantic.error_wrappers.ValidationError as e:
    # pydantic reports a non-dict `data` value with this message
    if "value is not a valid dict" in str(e):
        raise TypeError("parse_response should return an iterable of dicts") from e
    # Any other validation error is re-raised unchanged
    raise
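
To make the chaining behaviour of the snippet above concrete, here is a self-contained sketch that runs without pydantic or the CDK. `StandInValidationError`, `build_record_message` and `as_record` are hypothetical stand-ins for pydantic's ValidationError, the AirbyteRecordMessage constructor and `_as_airbyte_record`. They are assumptions for illustration, not the real implementations.

```python
class StandInValidationError(ValueError):
    """Hypothetical stand-in for pydantic.error_wrappers.ValidationError."""


def build_record_message(stream_name: str, data: object) -> dict:
    # Hypothetical stand-in for AirbyteRecordMessage(...): rejects non-dict data.
    if not isinstance(data, dict):
        raise StandInValidationError("value is not a valid dict (type=type_error.dict)")
    return {"stream": stream_name, "data": data}


def as_record(stream_name: str, data: object) -> dict:
    try:
        return build_record_message(stream_name, data)
    except StandInValidationError as e:
        if "value is not a valid dict" in str(e):
            # `from e` keeps the original validation error as __cause__.
            raise TypeError(
                f"parse_response should yield dicts, got {type(data).__name__}: {data!r}"
            ) from e
        raise


try:
    as_record("campaigns", "id")
except TypeError as err:
    chained = err
```

Matching on the message text is fragile if the wording ever changes, so this is best read as an illustration of the idea rather than a recommended final form.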

@marcosmarxm
Member

I think it is a good idea, @Gitznik. Do you mind submitting a PR? Then I'll ask the CDK team to review it.

girarda added a commit that referenced this issue Oct 6, 2022
…nectors parse_response (#17626)

* Improve airbyte cdk invalid message data type error message

* Test cdk invalid message data type custom error is raised

* Fix test to pass stream as a string

* Add valid record message data input type test

* Add object type and value to AirbyteRecordMessage validator message

Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
girarda added a commit that referenced this issue Oct 7, 2022
…from connectors parse_response (#17626)"

This reverts commit d9ad272.
@girarda girarda reopened this Oct 7, 2022
@girarda
Contributor

girarda commented Oct 7, 2022

@Gitznik FYI, I ended up having to revert #17047 because the airbyte_protocol.py file is autogenerated by our build system. We will need to look for a better way to run this validation.

girarda added a commit that referenced this issue Oct 7, 2022
…from connectors parse_response (#17707)

* Bump cdk version

* Revert "#17047 Airbyte CDK: Improve error for returning non-iterable from connectors parse_response (#17626)"

This reverts commit d9ad272.

* Bump
@octavia-squidington-iii
Collaborator

At Airbyte, we seek to be clear about the project priorities and roadmap. This issue has not had any activity for 180 days, suggesting that it's not as critical as others. It's possible it has already been fixed. It is being marked as stale and will be closed in 20 days if there is no activity. To keep it open, please comment to let us know why it is important to you and if it is still reproducible on recent versions of Airbyte.

@octavia-squidington-iii
Collaborator

This issue was closed because it has been inactive for 20 days since being marked as stale.

4 participants