From f3b717bf9f1efe6dd7f254ef1b21aa7cdeec8337 Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Fri, 23 Jun 2023 10:53:50 -0700 Subject: [PATCH] router service to route sync to expanded dataplane (#7476) --- .../temporal/scheduling/RouterService.java | 6 +++++- .../temporal/scheduling/RouterServiceTest.java | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java index 7bd1f6041d6..96c29c2d5e8 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/RouterService.java @@ -48,7 +48,11 @@ public String getTaskQueue(final UUID connectionId, final TemporalJobType jobTyp final Geography geography = configRepository.getGeographyForConnection(connectionId); final UUID workspaceId = configRepository.getStandardWorkspaceFromConnection(connectionId, false).getWorkspaceId(); if (featureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(workspaceId))) { - return taskQueueMapper.getTaskQueueFlagged(geography, jobType); + if (featureFlagClient.boolVariation(ShouldRunOnExpandedGkeDataplane.INSTANCE, new Workspace(workspaceId))) { + return taskQueueMapper.getTaskQueueExpanded(geography, jobType); + } else { + return taskQueueMapper.getTaskQueueFlagged(geography, jobType); + } } else { return taskQueueMapper.getTaskQueue(geography, jobType); } diff --git a/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java b/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java index 05bececcff4..97a1fc514a8 100644 --- a/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/commons/temporal/scheduling/RouterServiceTest.java @@ -105,6 +105,21 @@ void testGetTaskQueueBehindFlag() throws IOException, ConfigNotFoundException { assertEquals(EU_FLAGGED_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); } + @Test + void testGetTaskQueueBehindExpandedFlag() throws IOException, ConfigNotFoundException { + Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(true); + Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnExpandedGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(true); + + Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO); + assertEquals(US_EXPANDED_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); + + Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.US); + assertEquals(US_EXPANDED_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); + + Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.EU); + assertEquals(EU_EXPANDED_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID, TemporalJobType.SYNC)); + } + @Test void testGetWorkspaceTaskQueue() throws IOException, ConfigNotFoundException { Mockito.when(mockFeatureFlagClient.boolVariation(ShouldRunOnGkeDataplane.INSTANCE, new Workspace(WORKSPACE_ID))).thenReturn(false);