From 48a555484381e1ac5a277c1399e21fd6ed35fe16 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Mon, 6 Feb 2023 21:09:31 -0800 Subject: [PATCH] Add routing logic for check and discover workflow (#21822) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * api changes for writing discover catalog * api changes * format * worker change 1 * change return type of the API to return catalogId * worker to call api * typo * :tada: Source GoogleSheets - migrated SAT to strictness level (#21399) * migrated SAT to strictness level * fixed expected records * revert file from another source * changed extension to txt * changed extension to txt * 🐛Destination-Bigquery: Added an explicit error message if sync fails due to a config issue (#21144) * [19998] Destination-Bigquery: Added an explicit error message in sync fails due to a config issue * ci-connector-ops: split workflows(#21474) * CI: nightly build alpha sources and destinations (#21562) * Revert "Change main class in strict-encrypt destination and bump versions on both destinations to keep them in sync (#21509)" (#21567) This reverts commit 1d202d17079e948e9971514336080e743aacd792. * Fixes webhook updating logic (#21519) * ci_credentials: disable tooling test run by tox (#21580) * disable tox * rename steps * revert changes on experimental workflow * do not install tox * Revert "CI: nightly build alpha sources and destinations (#21562)" (#21589) This reverts commit 61f88f30134df1a71466cd63e0d73757a7ad1d8d. * Security update of default docker images (#21407) Because there is a lot of CVEs in those releases. Co-authored-by: Topher Lubaway * 📝 add docs for how to add normalization (#21563) * add docs * add schema link * update based on feedback * 🪟 🚦 E2E tests: clean up matchers (#20887) * improve serviceTypeDropdownOption selector * add test ids to PathPopout component(s) * add unique id's to table dropdowns * extend submitButtonClick to support optional click options * update dropdown(pathPopout) matchers * add test-id to Overlay component * remove redundant function brackets * revert changes onSubmit button click * fix dropDown overlay issue * move all duplicated intercepters to beforeEach * add test id's to Connections, Sources and Destinations tables * add table helper functions * update source page actions * intercepter fixes * update createTestConnection function with optional replication settings * remove extra Connection name check * replace "cypress-postgres" with "pg-promise" npm package * update cypress config * Revert "update createTestConnection function with optional replication settings" This reverts commit 8e47c7837b4e00322154d6750fc28717549737e2. * Revert "remove extra Connection name check" This reverts commit dfb19c7dd4b701de49db980229916766fe5a6f1b. * replace openSourceDestinationFromGrid with specific selector * replace openSourceDestinationFromGrid with specific selector * turn on test * add test-id's * fix selectors * update test * update test snapshots * fix lost data-testid after resolve merge conflicts * remove extra check * move clickOnCellInTable helper to common.ts file * remove empty line and comments * fix dropdownType * replace partial string check with exact * extract interceptors and waiters to separate file * fix selector for predefined PK * fix selector * add comment regarding dropdown * 🪟 🎨 [Free connectors] Update modal copy (#21600) * move start/end time options out of optional block (#21541) * lingering fix * reflecting api changes * test fix * reset to master * routing changes * remove unexpected merge * resolve dependency micronaut * resolve dependency * format * fix test * rename and refactor location of test files * test * redo the routing service * add test to config repository * query workspace not connection for discover/check * remove unused bean --------- Co-authored-by: midavadim Co-authored-by: Eugene Co-authored-by: Augustin Co-authored-by: Greg Solovyev Co-authored-by: Yatsuk Bogdan Co-authored-by: Hervé Commowick Co-authored-by: Topher Lubaway Co-authored-by: Pedro S. Lopez Co-authored-by: Vladimir Co-authored-by: Joey Marshment-Howell Co-authored-by: Lake Mossman --- .../DefaultSynchronousSchedulerClient.java | 18 ++++- ...DefaultSynchronousSchedulerClientTest.java | 18 ++++- .../commons/temporal/TemporalClient.java | 10 ++- .../temporal/TemporalWorkflowUtils.java | 6 +- .../scheduling/DefaultTaskQueueMapper.java | 38 +++++++++ .../temporal/scheduling/RouterService.java | 13 ++- .../temporal/scheduling/TaskQueueMapper.java | 3 +- .../commons/temporal/TemporalClientTest.java | 7 +- .../config/persistence/ConfigRepository.java | 8 ++ .../ConfigRepositoryE2EReadWriteTest.java | 9 +++ .../java/io/airbyte/server/ServerApp.java | 8 +- .../server/config/TemporalBeanFactory.java | 13 ++- .../workers/ApplicationInitializer.java | 80 ++++++++++++++----- .../scheduling/DefaultTaskQueueMapper.java | 37 --------- .../RouteToSyncTaskQueueActivityImpl.java | 5 +- .../src/main/resources/application.yml | 4 + .../DefaultTaskQueueMapperTest.java | 43 ++++++++++ .../scheduling/RouterServiceTest.java | 16 ++-- .../DefaultTaskQueueMapperTest.java | 41 ---------- 19 files changed, 245 insertions(+), 132 deletions(-) create mode 100644 airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapper.java rename {airbyte-workers/src/main/java/io/airbyte/workers => airbyte-commons-temporal/src/main/java/io/airbyte/commons}/temporal/scheduling/RouterService.java (66%) delete mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java create mode 100644 airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapperTest.java rename airbyte-workers/src/test/java/io/airbyte/{workers => commons}/temporal/scheduling/RouterServiceTest.java (75%) delete mode 100644 airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClient.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClient.java index f01806f089583..c679c06acb98b 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClient.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClient.java @@ -12,7 +12,9 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.temporal.TemporalClient; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.commons.temporal.TemporalResponse; +import io.airbyte.commons.temporal.scheduling.RouterService; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorType; import io.airbyte.config.ConnectorJobOutput; @@ -50,14 +52,18 @@ public class DefaultSynchronousSchedulerClient implements SynchronousSchedulerCl private final JobErrorReporter jobErrorReporter; private final OAuthConfigSupplier oAuthConfigSupplier; + private final RouterService routerService; + public DefaultSynchronousSchedulerClient(final TemporalClient temporalClient, final JobTracker jobTracker, final JobErrorReporter jobErrorReporter, - final OAuthConfigSupplier oAuthConfigSupplier) { + final OAuthConfigSupplier oAuthConfigSupplier, + final RouterService routerService) { this.temporalClient = temporalClient; this.jobTracker = jobTracker; this.jobErrorReporter = jobErrorReporter; this.oAuthConfigSupplier = oAuthConfigSupplier; + this.routerService = routerService; } @Override @@ -80,12 +86,13 @@ public SynchronousResponse createSourceCheckConne final UUID jobId = UUID.randomUUID(); final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + String taskQueue = routerService.getTaskQueueForWorkspace(source.getWorkspaceId(), TemporalJobType.CHECK_CONNECTION); return execute( ConfigType.CHECK_CONNECTION_SOURCE, jobReportingContext, source.getSourceDefinitionId(), - () -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, jobCheckConnectionConfig), + () -> temporalClient.submitCheckConnection(UUID.randomUUID(), 0, taskQueue, jobCheckConnectionConfig), ConnectorJobOutput::getCheckConnection, source.getWorkspaceId()); } @@ -110,12 +117,13 @@ public SynchronousResponse createDestinationCheck final UUID jobId = UUID.randomUUID(); final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + String taskQueue = routerService.getTaskQueueForWorkspace(destination.getWorkspaceId(), TemporalJobType.CHECK_CONNECTION); return execute( ConfigType.CHECK_CONNECTION_DESTINATION, jobReportingContext, destination.getDestinationDefinitionId(), - () -> temporalClient.submitCheckConnection(jobId, 0, jobCheckConnectionConfig), + () -> temporalClient.submitCheckConnection(jobId, 0, taskQueue, jobCheckConnectionConfig), ConnectorJobOutput::getCheckConnection, destination.getWorkspaceId()); } @@ -144,11 +152,13 @@ public SynchronousResponse createDiscoverSchemaJob(final SourceConnection final UUID jobId = UUID.randomUUID(); final ConnectorJobReportingContext jobReportingContext = new ConnectorJobReportingContext(jobId, dockerImage); + String taskQueue = routerService.getTaskQueueForWorkspace(source.getWorkspaceId(), TemporalJobType.DISCOVER_SCHEMA); + return execute( ConfigType.DISCOVER_SCHEMA, jobReportingContext, source.getSourceDefinitionId(), - () -> temporalClient.submitDiscoverSchema(jobId, 0, jobDiscoverCatalogConfig), + () -> temporalClient.submitDiscoverSchema(jobId, 0, taskQueue, jobDiscoverCatalogConfig), ConnectorJobOutput::getDiscoverCatalogId, source.getWorkspaceId()); } diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClientTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClientTest.java index bd9bbdc927649..d0e1a7d8b3cb6 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClientTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/scheduler/DefaultSynchronousSchedulerClientTest.java @@ -22,7 +22,9 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.JobMetadata; import io.airbyte.commons.temporal.TemporalClient; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.commons.temporal.TemporalResponse; +import io.airbyte.commons.temporal.scheduling.RouterService; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorType; import io.airbyte.config.ConnectorJobOutput; @@ -62,6 +64,8 @@ class DefaultSynchronousSchedulerClientTest { private static final UUID UUID1 = UUID.randomUUID(); private static final UUID UUID2 = UUID.randomUUID(); private static final String UNCHECKED = "unchecked"; + private static final String CHECK_TASK_QUEUE = "check"; + private static final String DISCOVER_TASK_QUEUE = "discover"; private static final JsonNode CONFIGURATION = Jsons.jsonNode(ImmutableMap.builder() .put("username", "airbyte") .put("password", "abc") @@ -80,6 +84,8 @@ class DefaultSynchronousSchedulerClientTest { private JobTracker jobTracker; private JobErrorReporter jobErrorReporter; private OAuthConfigSupplier oAuthConfigSupplier; + + private RouterService routerService; private DefaultSynchronousSchedulerClient schedulerClient; @BeforeEach @@ -88,10 +94,14 @@ void setup() throws IOException { jobTracker = mock(JobTracker.class); jobErrorReporter = mock(JobErrorReporter.class); oAuthConfigSupplier = mock(OAuthConfigSupplier.class); - schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier); + routerService = mock(RouterService.class); + schedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier, routerService); when(oAuthConfigSupplier.injectSourceOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION); when(oAuthConfigSupplier.injectDestinationOAuthParameters(any(), any(), eq(CONFIGURATION))).thenReturn(CONFIGURATION); + + when(routerService.getTaskQueueForWorkspace(any(), eq(TemporalJobType.CHECK_CONNECTION))).thenReturn(CHECK_TASK_QUEUE); + when(routerService.getTaskQueueForWorkspace(any(), eq(TemporalJobType.DISCOVER_SCHEMA))).thenReturn(DISCOVER_TASK_QUEUE); } private static JobMetadata createMetadata(final boolean succeeded) { @@ -192,7 +202,7 @@ void testCreateSourceCheckConnectionJob() throws IOException { final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class); final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput); - when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig))) + when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig))) .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createSourceCheckConnectionJob(SOURCE_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION, false); @@ -211,7 +221,7 @@ void testCreateDestinationCheckConnectionJob() throws IOException { final StandardCheckConnectionOutput mockOutput = mock(StandardCheckConnectionOutput.class); final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withCheckConnection(mockOutput); - when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(jobCheckConnectionConfig))) + when(temporalClient.submitCheckConnection(any(UUID.class), eq(0), eq(CHECK_TASK_QUEUE), eq(jobCheckConnectionConfig))) .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createDestinationCheckConnectionJob(DESTINATION_CONNECTION, DOCKER_IMAGE, PROTOCOL_VERSION, false); @@ -222,7 +232,7 @@ void testCreateDestinationCheckConnectionJob() throws IOException { void testCreateDiscoverSchemaJob() throws IOException { final UUID expectedCatalogId = UUID.randomUUID(); final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withDiscoverCatalogId(expectedCatalogId); - when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), any(JobDiscoverCatalogConfig.class))) + when(temporalClient.submitDiscoverSchema(any(UUID.class), eq(0), eq(DISCOVER_TASK_QUEUE), any(JobDiscoverCatalogConfig.class))) .thenReturn(new TemporalResponse<>(jobOutput, createMetadata(true))); final SynchronousResponse response = schedulerClient.createDiscoverSchemaJob(SOURCE_CONNECTION, DOCKER_IMAGE, DOCKER_IMAGE_TAG, PROTOCOL_VERSION, false); diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java index e57d584961a32..129b7ae44c965 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -338,6 +338,7 @@ public TemporalResponse submitGetSpec(final UUID jobId, fina public TemporalResponse submitCheckConnection(final UUID jobId, final int attempt, + final String taskQueue, final JobCheckConnectionConfig config) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig() @@ -352,11 +353,12 @@ public TemporalResponse submitCheckConnection(final UUID job .withConnectionConfiguration(config.getConnectionConfiguration()); return execute(jobRunConfig, - () -> getWorkflowStub(CheckConnectionWorkflow.class, TemporalJobType.CHECK_CONNECTION).run(jobRunConfig, launcherConfig, input)); + () -> getWorkflowStubWithTaskQueue(CheckConnectionWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input)); } public TemporalResponse submitDiscoverSchema(final UUID jobId, final int attempt, + final String taskQueue, final JobDiscoverCatalogConfig config) { final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig launcherConfig = new IntegrationLauncherConfig() @@ -369,7 +371,7 @@ public TemporalResponse submitDiscoverSchema(final UUID jobI .withSourceId(config.getSourceId()).withConnectorVersion(config.getConnectorVersion()).withConfigHash(config.getConfigHash()); return execute(jobRunConfig, - () -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input)); + () -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input)); } public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) { @@ -466,6 +468,10 @@ private T getWorkflowStub(final Class workflowClass, final TemporalJobTyp return client.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptions(jobType)); } + private T getWorkflowStubWithTaskQueue(final Class workflowClass, final String taskQueue) { + return client.newWorkflowStub(workflowClass, TemporalWorkflowUtils.buildWorkflowOptionsWithTaskQueue(taskQueue)); + } + public ConnectionManagerWorkflow submitConnectionUpdaterAsync(final UUID connectionId) { log.info("Starting the scheduler temporal wf"); final ConnectionManagerWorkflow connectionManagerWorkflow = diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java index aeeb018d94a1f..f133033eb58e1 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java @@ -50,8 +50,12 @@ public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType } public static WorkflowOptions buildWorkflowOptions(final TemporalJobType jobType) { + return buildWorkflowOptionsWithTaskQueue(jobType.name()); + } + + public static WorkflowOptions buildWorkflowOptionsWithTaskQueue(final String taskQueue) { return WorkflowOptions.newBuilder() - .setTaskQueue(jobType.name()) + .setTaskQueue(taskQueue) .setWorkflowTaskTimeout(Duration.ofSeconds(27)) // TODO parker - temporarily increasing this to a recognizable number to see if it changes // error I'm seeing // todo (cgardens) we do not leverage Temporal retries. diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapper.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapper.java new file mode 100644 index 0000000000000..5db14cc5c8604 --- /dev/null +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapper.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal.scheduling; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.config.Geography; +import jakarta.inject.Singleton; + +@Singleton +public class DefaultTaskQueueMapper implements TaskQueueMapper { + + @VisibleForTesting + static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name(); + @VisibleForTesting + static final String DEFAULT_CHECK_TASK_QUEUE = TemporalJobType.CHECK_CONNECTION.name(); + @VisibleForTesting + static final String DEFAULT_DISCOVER_TASK_QUEUE = TemporalJobType.DISCOVER_SCHEMA.name(); + + // By default, map every Geography value to the default task queue. + // To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation. + @Override + public String getTaskQueue(final Geography geography, final TemporalJobType jobType) { + switch (jobType) { + case CHECK_CONNECTION: + return DEFAULT_CHECK_TASK_QUEUE; + case DISCOVER_SCHEMA: + return DEFAULT_DISCOVER_TASK_QUEUE; + case SYNC: + return DEFAULT_SYNC_TASK_QUEUE; + default: + throw new IllegalArgumentException(String.format("Unexpected jobType %s", jobType)); + } + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java similarity index 66% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java index 14c911aecb9f7..4d100ec32e957 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling; +package io.airbyte.commons.temporal.scheduling; -import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.config.Geography; import io.airbyte.config.persistence.ConfigRepository; import jakarta.inject.Singleton; @@ -32,9 +32,14 @@ public RouterService(final ConfigRepository configRepository, final TaskQueueMap * Given a connectionId, look up the connection's configured {@link Geography} in the config DB and * use it to determine which Task Queue should be used for this connection's sync. */ - public String getTaskQueue(final UUID connectionId) throws IOException { + public String getTaskQueue(final UUID connectionId, final TemporalJobType jobType) throws IOException { final Geography geography = configRepository.getGeographyForConnection(connectionId); - return taskQueueMapper.getTaskQueue(geography); + return taskQueueMapper.getTaskQueue(geography, jobType); + } + + public String getTaskQueueForWorkspace(final UUID workspaceId, final TemporalJobType jobType) throws IOException { + final Geography geography = configRepository.getGeographyForWorkspace(workspaceId); + return taskQueueMapper.getTaskQueue(geography, jobType); } } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java index c7003ef23b1bb..c13949d12d837 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java @@ -4,6 +4,7 @@ package io.airbyte.commons.temporal.scheduling; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.config.Geography; /** @@ -12,6 +13,6 @@ */ public interface TaskQueueMapper { - String getTaskQueue(Geography geography); + String getTaskQueue(Geography geography, TemporalJobType jobType); } diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index 228c632cf8d6d..fad2eff33288a 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -78,6 +78,9 @@ public class TemporalClientTest { private static final UUID JOB_UUID = UUID.randomUUID(); private static final long JOB_ID = 11L; private static final int ATTEMPT_ID = 21; + + private static final String CHECK_TASK_QUEUE = "CHECK_CONNECTION"; + private static final String DISCOVER_TASK_QUEUE = "DISCOVER_SCHEMA"; private static final JobRunConfig JOB_RUN_CONFIG = new JobRunConfig() .withJobId(String.valueOf(JOB_ID)) .withAttemptId((long) ATTEMPT_ID); @@ -240,7 +243,7 @@ void testSubmitCheckConnection() { final StandardCheckConnectionInput input = new StandardCheckConnectionInput() .withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration()); - temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, checkConnectionConfig); + temporalClient.submitCheckConnection(JOB_UUID, ATTEMPT_ID, CHECK_TASK_QUEUE, checkConnectionConfig); checkConnectionWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input); verify(workflowClient).newWorkflowStub(CheckConnectionWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CHECK_CONNECTION)); @@ -257,7 +260,7 @@ void testSubmitDiscoverSchema() { final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput() .withConnectionConfiguration(checkConnectionConfig.getConnectionConfiguration()); - temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, checkConnectionConfig); + temporalClient.submitDiscoverSchema(JOB_UUID, ATTEMPT_ID, DISCOVER_TASK_QUEUE, checkConnectionConfig); discoverCatalogWorkflow.run(JOB_RUN_CONFIG, UUID_LAUNCHER_CONFIG, input); verify(workflowClient).newWorkflowStub(DiscoverCatalogWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.DISCOVER_SCHEMA)); diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 6d4e37c56fcf6..2b8d236707e11 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -1544,6 +1544,14 @@ public Geography getGeographyForConnection(final UUID connectionId) throws IOExc .fetchOneInto(Geography.class); } + public Geography getGeographyForWorkspace(final UUID workspaceId) throws IOException { + return database.query(ctx -> ctx.select(WORKSPACE.GEOGRAPHY) + .from(WORKSPACE) + .where(WORKSPACE.ID.eq(workspaceId)) + .limit(1)) + .fetchOneInto(Geography.class); + } + /** * Specialized query for efficiently determining eligibility for the Free Connector Program. If a * workspace has at least one Alpha or Beta connector, users of that workspace will be prompted to diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java index 872020e46089c..264ce6e7c7569 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryE2EReadWriteTest.java @@ -574,6 +574,15 @@ void testGetGeographyForConnection() throws IOException { assertEquals(expected, actual); } + @Test + void testGetGeographyForWorkspace() throws IOException { + final StandardWorkspace workspace = MockData.standardWorkspaces().get(0); + final Geography expected = workspace.getDefaultGeography(); + final Geography actual = configRepository.getGeographyForWorkspace(workspace.getWorkspaceId()); + + assertEquals(expected, actual); + } + @SuppressWarnings("OptionalGetWithoutIsPresent") @Test void testGetMostRecentActorCatalogFetchEventForSource() throws SQLException, IOException { diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 30e11d97c09c5..9fc1e678e942c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -22,6 +22,8 @@ import io.airbyte.commons.temporal.TemporalClient; import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.airbyte.commons.temporal.scheduling.RouterService; +import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; @@ -152,7 +154,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final DSLContext configsDslContext, final Flyway configsFlyway, final DSLContext jobsDslContext, - final Flyway jobsFlyway) + final Flyway jobsFlyway, + final TaskQueueMapper taskQueueMapper) throws Exception { LogClientSingleton.getInstance().setWorkspaceMdc( configs.getWorkerEnvironment(), @@ -221,8 +224,9 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, streamResetRecordsHelper); final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient); + RouterService routerService = new RouterService(configRepository, taskQueueMapper); final DefaultSynchronousSchedulerClient syncSchedulerClient = - new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier); + new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier, routerService); final HttpClient httpClient = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build(); final EventRunner eventRunner = new TemporalEventRunner(temporalClient); diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java index f3dd40c530af8..2ac5afd53a98d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java @@ -10,6 +10,9 @@ import io.airbyte.commons.server.scheduler.DefaultSynchronousSchedulerClient; import io.airbyte.commons.server.scheduler.SynchronousSchedulerClient; import io.airbyte.commons.temporal.TemporalClient; +import io.airbyte.commons.temporal.scheduling.DefaultTaskQueueMapper; +import io.airbyte.commons.temporal.scheduling.RouterService; +import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs.DeploymentMode; import io.airbyte.config.Configs.TrackingStrategy; @@ -56,12 +59,18 @@ public OAuthConfigSupplier oAuthConfigSupplier(final ConfigRepository configRepo return new OAuthConfigSupplier(configRepository, trackingClient); } + @Singleton + public TaskQueueMapper taskQueueMapper() { + return new DefaultTaskQueueMapper(); + } + @Singleton public SynchronousSchedulerClient synchronousSchedulerClient(final TemporalClient temporalClient, final JobTracker jobTracker, final JobErrorReporter jobErrorReporter, - final OAuthConfigSupplier oAuthConfigSupplier) { - return new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier); + final OAuthConfigSupplier oAuthConfigSupplier, + final RouterService routerService) { + return new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier, routerService); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java index 0fcd293279106..10f89bfb19bef 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java @@ -142,6 +142,12 @@ public class ApplicationInitializer implements ApplicationEventListener taskQueues = getCheckTaskQueue(); + for (final String taskQueue : taskQueues) { + final Worker checkConnectionWorker = + factory.newWorker(taskQueue, + getWorkerOptions(maxWorkersConfig.getMaxCheckWorkers())); + final WorkflowImplementationOptions options = WorkflowImplementationOptions.newBuilder() + .setFailWorkflowExceptionTypes(NonDeterministicException.class).build(); + checkConnectionWorker + .registerWorkflowImplementationTypes(options, + temporalProxyHelper.proxyWorkflowClass(CheckConnectionWorkflowImpl.class)); + checkConnectionWorker.registerActivitiesImplementations( + checkConnectionActivities.orElseThrow().toArray(new Object[] {})); + log.info("Check Connection Workflow registered."); + } } private void registerConnectionManager(final WorkerFactory factory, @@ -277,17 +286,20 @@ private void registerConnectionManager(final WorkerFactory factory, private void registerDiscover(final WorkerFactory factory, final MaxWorkersConfig maxWorkersConfig) { - final Worker discoverWorker = - factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), - getWorkerOptions(maxWorkersConfig.getMaxDiscoverWorkers())); - final WorkflowImplementationOptions options = WorkflowImplementationOptions.newBuilder() - .setFailWorkflowExceptionTypes(NonDeterministicException.class).build(); - discoverWorker - .registerWorkflowImplementationTypes(options, - temporalProxyHelper.proxyWorkflowClass(DiscoverCatalogWorkflowImpl.class)); - discoverWorker.registerActivitiesImplementations( - discoverActivities.orElseThrow().toArray(new Object[] {})); - log.info("Discover Workflow registered."); + final Set taskQueues = getDiscoverTaskQueue(); + for (final String taskQueue : taskQueues) { + final Worker discoverWorker = + factory.newWorker(taskQueue, + getWorkerOptions(maxWorkersConfig.getMaxDiscoverWorkers())); + final WorkflowImplementationOptions options = WorkflowImplementationOptions.newBuilder() + .setFailWorkflowExceptionTypes(NonDeterministicException.class).build(); + discoverWorker + .registerWorkflowImplementationTypes(options, + temporalProxyHelper.proxyWorkflowClass(DiscoverCatalogWorkflowImpl.class)); + discoverWorker.registerActivitiesImplementations( + discoverActivities.orElseThrow().toArray(new Object[] {})); + log.info("Discover Workflow registered."); + } } private void registerGetSpec(final WorkerFactory factory, @@ -366,4 +378,28 @@ private Set getSyncTaskQueue() { return Arrays.stream(syncTaskQueue.split(",")).collect(Collectors.toSet()); } + /** + * Retrieve and parse the sync workflow task queue configuration. + * + * @return A set of Temporal task queues for the sync workflow. + */ + private Set getCheckTaskQueue() { + if (StringUtils.isEmpty(checkTaskQueue)) { + return Set.of(); + } + return Arrays.stream(checkTaskQueue.split(",")).collect(Collectors.toSet()); + } + + /** + * Retrieve and parse the sync workflow task queue configuration. + * + * @return A set of Temporal task queues for the sync workflow. + */ + private Set getDiscoverTaskQueue() { + if (StringUtils.isEmpty(discoverTaskQueue)) { + return Set.of(); + } + return Arrays.stream(discoverTaskQueue.split(",")).collect(Collectors.toSet()); + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java deleted file mode 100644 index fadd2b1e4b0b5..0000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal.scheduling; - -import com.google.common.annotations.VisibleForTesting; -import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; -import io.airbyte.config.Geography; -import jakarta.inject.Singleton; -import java.util.Map; - -@Singleton -public class DefaultTaskQueueMapper implements TaskQueueMapper { - - @VisibleForTesting - static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name(); - - // By default, map every Geography value to the default task queue. - // To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation. - @VisibleForTesting - static final Map GEOGRAPHY_TASK_QUEUE_MAP = Map.of( - Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE, - Geography.US, DEFAULT_SYNC_TASK_QUEUE, - Geography.EU, DEFAULT_SYNC_TASK_QUEUE); - - @Override - public String getTaskQueue(final Geography geography) { - if (GEOGRAPHY_TASK_QUEUE_MAP.containsKey(geography)) { - return GEOGRAPHY_TASK_QUEUE_MAP.get(geography); - } - - throw new IllegalArgumentException(String.format("Unexpected geography %s", geography)); - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java index 30cbacb97c55c..4b503609f7ca2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java @@ -8,9 +8,10 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; import datadog.trace.api.Trace; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.commons.temporal.exception.RetryableException; +import io.airbyte.commons.temporal.scheduling.RouterService; import io.airbyte.metrics.lib.ApmTraceUtils; -import io.airbyte.workers.temporal.scheduling.RouterService; import jakarta.inject.Singleton; import java.io.IOException; import java.util.Map; @@ -32,7 +33,7 @@ public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) { ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId())); try { - final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId()); + final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId(), TemporalJobType.SYNC); return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); } catch (final IOException e) { diff --git a/airbyte-workers/src/main/resources/application.yml b/airbyte-workers/src/main/resources/application.yml index 967ae42db96b8..905850f0401d3 100644 --- a/airbyte-workers/src/main/resources/application.yml +++ b/airbyte-workers/src/main/resources/application.yml @@ -61,6 +61,10 @@ airbyte: plane: auth-endpoint: ${CONTROL_PLANE_AUTH_ENDPOINT:} data: + check: + task-queue: ${DATA_CHECK_TASK_QUEUES:CHECK_CONNECTION} + discover: + task-queue: ${DATA_DISCOVER_TASK_QUEUES:DISCOVER_SCHEMA} sync: task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC} plane: diff --git a/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapperTest.java b/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapperTest.java new file mode 100644 index 0000000000000..3278d94da049d --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/DefaultTaskQueueMapperTest.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal.scheduling; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.config.Geography; +import org.junit.jupiter.api.Test; + +class DefaultTaskQueueMapperTest { + + @Test + void testGetSyncTaskQueue() { + // By default, every Geography should map to the default SYNC task queue + final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); + + assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO, TemporalJobType.SYNC)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US, TemporalJobType.SYNC)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU, TemporalJobType.SYNC)); + } + + @Test + void testGetCheckTaskQueue() { + final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); + + assertEquals(DefaultTaskQueueMapper.DEFAULT_CHECK_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO, TemporalJobType.CHECK_CONNECTION)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_CHECK_TASK_QUEUE, mapper.getTaskQueue(Geography.US, TemporalJobType.CHECK_CONNECTION)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_CHECK_TASK_QUEUE, mapper.getTaskQueue(Geography.EU, TemporalJobType.CHECK_CONNECTION)); + } + + @Test + void testGetDiscoverTaskQueue() { + final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); + + assertEquals(DefaultTaskQueueMapper.DEFAULT_DISCOVER_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO, TemporalJobType.DISCOVER_SCHEMA)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_DISCOVER_TASK_QUEUE, mapper.getTaskQueue(Geography.US, TemporalJobType.DISCOVER_SCHEMA)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_DISCOVER_TASK_QUEUE, mapper.getTaskQueue(Geography.EU, TemporalJobType.DISCOVER_SCHEMA)); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java b/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java similarity index 75% rename from airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java rename to airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java index bf5a0c718f4f8..7007541f94fbd 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java @@ -2,11 +2,11 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling; +package io.airbyte.commons.temporal.scheduling; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.config.Geography; import io.airbyte.config.persistence.ConfigRepository; import java.io.IOException; @@ -40,21 +40,21 @@ class RouterServiceTest { void init() { routerService = new RouterService(mConfigRepository, mTaskQueueMapper); - Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE); - Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE); - Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE); + Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO, TemporalJobType.SYNC)).thenReturn(US_TASK_QUEUE); + Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US, TemporalJobType.SYNC)).thenReturn(US_TASK_QUEUE); + Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU, TemporalJobType.SYNC)).thenReturn(EU_TASK_QUEUE); } @Test void testGetTaskQueue() throws IOException { Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO); - assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); + assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.US); - assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); + assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.EU); - assertEquals(EU_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); + assertEquals(EU_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java deleted file mode 100644 index 81560faaca6e0..0000000000000 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal.scheduling; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; -import io.airbyte.config.Geography; -import java.util.Arrays; -import java.util.Set; -import java.util.stream.Collectors; -import org.junit.jupiter.api.Test; - -class DefaultTaskQueueMapperTest { - - @Test - void testGetTaskQueue() { - // By default, every Geography should map to the default SYNC task queue - final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); - - assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO)); - assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US)); - assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU)); - } - - /** - * If this test fails, it likely means that a new value was added to the {@link Geography} enum. A - * new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this - * test to pass. - */ - @Test - void testAllGeographiesHaveAMapping() { - final Set allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet()); - final Set mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet(); - - assertEquals(allGeographies, mappedGeographies); - } - -}