Skip to content

Commit

Permalink
Add routing logic for check and discover workflow (#21822)
Browse files Browse the repository at this point in the history
* api changes for writing discover catalog

* api changes

* format

* worker change 1

* change return type of the API to return catalogId

* worker to call api

* typo

* 🎉 Source GoogleSheets -  migrated SAT to strictness level (#21399)

* migrated SAT to strictness level

* fixed expected records

* revert file from another source

* changed extension to txt

* changed extension to txt

* 🐛Destination-Bigquery: Added an explicit error message if sync fails due to a config issue (#21144)

* [19998] Destination-Bigquery: Added an explicit error message in sync fails due to a config issue

* ci-connector-ops: split workflows(#21474)

* CI:  nightly build alpha sources and destinations (#21562)

* Revert "Change main class in strict-encrypt destination and bump versions on both destinations to keep them in sync (#21509)" (#21567)

This reverts commit 1d202d1.

* Fixes webhook updating logic (#21519)

* ci_credentials: disable tooling test run by tox (#21580)

* disable tox

* rename steps

* revert changes on experimental workflow

* do not install tox

* Revert "CI:  nightly build alpha sources and destinations (#21562)" (#21589)

This reverts commit 61f88f3.

* Security update of default docker images (#21407)

Because there is a lot of CVEs in those releases.

Co-authored-by: Topher Lubaway <asimplechris@gmail.com>

* 📝 add docs for how to add normalization (#21563)

* add docs

* add schema link

* update based on feedback

* 🪟 🚦  E2E tests: clean up matchers (#20887)

* improve serviceTypeDropdownOption selector

* add test ids to PathPopout component(s)

* add unique id's to table dropdowns

* extend submitButtonClick to support optional click options

* update dropdown(pathPopout) matchers

* add test-id to Overlay component

* remove redundant function brackets

* revert changes onSubmit button click

* fix dropDown overlay issue

* move all duplicated intercepters to beforeEach

* add test id's to Connections, Sources and Destinations tables

* add table helper functions

* update source page actions

* intercepter fixes

* update createTestConnection function with optional replication settings

* remove extra Connection name check

* replace "cypress-postgres" with "pg-promise" npm package

* update cypress config

* Revert "update createTestConnection function with optional replication settings"

This reverts commit 8e47c78.

* Revert "remove extra Connection name check"

This reverts commit dfb19c7.

* replace openSourceDestinationFromGrid with specific selector

* replace openSourceDestinationFromGrid with specific selector

* turn on test

* add test-id's

* fix selectors

* update test

* update test snapshots

* fix lost data-testid after resolve merge conflicts

* remove extra check

* move clickOnCellInTable helper to common.ts file

* remove empty line and comments

* fix dropdownType

* replace partial string check with exact

* extract interceptors and waiters to separate file

* fix selector for predefined PK

* fix selector

* add comment regarding dropdown

* 🪟 🎨 [Free connectors] Update modal copy (#21600)

* move start/end time options out of optional block (#21541)

* lingering fix

* reflecting api changes

* test fix

* reset to master

* routing changes

* remove unexpected merge

* resolve dependency micronaut

* resolve dependency

* format

* fix test

* rename and refactor location of test files

* test

* redo the routing service

* add test to config repository

* query workspace not connection for discover/check

* remove unused bean

---------

Co-authored-by: midavadim <midavadim@yahoo.com>
Co-authored-by: Eugene <etsybaev@gmail.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Greg Solovyev <grishick@users.noreply.github.com>
Co-authored-by: Yatsuk Bogdan <yatsukbogdan@gmail.com>
Co-authored-by: Hervé Commowick <github@herve.commowick.fr>
Co-authored-by: Topher Lubaway <asimplechris@gmail.com>
Co-authored-by: Pedro S. Lopez <pedroslopez@me.com>
Co-authored-by: Vladimir <volodymyr.s.petrov@globallogic.com>
Co-authored-by: Joey Marshment-Howell <josephkmh@users.noreply.github.com>
Co-authored-by: Lake Mossman <lake@airbyte.io>
  • Loading branch information
12 people authored Feb 7, 2023
1 parent a8ed833 commit 48a5554
Show file tree
Hide file tree
Showing 19 changed files with 245 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.temporal.TemporalClient;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalResponse;
import io.airbyte.commons.temporal.scheduling.RouterService;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConnectorJobOutput;
Expand Down Expand Up @@ -50,14 +52,18 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl
private final JobErrorReporter jobErrorReporter;
private final OAuthConfigSupplier oAuthConfigSupplier;

private final RouterService routerService;

public DefaultSynchronousSchedulerClient(final TemporalClient temporalClient,
final JobTracker jobTracker,
final JobErrorReporter jobErrorReporter,
final OAuthConfigSupplier oAuthConfigSupplier) {
final OAuthConfigSupplier oAuthConfigSupplier,
final RouterService routerService) {
this.temporalClient = temporalClient;
this.jobTracker = jobTracker;
this.jobErrorReporter = jobErrorReporter;
this.oAuthConfigSupplier = oAuthConfigSupplier;
this.routerService = routerService;
}

@Override
Expand All @@ -80,12 +86,13 @@ public SynchronousResponse<StandardCheckConnectionOutput> createSourceCheckConne

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
String taskQueue = routerService.getTaskQueueForWorkspace(source.getWorkspaceId(), TemporalJobType.CHECK_CONNECTION);

return execute(
ConfigType.CHECK_CONNECTION_SOURCE,
jobReportingContext,
source.getSourceDefinitionId(),
() -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig),
() -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, taskQueue, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
source.getWorkspaceId());
}
Expand All @@ -110,12 +117,13 @@ public SynchronousResponse<StandardCheckConnectionOutput> createDestinationCheck

final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);
String taskQueue = routerService.getTaskQueueForWorkspace(destination.getWorkspaceId(), TemporalJobType.CHECK_CONNECTION);

return execute(
ConfigType.CHECK_CONNECTION_DESTINATION,
jobReportingContext,
destination.getDestinationDefinitionId(),
() -> temporalClient.submitCheckConnection(jobId, 0, jobCheckConnectionConfig),
() -> temporalClient.submitCheckConnection(jobId, 0, taskQueue, jobCheckConnectionConfig),
ConnectorJobOutput::getCheckConnection,
destination.getWorkspaceId());
}
Expand Down Expand Up @@ -144,11 +152,13 @@ public SynchronousResponse<UUID> createDiscoverSchemaJob(final SourceConnection
final UUID jobId = UUID.randomUUID();
final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage);

String taskQueue = routerService.getTaskQueueForWorkspace(source.getWorkspaceId(), TemporalJobType.DISCOVER_SCHEMA);

return execute(
ConfigType.DISCOVER_SCHEMA,
jobReportingContext,
source.getSourceDefinitionId(),
() -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig),
() -> temporalClient.submitDiscoverSchema(jobId, 0, taskQueue, jobDiscoverCatalogConfig),
ConnectorJobOutput::getDiscoverCatalogId,
source.getWorkspaceId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.JobMetadata;
import io.airbyte.commons.temporal.TemporalClient;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.TemporalResponse;
import io.airbyte.commons.temporal.scheduling.RouterService;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConnectorJobOutput;
Expand Down Expand Up @@ -62,6 +64,8 @@ class DefaultSynchronousSchedulerClientTest {
private static final UUID UUID1 = UUID.randomUUID();
private static final UUID UUID2 = UUID.randomUUID();
private static final String UNCHECKED = "unchecked";
private static final String CHECK_TASK_QUEUE = "check";
private static final String DISCOVER_TASK_QUEUE = "discover";
private static final JsonNode CONFIGURATION = Jsons.jsonNode(ImmutableMap.builder()
.put("username", "airbyte")
.put("password", "abc")
Expand All @@ -80,6 +84,8 @@ class DefaultSynchronousSchedulerClientTest {
private JobTracker jobTracker;
private JobErrorReporter jobErrorReporter;
private OAuthConfigSupplier oAuthConfigSupplier;

private RouterService routerService;
private DefaultSynchronousSchedulerClient schedulerClient;

@BeforeEach
Expand All @@ -88,10 +94,14 @@ void setup() throws IOException {
jobTracker = mock(JobTracker.class);
jobErrorReporter = mock(JobErrorReporter.class);
oAuthConfigSupplier = mock(OAuthConfigSupplier.class);
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier);
routerService = mock(RouterService.class);
schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier, routerService);

when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);
when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION);

when(routerService.getTaskQueueForWorkspace(any(), eq(TemporalJobType.CHECK_CONNECTION))).thenReturn(CHECK_TASK_QUEUE);
when(routerService.getTaskQueueForWorkspace(any(), eq(TemporalJobType.DISCOVER_SCHEMA))).thenReturn(DISCOVER_TASK_QUEUE);
}

private static JobMetadata createMetadata(final boolean succeeded) {
Expand Down Expand Up @@ -192,7 +202,7 @@ void testCreateSourceCheckConnectionJob() throws IOException {

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION, false);
Expand All @@ -211,7 +221,7 @@ void testCreateDestinationCheckConnectionJob() throws IOException {

final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class);
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput);
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig)))
when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<StandardCheckConnectionOutput> response =
schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION, false);
Expand All @@ -222,7 +232,7 @@ void testCreateDestinationCheckConnectionJob() throws IOException {
void testCreateDiscoverSchemaJob() throws IOException {
final UUID expectedCatalogId = UUID.randomUUID();
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId(expectedCatalogId);
when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), any(JobDiscoverCatalogConfig.class)))
when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(DISCOVER_TASK_QUEUE), any(JobDiscoverCatalogConfig.class)))
.thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true)));
final SynchronousResponse<UUID> response =
schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE, DOCKER_IMAGE_TAG, PROTOCOL_VERSION, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ public TemporalResponse<ConnectorJobOutput> submitGetSpec(final UUID jobId, fina

public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID jobId,
final int attempt,
final String taskQueue,
final JobCheckConnectionConfig config) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);
final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig()
Expand All @@ -352,11 +353,12 @@ public TemporalResponse<ConnectorJobOutput> submitCheckConnection(final UUID job
.withConnectionConfiguration(config.getConnectionConfiguration());

return execute(jobRunConfig,
() -> getWorkflowStub(CheckConnectionWorkflow.class, TemporalJobType.CHECK_CONNECTION).run(jobRunConfig, launcherConfig, input));
() -> getWorkflowStubWithTaskQueue(CheckConnectionWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input));
}

public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobId,
final int attempt,
final String taskQueue,
final JobDiscoverCatalogConfig config) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);
final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig()
Expand All @@ -369,7 +371,7 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
.withSourceId(config.getSourceId()).withConnectorVersion(config.getConnectorVersion()).withConfigHash(config.getConfigHash());

return execute(jobRunConfig,
() -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input));
() -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input));
}

public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) {
Expand Down Expand Up @@ -466,6 +468,10 @@ private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobTyp
return client.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptions(jobType));
}

private <T> T getWorkflowStubWithTaskQueue(final Class<T> workflowClass, final String taskQueue) {
return client.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptionsWithTaskQueue(taskQueue));
}

public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connectionId) {
log.info("Starting the scheduler temporal wf");
final ConnectionManagerWorkflow connectionManagerWorkflow =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType
}

public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType) {
return buildWorkflowOptionsWithTaskQueue(jobType.name());
}

public static WorkflowOptions buildWorkflowOptionsWithTaskQueue(final String taskQueue) {
return WorkflowOptions.newBuilder()
.setTaskQueue(jobType.name())
.setTaskQueue(taskQueue)
.setWorkflowTaskTimeout(Duration.ofSeconds(27)) // TODO parker - temporarily increasing this to a recognizable number to see if it changes
// error I'm seeing
// todo (cgardens) we do not leverage Temporal retries.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.scheduling;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.config.Geography;
import jakarta.inject.Singleton;

@Singleton
public class DefaultTaskQueueMapper implements TaskQueueMapper {

@VisibleForTesting
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name();
@VisibleForTesting
static final String DEFAULT_CHECK_TASK_QUEUE = TemporalJobType.CHECK_CONNECTION.name();
@VisibleForTesting
static final String DEFAULT_DISCOVER_TASK_QUEUE = TemporalJobType.DISCOVER_SCHEMA.name();

// By default, map every Geography value to the default task queue.
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation.
@Override
public String getTaskQueue(final Geography geography, final TemporalJobType jobType) {
switch (jobType) {
case CHECK_CONNECTION:
return DEFAULT_CHECK_TASK_QUEUE;
case DISCOVER_SCHEMA:
return DEFAULT_DISCOVER_TASK_QUEUE;
case SYNC:
return DEFAULT_SYNC_TASK_QUEUE;
default:
throw new IllegalArgumentException(String.format("Unexpected jobType %s", jobType));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;
package io.airbyte.commons.temporal.scheduling;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -32,9 +32,14 @@ public RouterService(final ConfigRepository configRepository, final TaskQueueMap
* Given a connectionId, look up the connection's configured {@link Geography} in the config DB and
* use it to determine which Task Queue should be used for this connection's sync.
*/
public String getTaskQueue(final UUID connectionId) throws IOException {
public String getTaskQueue(final UUID connectionId, final TemporalJobType jobType) throws IOException {
final Geography geography = configRepository.getGeographyForConnection(connectionId);
return taskQueueMapper.getTaskQueue(geography);
return taskQueueMapper.getTaskQueue(geography, jobType);
}

public String getTaskQueueForWorkspace(final UUID workspaceId, final TemporalJobType jobType) throws IOException {
final Geography geography = configRepository.getGeographyForWorkspace(workspaceId);
return taskQueueMapper.getTaskQueue(geography, jobType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.config.Geography;

/**
Expand All @@ -12,6 +13,6 @@
*/
public interface TaskQueueMapper {

String getTaskQueue(Geography geography);
String getTaskQueue(Geography geography, TemporalJobType jobType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class TemporalClientTest {
private static final UUID JOB_UUID = UUID.randomUUID();
private static final long JOB_ID = 11L;
private static final int ATTEMPT_ID = 21;

private static final String CHECK_TASK_QUEUE = "CHECK_CONNECTION";
private static final String DISCOVER_TASK_QUEUE = "DISCOVER_SCHEMA";
private static final JobRunConfig JOB_RUN_CONFIG = new JobRunConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_ID);
Expand Down Expand Up @@ -240,7 +243,7 @@ void testSubmitCheckConnection() {
final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration());

temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, checkConnectionConfig);
temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, CHECK_TASK_QUEUE, checkConnectionConfig);
checkConnectionWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
verify(workflowClient).newWorkflowStub(CheckConnectionWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION));
Expand All @@ -257,7 +260,7 @@ void testSubmitDiscoverSchema() {
final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
.withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration());

temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, checkConnectionConfig);
temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, DISCOVER_TASK_QUEUE, checkConnectionConfig);
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input);
verify(workflowClient).newWorkflowStub(DiscoverCatalogWorkflow.class,
TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,14 @@ public Geography getGeographyForConnection(final UUID connectionId) throws IOExc
.fetchOneInto(Geography.class);
}

public Geography getGeographyForWorkspace(final UUID workspaceId) throws IOException {
return database.query(ctx -> ctx.select(WORKSPACE.GEOGRAPHY)
.from(WORKSPACE)
.where(WORKSPACE.ID.eq(workspaceId))
.limit(1))
.fetchOneInto(Geography.class);
}

/**
* Specialized query for efficiently determining eligibility for the Free Connector Program. If a
* workspace has at least one Alpha or Beta connector, users of that workspace will be prompted to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,15 @@ void testGetGeographyForConnection() throws IOException {
assertEquals(expected, actual);
}

@Test
void testGetGeographyForWorkspace() throws IOException {
final StandardWorkspace workspace = MockData.standardWorkspaces().get(0);
final Geography expected = workspace.getDefaultGeography();
final Geography actual = configRepository.getGeographyForWorkspace(workspace.getWorkspaceId());

assertEquals(expected, actual);
}

@SuppressWarnings("OptionalGetWithoutIsPresent")
@Test
void testGetMostRecentActorCatalogFetchEventForSource() throws SQLException, IOException {
Expand Down
Loading

0 comments on commit 48a5554

Please sign in to comment.