Skip to content

Commit

Permalink
include webhook configs in operations db persistence layer (airbytehq…
Browse files Browse the repository at this point in the history
…#18030)

* include webhook configs in operations db persistence layer

* add unit tests for operations persistence
  • Loading branch information
mfsiega-airbyte authored and jhammarstedt committed Oct 31, 2022
1 parent 3bc6ca9 commit 398a415
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -580,6 +581,7 @@ private StandardSyncOperation buildStandardSyncOperation(final Record record) {
.withOperatorType(Enums.toEnum(record.get(OPERATION.OPERATOR_TYPE, String.class), OperatorType.class).orElseThrow())
.withOperatorNormalization(Jsons.deserialize(record.get(OPERATION.OPERATOR_NORMALIZATION).data(), OperatorNormalization.class))
.withOperatorDbt(Jsons.deserialize(record.get(OPERATION.OPERATOR_DBT).data(), OperatorDbt.class))
.withOperatorWebhook(Jsons.deserialize(record.get(OPERATION.OPERATOR_WEBHOOK).data(), OperatorWebhook.class))
.withTombstone(record.get(OPERATION.TOMBSTONE));
}

Expand Down Expand Up @@ -1033,6 +1035,7 @@ private void writeStandardSyncOperation(final List<StandardSyncOperation> config
io.airbyte.db.instance.configs.jooq.generated.enums.OperatorType.class).orElseThrow())
.set(OPERATION.OPERATOR_NORMALIZATION, JSONB.valueOf(Jsons.serialize(standardSyncOperation.getOperatorNormalization())))
.set(OPERATION.OPERATOR_DBT, JSONB.valueOf(Jsons.serialize(standardSyncOperation.getOperatorDbt())))
.set(OPERATION.OPERATOR_WEBHOOK, JSONB.valueOf(Jsons.serialize(standardSyncOperation.getOperatorWebhook())))
.set(OPERATION.TOMBSTONE, standardSyncOperation.getTombstone() != null && standardSyncOperation.getTombstone())
.set(OPERATION.UPDATED_AT, timestamp)
.where(OPERATION.ID.eq(standardSyncOperation.getOperationId()))
Expand All @@ -1047,6 +1050,7 @@ private void writeStandardSyncOperation(final List<StandardSyncOperation> config
io.airbyte.db.instance.configs.jooq.generated.enums.OperatorType.class).orElseThrow())
.set(OPERATION.OPERATOR_NORMALIZATION, JSONB.valueOf(Jsons.serialize(standardSyncOperation.getOperatorNormalization())))
.set(OPERATION.OPERATOR_DBT, JSONB.valueOf(Jsons.serialize(standardSyncOperation.getOperatorDbt())))
.set(OPERATION.OPERATOR_WEBHOOK, JSONB.valueOf(Jsons.serialize(standardSyncOperation.getOperatorWebhook())))
.set(OPERATION.TOMBSTONE, standardSyncOperation.getTombstone() != null && standardSyncOperation.getTombstone())
.set(OPERATION.CREATED_AT, timestamp)
.set(OPERATION.UPDATED_AT, timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.OperatorWebhook;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.Schedule;
import io.airbyte.config.Schedule.TimeUnit;
Expand Down Expand Up @@ -128,6 +129,10 @@ public class MockData {
private static final Instant NOW = Instant.parse("2021-12-15T20:30:40.00Z");

private static final String CONNECTION_SPECIFICATION = "'{\"name\":\"John\", \"age\":30, \"car\":null}'";
private static final UUID OPERATION_ID_4 = UUID.randomUUID();
private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID();
private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url";
private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body";

public static List<StandardWorkspace> standardWorkspaces() {
final Notification notification = new Notification()
Expand Down Expand Up @@ -429,7 +434,20 @@ public static List<StandardSyncOperation> standardSyncOperations() {
.withOperatorDbt(null)
.withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC))
.withOperatorType(OperatorType.NORMALIZATION);
return Arrays.asList(standardSyncOperation1, standardSyncOperation2, standardSyncOperation3);
final StandardSyncOperation standardSyncOperation4 = new StandardSyncOperation()
.withName("webhook-operation")
.withTombstone(false)
.withOperationId(OPERATION_ID_4)
.withWorkspaceId(WORKSPACE_ID_1)
.withOperatorType(OperatorType.WEBHOOK)
.withOperatorDbt(null)
.withOperatorNormalization(null)
.withOperatorWebhook(
new OperatorWebhook()
.withWebhookConfigId(WEBHOOK_CONFIG_ID)
.withExecutionUrl(WEBHOOK_OPERATION_EXECUTION_URL)
.withExecutionBody(WEBHOOK_OPERATION_EXECUTION_BODY));
return Arrays.asList(standardSyncOperation1, standardSyncOperation2, standardSyncOperation3, standardSyncOperation4);
}

public static List<StandardSync> standardSyncs() {
Expand Down

0 comments on commit 398a415

Please sign in to comment.