Skip to content

Commit

Permalink
Implement webhook operation in the sync workflow (airbytehq#18022)
Browse files Browse the repository at this point in the history
Implements the webhook operation as part of the sync workflow.

- Introduces the new activity implementation
- Updates the various interfaces that pass input to get the relevant configs to the sync workflow
- Hooks the new activity into the sync workflow
- Passes the webhook configs along into the sync workflow job
  • Loading branch information
mfsiega-airbyte authored and jhammarstedt committed Oct 31, 2022
1 parent 48b3265 commit 703d72d
Show file tree
Hide file tree
Showing 22 changed files with 418 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ properties:
type: array
items:
"$ref": StandardSyncOperation.yaml
webhookOperationConfigs:
description: The webhook operation configs belonging to this workspace. Must conform to WebhookOperationConfigs.yaml.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
resourceRequirements:
type: object
description: optional resource requirements to run sync workers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ properties:
type: array
items:
"$ref": StandardSyncOperation.yaml
webhookOperationConfigs:
description: The webhook operation configs belonging to this workspace. Must conform to WebhookOperationConfigs.yaml.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
state:
description: optional state of the previous run. this object is defined per integration.
"$ref": State.yaml
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/OperatorWebhookInput.yaml
title: OperatorWebhookInput
description: Execution input for a webhook operation
type: object
required:
- executionUrl
- webhookConfigId
properties:
executionUrl:
description: URL to invoke the webhook via POST.
type: string
executionBody:
description: Message body to be POSTed.
type: string
webhookConfigId:
description: An id used to index into the workspaceWebhookConfigs, which has a list of webhook configs.
type: string
format: uuid
workspaceWebhookConfigs:
description: Webhook configs for this workspace. Must conform to WebhookOperationConfigs.yaml; any secrets must be hydrated before use.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ properties:
type: array
items:
"$ref": StandardSyncOperation.yaml
webhookOperationConfigs:
description: The webhook operation configs belonging to this workspace. See webhookOperationConfigs in StandardWorkspace.yaml.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
catalog:
description: the configured airbyte catalog
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ properties:
"$ref": StandardSyncSummary.yaml
normalizationSummary:
"$ref": NormalizationSummary.yaml
webhookOperationSummary:
"$ref": WebhookOperationSummary.yaml
state:
"$ref": State.yaml
output_catalog:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ properties:
required:
- id
- name
- authToken
properties:
id:
type: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/WebhookOperationSummary.yaml
title: WebhookOperationSummary
description: information output by syncs for which at least one webhook invocation step was performed
type: object
required:
- startTime
- endTime
additionalProperties: false
properties:
successes:
type: array
description: List of webhook config ids that were successfully executed.
items:
type: string
format: uuid
failures:
type: array
# TODO(mfsiega-airbyte): include failure reason once possible.
description: List of webhook config ids that failed.
items:
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
Expand Down Expand Up @@ -53,6 +54,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
final String sourceDockerImageName,
final String destinationDockerImageName,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final JsonNode webhookOperationConfigs,
@Nullable final ActorDefinitionResourceRequirements sourceResourceReqs,
@Nullable final ActorDefinitionResourceRequirements destinationResourceReqs)
throws IOException {
Expand Down Expand Up @@ -81,6 +83,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withDestinationDockerImage(destinationDockerImageName)
.withDestinationConfiguration(destination.getConfiguration())
.withOperationSequence(standardSyncOperations)
.withWebhookOperationConfigs(webhookOperationConfigs)
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
.withState(null)
.withResourceRequirements(mergedOrchestratorResourceReq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -33,6 +34,7 @@ Optional<Long> createSyncJob(SourceConnection source,
String sourceDockerImage,
String destinationDockerImage,
List<StandardSyncOperation> standardSyncOperations,
@Nullable JsonNode webhookOperationConfigs,
@Nullable ActorDefinitionResourceRequirements sourceResourceReqs,
@Nullable ActorDefinitionResourceRequirements destinationResourceReqs)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.DefaultJobCreator;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
Expand All @@ -28,21 +30,26 @@ public class DefaultSyncJobFactory implements SyncJobFactory {
private final DefaultJobCreator jobCreator;
private final ConfigRepository configRepository;
private final OAuthConfigSupplier oAuthConfigSupplier;
private final WorkspaceHelper workspaceHelper;

public DefaultSyncJobFactory(final boolean connectorSpecificResourceDefaultsEnabled,
final DefaultJobCreator jobCreator,
final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier) {
final OAuthConfigSupplier oAuthConfigSupplier,
final WorkspaceHelper workspaceHelper) {
this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled;
this.jobCreator = jobCreator;
this.configRepository = configRepository;
this.oAuthConfigSupplier = oAuthConfigSupplier;
this.workspaceHelper = workspaceHelper;
}

@Override
public Long create(final UUID connectionId) {
try {
final StandardSync standardSync = configRepository.getStandardSync(connectionId);
final UUID workspaceId = workspaceHelper.getWorkspaceForSourceId(standardSync.getSourceId());
final StandardWorkspace workspace = configRepository.getStandardWorkspaceNoSecrets(workspaceId, true);
final SourceConnection sourceConnection = configRepository.getSourceConnection(standardSync.getSourceId());
final DestinationConnection destinationConnection = configRepository.getDestinationConnection(standardSync.getDestinationId());
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
Expand Down Expand Up @@ -86,6 +93,7 @@ public Long create(final UUID connectionId) {
sourceImageName,
destinationImageName,
standardSyncOperations,
workspace.getWebhookOperationConfigs(),
sourceResourceRequirements,
destinationResourceRequirements)
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ class DefaultJobCreatorTest {
private JobCreator jobCreator;
private ResourceRequirements workerResourceRequirements;

private static final JsonNode PERSISTED_WEBHOOK_CONFIGS;

private static final UUID WEBHOOK_CONFIG_ID;
private static final String WEBHOOK_NAME;

static {
final UUID workspaceId = UUID.randomUUID();
final UUID sourceId = UUID.randomUUID();
final UUID sourceDefinitionId = UUID.randomUUID();
WEBHOOK_CONFIG_ID = UUID.randomUUID();
WEBHOOK_NAME = "test-name";

final JsonNode implementationJson = Jsons.jsonNode(ImmutableMap.builder()
.put("apiKey", "123-abc")
Expand Down Expand Up @@ -129,6 +136,10 @@ class DefaultJobCreatorTest {
.withTombstone(false)
.withOperatorType(OperatorType.NORMALIZATION)
.withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC));

PERSISTED_WEBHOOK_CONFIGS = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
WEBHOOK_CONFIG_ID, WEBHOOK_NAME));
}

@BeforeEach
Expand Down Expand Up @@ -157,7 +168,8 @@ void testCreateSyncJob() throws IOException {
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withSourceResourceRequirements(workerResourceRequirements)
.withDestinationResourceRequirements(workerResourceRequirements);
.withDestinationResourceRequirements(workerResourceRequirements)
.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS);

final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand All @@ -173,6 +185,7 @@ void testCreateSyncJob() throws IOException {
SOURCE_IMAGE_NAME,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
PERSISTED_WEBHOOK_CONFIGS,
null,
null).orElseThrow();
assertEquals(JOB_ID, jobId);
Expand Down Expand Up @@ -207,6 +220,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
null,
null).isEmpty());
}

Expand All @@ -220,6 +234,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
null,
null);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
Expand Down Expand Up @@ -262,6 +277,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
null,
null);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
Expand Down Expand Up @@ -307,6 +323,7 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
SOURCE_IMAGE_NAME,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements),
new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@
package io.airbyte.persistence.job.factory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.DefaultJobCreator;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
Expand All @@ -36,8 +42,14 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
final UUID sourceId = UUID.randomUUID();
final UUID destinationId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();
final UUID workspaceWebhookConfigId = UUID.randomUUID();
final String workspaceWebhookName = "test-webhook-name";
final JsonNode persistedWebhookConfigs = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
workspaceWebhookConfigId, workspaceWebhookName));
final DefaultJobCreator jobCreator = mock(DefaultJobCreator.class);
final ConfigRepository configRepository = mock(ConfigRepository.class);
final WorkspaceHelper workspaceHelper = mock(WorkspaceHelper.class);
final long jobId = 11L;

final StandardSyncOperation operation = new StandardSyncOperation().withOperationId(operationId);
Expand All @@ -62,8 +74,10 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
when(configRepository.getSourceConnection(sourceId)).thenReturn(sourceConnection);
when(configRepository.getDestinationConnection(destinationId)).thenReturn(destinationConnection);
when(configRepository.getStandardSyncOperation(operationId)).thenReturn(operation);
when(jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, null, null))
.thenReturn(Optional.of(jobId));
when(
jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations,
persistedWebhookConfigs, null, null))
.thenReturn(Optional.of(jobId));
when(configRepository.getStandardSourceDefinition(sourceDefinitionId))
.thenReturn(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinitionId).withDockerRepository(srcDockerRepo)
.withDockerImageTag(srcDockerTag));
Expand All @@ -72,12 +86,16 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinitionId).withDockerRepository(dstDockerRepo)
.withDockerImageTag(dstDockerTag));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class));
when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn(
new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper);
final long actualJobId = factory.create(connectionId);
assertEquals(jobId, actualJobId);

verify(jobCreator)
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, null, null);
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, persistedWebhookConfigs,
null, null);
}

}
Loading

0 comments on commit 703d72d

Please sign in to comment.