spike source emit request and response #17839

Closed
wants to merge 1 commit into from

@girarda girarda commented Oct 11, 2022

What

The connector builder requires access to the requests and responses that produced a given set of records.
This functionality is not currently implemented in the CDK.
Implementing it requires modifying the CDK at multiple layers because the Source and Stream classes are not aware of HTTP requests and responses.

How

Intercepting the requests and responses

The HttpStream class already logs the HTTP requests sent to the API as well as the HTTP responses we receive.
Instead of directly logging the requests and responses, the class can implement two new methods:

  1. log_request
  2. log_response

By default, these methods will log the requests/responses like the HttpStream currently does.
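
A minimal sketch of the hook pattern described above, using stand-in classes rather than the real CDK ones. Only the method names log_request and log_response come from this proposal; BaseStream, RecordingStream, FakeRequest, FakeResponse and send are hypothetical names used for illustration, and the real HttpStream would perform the HTTP call itself.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

logger = logging.getLogger("http_stream_sketch")


@dataclass
class FakeRequest:
    """Hypothetical stand-in for requests.PreparedRequest."""
    url: str
    body: Optional[str] = None
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class FakeResponse:
    """Hypothetical stand-in for requests.Response."""
    status_code: int
    content: bytes


class BaseStream:
    """Stand-in for HttpStream: by default the hooks only log."""

    def log_request(self, request: FakeRequest) -> None:
        logger.debug("Outbound request: url=%s", request.url)

    def log_response(self, response: FakeResponse) -> None:
        logger.debug("Received response: status=%s", response.status_code)

    def send(self, request: FakeRequest, response: FakeResponse) -> FakeResponse:
        # Assumption: the response is passed in so the sketch needs no network.
        self.log_request(request)
        self.log_response(response)
        return response


class RecordingStream(BaseStream):
    """Subclass that overrides the hooks to keep the last exchange in memory."""

    def __init__(self) -> None:
        self.last_request: Optional[Dict[str, Any]] = None
        self.last_response: Optional[bytes] = None

    def log_request(self, request: FakeRequest) -> None:
        self.last_request = {"url": request.url, "body": request.body, "headers": dict(request.headers)}

    def log_response(self, response: FakeResponse) -> None:
        self.last_response = response.content
```

Because the base class calls the hooks rather than the logger directly, a subclass can change what happens to each request/response without touching the send path.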

Exposing the requests/responses to the Source

The SimpleRetriever class can then override these methods to keep track of them in memory:

def log_request(self, request: requests.PreparedRequest):
    self._request = {"url": request.url, "body": request.body, "headers": dict(request.headers)}

Before returning the parsed records from parse_response, the SimpleRetriever can annotate each record with metadata representing the request and response that produced it:

for r in records:
    r["_ab_response"] = response.content
    r["_ab_request"] = self._request
return records
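
For illustration, the annotation step can be sketched as a standalone function over plain dicts. The helper name amend_records and its parameters are hypothetical; the sketch also copies each record instead of mutating it in place, which is a choice made here and not something the spike prescribes.

```python
from typing import Any, Dict, Iterable, List, Mapping


def amend_records(
    records: Iterable[Mapping[str, Any]],
    request: Mapping[str, Any],
    response_content: bytes,
) -> List[Dict[str, Any]]:
    """Return copies of the records, each tagged with the exchange that produced it."""
    amended = []
    for record in records:
        tagged = dict(record)  # shallow copy so the caller's record is left untouched
        tagged["_ab_request"] = request
        tagged["_ab_response"] = response_content
        amended.append(tagged)
    return amended
```

Every record from the same page carries the same request and response, which is what lets a downstream consumer detect when a new request started.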

Exposing the requests/responses as AirbyteMessages

Since the AirbyteRecords produced by the AbstractSource now contain metadata, the DeclarativeSource can extract the metadata from the records and emit the requests and responses as their own AirbyteMessage.

def read(
    self,
    logger: logging.Logger,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
) -> Iterator[AirbyteMessage]:
    last_request = None
    for airbyte_message in super().read(logger, config, catalog, state):
        if airbyte_message.record and airbyte_message.record.data["_ab_request"] != last_request:
            yield AirbyteMessage(
                type=MessageType.LOG, log=AirbyteLogMessage(message=str(airbyte_message.record.data["_ab_request"]), level=Level.INFO)
            )
            yield AirbyteMessage(
                type=MessageType.LOG, log=AirbyteLogMessage(message=str(airbyte_message.record.data["_ab_response"]), level=Level.INFO)
            )
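        if airbyte_message.record:
            # Strip the metadata before forwarding; non-record messages (logs, state) have no record to clean up.
            last_request = airbyte_message.record.data.pop("_ab_request")
            airbyte_message.record.data.pop("_ab_response")
        yield airbyte_message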

Exposing the requests/responses to the UI

The backend server that will run the DeclarativeSource can then extract the AirbyteRecords and group them as needed for the connector builder.
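
One way the backend could do the grouping, sketched over plain dicts instead of real AirbyteMessage objects. It assumes the ordering produced by the read method above (a request log, then a response log, then the records of that page) and assumes no other LOG messages are interleaved, which would not hold in practice if request/response stay plain LOG messages. The function name and the dict shape are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def group_by_request(messages: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group records under the request/response pair emitted just before them."""
    groups: List[Dict[str, Any]] = []
    for message in messages:
        if message["type"] == "LOG":
            if groups and groups[-1]["response"] is None:
                # Second log of a pair: the response for the open group.
                groups[-1]["response"] = message["message"]
            else:
                # First log of a pair: a new request starts a new group.
                groups.append({"request": message["message"], "response": None, "records": []})
        elif message["type"] == "RECORD" and groups:
            groups[-1]["records"].append(message["data"])
    return groups
```

The reliance on message order is fragile, which is one argument for a dedicated message type or a structured log payload.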

Additional decisions to make

  • What type of AirbyteMessage should contain the request/response?
  • Should there be a single metadata field added to the record's data mapping or one for request and one for response?
  • Are there any concerns with the SimpleRetriever amending the data produced? It tightly couples it with the DeclarativeSource, but I don't see them being used separately anyway.
  • Should the additional logic live in the existing classes, or should we create wrapper classes?
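
To make the single-field option concrete, here is a small sketch of what it could look like. The field name _ab_metadata and both helper names are hypothetical; nothing in this spike uses them yet.

```python
from typing import Any, Dict, Mapping, Tuple

METADATA_KEY = "_ab_metadata"  # hypothetical field name, not part of this spike


def attach_metadata(data: Mapping[str, Any], request: Any, response: Any) -> Dict[str, Any]:
    """Return a copy of the record data with one nested metadata field."""
    return {**data, METADATA_KEY: {"request": request, "response": response}}


def split_metadata(data: Mapping[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Separate the record data from its metadata (empty dict if absent)."""
    clean = dict(data)
    metadata = clean.pop(METADATA_KEY, {})
    return clean, metadata
```

With a single field the source only has one key to strip, at the cost of one extra level of nesting when reading the request or response.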

@github-actions github-actions bot added the CDK Connector Development Kit label Oct 11, 2022
@girarda girarda changed the title from "spike" to "spike source emit request and response" Oct 11, 2022
) -> Iterator[AirbyteMessage]:
    last_request = None
    for airbyte_message in super().read(logger, config, catalog, state):
        if airbyte_message.record and airbyte_message.record.data["_ab_request"] != last_request:
Contributor Author


complexity: also need to check if we're processing a new stream slice
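
A rough sketch of what a slice-aware check could look like, assuming each record also carried the slice that produced it. The _ab_slice field and the function name are hypothetical and not part of this PR; the sketch works on plain dicts.

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Tuple


def new_exchanges(records: Iterable[Dict[str, Any]]) -> Iterator[Tuple[Any, Any]]:
    """Yield (slice, request) whenever either one differs from the previous record."""
    last_key: Optional[Tuple[Any, Any]] = None
    for record in records:
        # Hypothetical: "_ab_slice" would have to be attached next to "_ab_request".
        key = (record.get("_ab_slice"), record.get("_ab_request"))
        if key != last_key:
            yield key
        last_key = key
```

Comparing on the request alone would treat an identical request issued for a different slice as a continuation of the previous one.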

@girarda girarda closed this Jul 13, 2023