Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement webhook operation in the sync workflow #18022

Merged
merged 24 commits into from
Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d6758a0
Implement and hook up webhook operation activity in sync workflow
mfsiega-airbyte Oct 14, 2022
bfa9f68
include new files in webhook operation activity implementation
mfsiega-airbyte Oct 14, 2022
bd7082d
Merge branch 'master' into msiega/dbt-cloud-impl
mfsiega-airbyte Oct 15, 2022
79c263d
include webhook configs in operations db persistence layer
mfsiega-airbyte Oct 15, 2022
23c8a9a
Merge branch 'msiega/dbt-cloud-persist-operations' into msiega/dbt-cl…
mfsiega-airbyte Oct 15, 2022
395a7fa
ensure workspace webhook configs can be correctly passed between API …
mfsiega-airbyte Oct 16, 2022
3c37369
remove unnecessary logging
mfsiega-airbyte Oct 16, 2022
c4cdaa3
add unit tests to workspace webhook config handling
mfsiega-airbyte Oct 16, 2022
fecd138
Merge branch 'msiega/dbt-cloud-webhook-config-persistence' into msieg…
mfsiega-airbyte Oct 16, 2022
52828af
ensure that the webhook operation activity is successfully hooked int…
mfsiega-airbyte Oct 16, 2022
0760fcc
additional testing and style cleanup around workspace webhook config …
mfsiega-airbyte Oct 16, 2022
d244cfc
Merge branch 'msiega/dbt-cloud-webhook-config-persistence' into msieg…
mfsiega-airbyte Oct 16, 2022
f09bca4
fix syncworkflowimpl formatting
mfsiega-airbyte Oct 16, 2022
61fa3ec
add unit testing around webhook operation temporal workflow activity
mfsiega-airbyte Oct 17, 2022
31dd764
Merge branch 'master' into msiega/dbt-cloud-impl
mfsiega-airbyte Oct 17, 2022
92503eb
Update airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync…
mfsiega-airbyte Oct 17, 2022
7788398
Update airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync…
mfsiega-airbyte Oct 17, 2022
a14521f
populate sync output with info about webhook execution success/failures
mfsiega-airbyte Oct 17, 2022
2b9d1eb
introduce an e2e test that includes configuring and executing a webho…
mfsiega-airbyte Oct 17, 2022
8b6dd87
Merge branch 'master' into msiega/dbt-cloud-impl
mfsiega-airbyte Oct 17, 2022
999b60a
Merge branch 'master' into msiega/dbt-cloud-impl
mfsiega-airbyte Oct 17, 2022
3902b04
Merge branch 'master' into msiega/dbt-cloud-impl
mfsiega-airbyte Oct 17, 2022
e0bbefc
documentation and clarity improvements around webhook operations
mfsiega-airbyte Oct 17, 2022
738e91e
logging improvements around webhook operations
mfsiega-airbyte Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ properties:
type: array
items:
"$ref": StandardSyncOperation.yaml
webhookOperationConfigs:
description: The webhook operation configs belonging to this workspace. Must conform to WebhookOperationConfigs.yaml.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
resourceRequirements:
type: object
description: optional resource requirements to run sync workers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ properties:
type: array
items:
"$ref": StandardSyncOperation.yaml
webhookOperationConfigs:
description: The webhook operation configs belonging to this workspace. Must conform to WebhookOperationConfigs.yaml.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
state:
description: optional state of the previous run. this object is defined per integration.
"$ref": State.yaml
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/OperatorWebhookInput.yaml
title: OperatorWebhookInput
description: Execution input for a webhook operation
type: object
required:
- executionUrl
- webhookConfigId
properties:
executionUrl:
description: URL to invoke the webhook via POST.
type: string
executionBody:
description: Message body to be POSTed.
type: string
webhookConfigId:
description: An id used to index into the workspaceWebhookConfigs, which has a list of webhook configs.
type: string
format: uuid
mfsiega-airbyte marked this conversation as resolved.
Show resolved Hide resolved
workspaceWebhookConfigs:
description: Webhook configs for this workspace. Must conform to WebhookOperationConfigs.yaml; any secrets must be hydrated before use.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ properties:
type: array
items:
"$ref": StandardSyncOperation.yaml
webhookOperationConfigs:
description: The webhook operation configs belonging to this workspace. See webhookOperationConfigs in StandardWorkspace.yaml.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
mfsiega-airbyte marked this conversation as resolved.
Show resolved Hide resolved
catalog:
description: the configured airbyte catalog
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ properties:
"$ref": StandardSyncSummary.yaml
normalizationSummary:
"$ref": NormalizationSummary.yaml
webhookOperationSummary:
"$ref": WebhookOperationSummary.yaml
state:
"$ref": State.yaml
output_catalog:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ properties:
required:
- id
- name
mfsiega-airbyte marked this conversation as resolved.
Show resolved Hide resolved
- authToken
properties:
id:
type: string
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/WebhookOperationSummary.yaml
title: WebhookOperationSummary
description: information output by syncs for which at least one webhook invocation step was performed
type: object
required:
- startTime
- endTime
additionalProperties: false
properties:
successes:
type: array
description: List of webhook config ids that were successfully executed.
items:
type: string
format: uuid
failures:
type: array
# TODO(mfsiega-airbyte): include failure reason once possible.
description: List of webhook config ids that failed.
items:
type: string
format: uuid
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
Expand Down Expand Up @@ -53,6 +54,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
final String sourceDockerImageName,
final String destinationDockerImageName,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final JsonNode webhookOperationConfigs,
@Nullable final ActorDefinitionResourceRequirements sourceResourceReqs,
@Nullable final ActorDefinitionResourceRequirements destinationResourceReqs)
throws IOException {
Expand Down Expand Up @@ -81,6 +83,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withDestinationDockerImage(destinationDockerImageName)
.withDestinationConfiguration(destination.getConfiguration())
.withOperationSequence(standardSyncOperations)
.withWebhookOperationConfigs(webhookOperationConfigs)
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
.withState(null)
.withResourceRequirements(mergedOrchestratorResourceReq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.persistence.job;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -33,6 +34,7 @@ Optional<Long> createSyncJob(SourceConnection source,
String sourceDockerImage,
String destinationDockerImage,
List<StandardSyncOperation> standardSyncOperations,
@Nullable JsonNode webhookOperationConfigs,
@Nullable ActorDefinitionResourceRequirements sourceResourceReqs,
@Nullable ActorDefinitionResourceRequirements destinationResourceReqs)
throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.DefaultJobCreator;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
Expand All @@ -28,21 +30,26 @@ public class DefaultSyncJobFactory implements SyncJobFactory {
private final DefaultJobCreator jobCreator;
private final ConfigRepository configRepository;
private final OAuthConfigSupplier oAuthConfigSupplier;
private final WorkspaceHelper workspaceHelper;

public DefaultSyncJobFactory(final boolean connectorSpecificResourceDefaultsEnabled,
final DefaultJobCreator jobCreator,
final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier) {
final OAuthConfigSupplier oAuthConfigSupplier,
final WorkspaceHelper workspaceHelper) {
this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled;
this.jobCreator = jobCreator;
this.configRepository = configRepository;
this.oAuthConfigSupplier = oAuthConfigSupplier;
this.workspaceHelper = workspaceHelper;
}

@Override
public Long create(final UUID connectionId) {
try {
final StandardSync standardSync = configRepository.getStandardSync(connectionId);
final UUID workspaceId = workspaceHelper.getWorkspaceForSourceId(standardSync.getSourceId());
final StandardWorkspace workspace = configRepository.getStandardWorkspaceNoSecrets(workspaceId, true);
final SourceConnection sourceConnection = configRepository.getSourceConnection(standardSync.getSourceId());
final DestinationConnection destinationConnection = configRepository.getDestinationConnection(standardSync.getDestinationId());
final JsonNode sourceConfiguration = oAuthConfigSupplier.injectSourceOAuthParameters(
Expand Down Expand Up @@ -86,6 +93,7 @@ public Long create(final UUID connectionId) {
sourceImageName,
destinationImageName,
standardSyncOperations,
workspace.getWebhookOperationConfigs(),
sourceResourceRequirements,
destinationResourceRequirements)
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ class DefaultJobCreatorTest {
private JobCreator jobCreator;
private ResourceRequirements workerResourceRequirements;

private static final JsonNode PERSISTED_WEBHOOK_CONFIGS;

private static final UUID WEBHOOK_CONFIG_ID;
private static final String WEBHOOK_NAME;

static {
final UUID workspaceId = UUID.randomUUID();
final UUID sourceId = UUID.randomUUID();
final UUID sourceDefinitionId = UUID.randomUUID();
WEBHOOK_CONFIG_ID = UUID.randomUUID();
WEBHOOK_NAME = "test-name";

final JsonNode implementationJson = Jsons.jsonNode(ImmutableMap.builder()
.put("apiKey", "123-abc")
Expand Down Expand Up @@ -129,6 +136,10 @@ class DefaultJobCreatorTest {
.withTombstone(false)
.withOperatorType(OperatorType.NORMALIZATION)
.withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC));

PERSISTED_WEBHOOK_CONFIGS = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
WEBHOOK_CONFIG_ID, WEBHOOK_NAME));
}

@BeforeEach
Expand Down Expand Up @@ -157,7 +168,8 @@ void testCreateSyncJob() throws IOException {
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withSourceResourceRequirements(workerResourceRequirements)
.withDestinationResourceRequirements(workerResourceRequirements);
.withDestinationResourceRequirements(workerResourceRequirements)
.withWebhookOperationConfigs(PERSISTED_WEBHOOK_CONFIGS);

final JobConfig jobConfig = new JobConfig()
.withConfigType(JobConfig.ConfigType.SYNC)
Expand All @@ -173,6 +185,7 @@ void testCreateSyncJob() throws IOException {
SOURCE_IMAGE_NAME,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
PERSISTED_WEBHOOK_CONFIGS,
null,
null).orElseThrow();
assertEquals(JOB_ID, jobId);
Expand Down Expand Up @@ -207,6 +220,7 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException {
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
null,
null).isEmpty());
}

Expand All @@ -220,6 +234,7 @@ void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException {
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
null,
null);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
Expand Down Expand Up @@ -262,6 +277,7 @@ void testCreateSyncJobConnectionResourceReqs() throws IOException {
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
null,
null);

final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig()
Expand Down Expand Up @@ -307,6 +323,7 @@ void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException {
SOURCE_IMAGE_NAME,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
null,
new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements),
new ActorDefinitionResourceRequirements().withJobSpecific(List.of(
new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@
package io.airbyte.persistence.job.factory;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.DefaultJobCreator;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.List;
Expand All @@ -36,8 +42,14 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
final UUID sourceId = UUID.randomUUID();
final UUID destinationId = UUID.randomUUID();
final UUID operationId = UUID.randomUUID();
final UUID workspaceWebhookConfigId = UUID.randomUUID();
final String workspaceWebhookName = "test-webhook-name";
final JsonNode persistedWebhookConfigs = Jsons.deserialize(
String.format("{\"webhookConfigs\": [{\"id\": \"%s\", \"name\": \"%s\", \"authToken\": {\"_secret\": \"a-secret_v1\"}}]}",
workspaceWebhookConfigId, workspaceWebhookName));
final DefaultJobCreator jobCreator = mock(DefaultJobCreator.class);
final ConfigRepository configRepository = mock(ConfigRepository.class);
final WorkspaceHelper workspaceHelper = mock(WorkspaceHelper.class);
final long jobId = 11L;

final StandardSyncOperation operation = new StandardSyncOperation().withOperationId(operationId);
Expand All @@ -62,8 +74,10 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
when(configRepository.getSourceConnection(sourceId)).thenReturn(sourceConnection);
when(configRepository.getDestinationConnection(destinationId)).thenReturn(destinationConnection);
when(configRepository.getStandardSyncOperation(operationId)).thenReturn(operation);
when(jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, null, null))
.thenReturn(Optional.of(jobId));
when(
jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations,
persistedWebhookConfigs, null, null))
.thenReturn(Optional.of(jobId));
when(configRepository.getStandardSourceDefinition(sourceDefinitionId))
.thenReturn(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinitionId).withDockerRepository(srcDockerRepo)
.withDockerImageTag(srcDockerTag));
Expand All @@ -72,12 +86,16 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinitionId).withDockerRepository(dstDockerRepo)
.withDockerImageTag(dstDockerTag));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class));
when(configRepository.getStandardWorkspaceNoSecrets(any(), eq(true))).thenReturn(
new StandardWorkspace().withWebhookOperationConfigs(persistedWebhookConfigs));

final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class), workspaceHelper);
final long actualJobId = factory.create(connectionId);
assertEquals(jobId, actualJobId);

verify(jobCreator)
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, null, null);
.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, persistedWebhookConfigs,
null, null);
}

}
Loading