[per-stream cdk] Transform incoming LEGACY and per-stream state JSON to AirbyteStateMessage #15977

Closed
Tracked by #16054
sherifnada opened this issue Aug 25, 2022 · 1 comment
sherifnada commented Aug 25, 2022

During a sync, connectors receive stream state from the platform via a reference to input_state.json. This JSON blob contains the state of all streams in the following LEGACY format:

{
  "users": {
    "created_at": 12345
  },
  "accounts": {
    "id": "abc_123"
  }
}

For connections supporting STREAM state, the platform will begin persisting state and passing it to a connector as:

[
  { "type": "stream", "stream": { "stream_descriptor": { "name": "users" }, "state": { "id": 12345 } } },
  { "type": "stream", "stream": { "stream_descriptor": { "name": "accounts" }, "state": { "id": "abc_123" } } }
]

When we read the JSON file in entrypoint.py / source.py, we should accept either format and parse it into a list of AirbyteStateMessage(s) which will then be interpreted by abstract_source.py later while processing an incremental sync.

After the changes:
LEGACY state should be parsed into a list of one AirbyteStateMessage:

[
  { 
    "type": "LEGACY", 
    "data": {
      "users": {
        "created_at": 12345
      },
      "accounts": {
        "id": "abc_123"
      }
    }
  }
]

STREAM/GLOBAL state should be parsed into a list in the same format that it came in as.
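
A minimal sketch of the parsing rule described above, using plain dicts rather than the real AirbyteStateMessage model so that it runs without the CDK installed. The function name parse_state is hypothetical, and returning an empty list for empty or missing state is an assumption that this ticket does not specify.

```python
import json
from typing import Any, Dict, List, Union

StateInput = Union[Dict[str, Any], List[Dict[str, Any]], None]


def parse_state(raw: StateInput) -> List[Dict[str, Any]]:
    """Normalize the decoded contents of input_state.json into a list of state messages.

    Hypothetical helper: plain dicts stand in for AirbyteStateMessage.
    """
    if not raw:
        # Assumption: empty or missing state yields no state messages.
        return []
    if isinstance(raw, list):
        # STREAM/GLOBAL state already arrives as a list of messages; pass it through unchanged.
        return list(raw)
    if isinstance(raw, dict):
        # LEGACY state is a single blob keyed by stream name; wrap it in one message.
        return [{"type": "LEGACY", "data": raw}]
    raise ValueError(f"Unsupported state format: {type(raw).__name__}")


# Usage: a LEGACY blob becomes a list containing exactly one LEGACY message.
legacy_blob = json.loads('{"users": {"created_at": 12345}, "accounts": {"id": "abc_123"}}')
legacy_messages = parse_state(legacy_blob)
assert len(legacy_messages) == 1
assert legacy_messages[0]["type"] == "LEGACY"
```

Branching on whether the top-level JSON value is a list or an object keeps LEGACY connectors working as before, since their state blob reaches the rest of the code unchanged inside the data field.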

This ticket will also involve defining a class ConnectorState which will help make future tickets more parallelizable.

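    from typing import Any, Mapping, MutableMapping, Optional

    # AirbyteStateBlob and StreamDescriptor are assumed to come from the CDK's protocol models.
    class ConnectorState:
        def __init__(self, shared_state: Optional[AirbyteStateBlob] = None, streams: Optional[Mapping[StreamDescriptor, AirbyteStateBlob]] = None, legacy: Optional[MutableMapping[str, Any]] = None):
            self.shared_state = shared_state  # state shared across streams (GLOBAL)
            self.streams = streams  # per-stream state, keyed by stream descriptor
            self.legacy = legacy  # untouched LEGACY state blob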
@brianjlai brianjlai changed the title Identify if incoming STATE message is legacy or per-stream Transform incoming LEGACY and per-stream state JSON to AirbyteStateMessage Aug 28, 2022
@brianjlai brianjlai changed the title Transform incoming LEGACY and per-stream state JSON to AirbyteStateMessage [per-stream cdk] Transform incoming LEGACY and per-stream state JSON to AirbyteStateMessage Aug 29, 2022
@brianjlai

grooming notes:

  • should be a no-op on existing behavior
