Skip to content

Commit

Permalink
Protocol Change: AirbyteControlMessage.ConnectorConfig (#17907)
Browse files Browse the repository at this point in the history
* Protocol Change: AirbyteConfigMessage

* update PR link in docs

* Lint

* Update python files

* Update docs/understanding-airbyte/airbyte-protocol.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update docs/understanding-airbyte/airbyte-protocol.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* `AirbyteConfigMessage` -> `AirbyteConnectorConfigMessage`

* AirbyteOrchestratorMessage

* Update docs

* `AirbyteControlConnectorConfigMessage`

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
  • Loading branch information
2 people authored and girarda committed Oct 28, 2022
1 parent d04a07f commit 0d93cd1
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 1 deletion.
28 changes: 28 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Type(Enum):
CONNECTION_STATUS = "CONNECTION_STATUS"
CATALOG = "CATALOG"
TRACE = "TRACE"
CONTROL = "CONTROL"


class AirbyteRecordMessage(BaseModel):
Expand Down Expand Up @@ -97,6 +98,17 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


class OrchestratorType(Enum):
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"


class AirbyteControlConnectorConfigMessage(BaseModel):
class Config:
extra = Extra.allow

config: Dict[str, Any] = Field(..., description="the config items from this connector's spec to update")


class Status(Enum):
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
Expand Down Expand Up @@ -203,6 +215,18 @@ class Config:
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")


class AirbyteControlMessage(BaseModel):
class Config:
extra = Extra.allow

type: OrchestratorType = Field(..., description="the type of orchestrator message", title="orchestrator type")
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
connectorConfig: Optional[AirbyteControlConnectorConfigMessage] = Field(
None,
description="connector config orchestrator message: the updated config for the platform to store for this connector",
)


class AirbyteStream(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -333,6 +357,10 @@ class Config:
None,
description="trace message: a message to communicate information about the status and performance of a connector",
)
control: Optional[AirbyteControlMessage] = Field(
None,
description="connector config message: a message to communicate an updated configuration from a connector that should be persisted",
)


class AirbyteProtocol(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.FailureReason;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand Down Expand Up @@ -110,6 +112,7 @@ public void acceptFromSource(final AirbyteMessage message) {
case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.SOURCE);
case RECORD -> handleSourceEmittedRecord(message.getRecord());
case STATE -> handleSourceEmittedState(message.getState());
case CONTROL -> handleEmittedOrchestratorMessage(message.getControl(), ConnectorType.SOURCE);
default -> log.warn("Invalid message type for message: {}", message);
}
}
Expand All @@ -122,6 +125,7 @@ public void acceptFromDestination(final AirbyteMessage message) {
switch (message.getType()) {
case TRACE -> handleEmittedTrace(message.getTrace(), ConnectorType.DESTINATION);
case STATE -> handleDestinationEmittedState(message.getState());
case CONTROL -> handleEmittedOrchestratorMessage(message.getControl(), ConnectorType.DESTINATION);
default -> log.warn("Invalid message type for message: {}", message);
}
}
Expand Down Expand Up @@ -216,6 +220,30 @@ private void handleDestinationEmittedState(final AirbyteStateMessage stateMessag
}
}

/**
* When a connector signals that the platform should update persist an update
*/
private void handleEmittedOrchestratorMessage(final AirbyteControlMessage controlMessage, final ConnectorType connectorType) {
switch (controlMessage.getType()) {
case CONNECTOR_CONFIG -> handleEmittedOrchestratorConnectorConfig(controlMessage.getConnectorConfig(), connectorType);
default -> log.warn("Invalid orchestrator message type for message: {}", controlMessage);
}
}

/**
* When a connector needs to update its configuration
*/
@SuppressWarnings("PMD") // until method is implemented
private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnectorConfigMessage configMessage,
final ConnectorType connectorType) {
// TODO: Update config here
/**
* Pseudocode: for (key in configMessage.getConfig()) { validateIsReallyConfig(key);
* persistConfigChange(connectorType, key, configMessage.getConfig().get(key)); // nuance here for
* secret storage or not. May need to be async over API for replication orchestrator }
*/
}

/**
* When a connector emits a trace message, check the type and call the correct function. If it is an
* error trace message, add it to the list of errorTraceMessages for the connector type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
title: AirbyteProtocol
type: object
description: AirbyteProtocol structs
version: 0.3.0
version: 0.3.1
properties:
airbyte_message:
"$ref": "#/definitions/AirbyteMessage"
Expand All @@ -28,6 +28,7 @@ definitions:
- CONNECTION_STATUS
- CATALOG
- TRACE
- CONTROL
log:
description: "log message: any kind of logging you want the platform to know about."
"$ref": "#/definitions/AirbyteLogMessage"
Expand All @@ -48,6 +49,9 @@ definitions:
trace:
description: "trace message: a message to communicate information about the status and performance of a connector"
"$ref": "#/definitions/AirbyteTraceMessage"
control:
description: "connector config message: a message to communicate an updated configuration from a connector that should be persisted"
"$ref": "#/definitions/AirbyteControlMessage"
AirbyteRecordMessage:
type: object
additionalProperties: true
Expand Down Expand Up @@ -197,6 +201,35 @@ definitions:
enum:
- system_error
- config_error
AirbyteControlMessage:
type: object
additionalProperties: true
required:
- type
- emitted_at
properties:
type:
title: orchestrator type
description: "the type of orchestrator message"
type: string
enum:
- CONNECTOR_CONFIG
emitted_at:
description: "the time in ms that the message was emitted"
type: number
connectorConfig:
description: "connector config orchestrator message: the updated config for the platform to store for this connector"
"$ref": "#/definitions/AirbyteControlConnectorConfigMessage"
AirbyteControlConnectorConfigMessage:
type: object
additionalProperties: true
required:
- config
properties:
config:
description: "the config items from this connector's spec to update"
type: object
additionalProperties: true
AirbyteConnectionStatus:
type: object
description: Airbyte connection status
Expand Down
50 changes: 50 additions & 0 deletions docs/understanding-airbyte/airbyte-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the

| Version | Date of Change | Pull Request(s) | Subject |
| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- |
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added |
| `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added |
| `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |
| `v0.1.1` | 2022-06-06 | [13356](https://github.com/airbytehq/airbyte/pull/13356) | Add a namespace in association with the stream name |
Expand Down Expand Up @@ -803,6 +804,55 @@ AirbyteErrorTraceMessage:
- config_error
```

## AirbyteControlMessage

An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync.

```yaml
AirbyteControlMessage:
type: object
additionalProperties: true
required:
- type
- emitted_at
properties:
type:
title: orchestrator type
description: "the type of orchestrator message"
type: string
enum:
- CONNECTOR_CONFIG
emitted_at:
description: "the time in ms that the message was emitted"
type: number
connectorConfig:
description: "connector config orchestrator message: the updated config for the platform to store for this connector"
"$ref": "#/definitions/AirbyteControlConnectorConfigMessage"
```

### AirbyteControlConnectorConfigMessage

`AirbyteControlConnectorConfigMessage` allows a connector to update its configuration in the middle of a sync. This is valuable for connectors with short-lived or single-use credentials.

Emitting this message signals to the orchestrator process that it should update its persistence layer, replacing the connector's current configuration with the config present in the `.config` field of the message.

The config in the `AirbyteControlConnectorConfigMessage` must conform to connector's specification's schema, and the orchestrator process is expected to validate these messages. If the output config does not conform to the specification's schema, the orchestrator process should raise an exception and terminate the sync.

```yaml
AirbyteControlConnectorConfigMessage:
type: object
additionalProperties: true
required:
- config
properties:
config:
description: "the config items from this connector's spec to update"
type: object
additionalProperties: true
```

For example, if the currently persisted config file is `{"api_key": 123, start_date: "01-01-2022"}` and the following `AirbyteControlConnectorConfigMessage` is output `{type: ORCHESTRATOR, connectorConfig: {"config": {"api_key": 456}, "emitted_at": <current_time>}}` then the persisted configuration is merged, and will become `{"api_key": 456, start_date: "01-01-2022"}`.

# Acknowledgements

We'd like to note that we were initially inspired by Singer.io's [specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#singer-specification) and would like to acknowledge that some of their design choices helped us bootstrap our project. We've since made a lot of modernizations to our protocol and specification, but don't want to forget the tools that helped us get started.

0 comments on commit 0d93cd1

Please sign in to comment.