forked from airbytehq/airbyte
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Geography support to RouterService (airbytehq#17902)
* router service uses geography in database instead of env var move geography map to a helper that can be overridden with a separate implementation in cloud format pmd fix import move geography mapper interface to airbyte-commons-temporal * add DefaultGeographyMapper back in * remove all args constructor and extranneous import * rename GeographyMapper to TaskQueueMapper
- Loading branch information
1 parent
3108f7d
commit 7d5628d
Showing
13 changed files
with
226 additions
and
134 deletions.
There are no files selected for viewing
17 changes: 17 additions & 0 deletions
17
...ommons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.commons.temporal.scheduling; | ||
|
||
import io.airbyte.config.Geography; | ||
|
||
/** | ||
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given | ||
* Geography. | ||
*/ | ||
public interface TaskQueueMapper { | ||
|
||
String getTaskQueue(Geography geography); | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
37 changes: 37 additions & 0 deletions
37
...-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import io.airbyte.commons.temporal.TemporalJobType; | ||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import jakarta.inject.Singleton; | ||
import java.util.Map; | ||
|
||
@Singleton | ||
public class DefaultTaskQueueMapper implements TaskQueueMapper { | ||
|
||
@VisibleForTesting | ||
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name(); | ||
|
||
// By default, map every Geography value to the default task queue. | ||
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation. | ||
@VisibleForTesting | ||
static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of( | ||
Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE, | ||
Geography.US, DEFAULT_SYNC_TASK_QUEUE, | ||
Geography.EU, DEFAULT_SYNC_TASK_QUEUE); | ||
|
||
@Override | ||
public String getTaskQueue(final Geography geography) { | ||
if (GEOGRAPHY_TASK_QUEUE_MAP.containsKey(geography)) { | ||
return GEOGRAPHY_TASK_QUEUE_MAP.get(geography); | ||
} | ||
|
||
throw new IllegalArgumentException(String.format("Unexpected geography %s", geography)); | ||
} | ||
|
||
} |
40 changes: 40 additions & 0 deletions
40
airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import jakarta.inject.Singleton; | ||
import java.io.IOException; | ||
import java.util.UUID; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
/** | ||
* Decides which Task Queue should be used for a given connection's sync operations, based on the | ||
* configured {@link Geography} | ||
*/ | ||
@Singleton | ||
@Slf4j | ||
public class RouterService { | ||
|
||
private final ConfigRepository configRepository; | ||
private final TaskQueueMapper taskQueueMapper; | ||
|
||
public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) { | ||
this.configRepository = configRepository; | ||
this.taskQueueMapper = taskQueueMapper; | ||
} | ||
|
||
/** | ||
* Given a connectionId, look up the connection's configured {@link Geography} in the config DB and | ||
* use it to determine which Task Queue should be used for this connection's sync. | ||
*/ | ||
public String getTaskQueue(final UUID connectionId) throws IOException { | ||
final Geography geography = configRepository.getGeographyForConnection(connectionId); | ||
return taskQueueMapper.getTaskQueue(geography); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
48 changes: 0 additions & 48 deletions
48
airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
41 changes: 41 additions & 0 deletions
41
...kers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import java.util.Arrays; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class DefaultTaskQueueMapperTest { | ||
|
||
@Test | ||
void testGetTaskQueue() { | ||
// By default, every Geography should map to the default SYNC task queue | ||
final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); | ||
|
||
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO)); | ||
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US)); | ||
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU)); | ||
} | ||
|
||
/** | ||
* If this test fails, it likely means that a new value was added to the {@link Geography} enum. A | ||
* new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this | ||
* test to pass. | ||
*/ | ||
@Test | ||
void testAllGeographiesHaveAMapping() { | ||
final Set<Geography> allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet()); | ||
final Set<Geography> mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet(); | ||
|
||
assertEquals(allGeographies, mappedGeographies); | ||
} | ||
|
||
} |
60 changes: 60 additions & 0 deletions
60
airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.workers.temporal.scheduling; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
||
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; | ||
import io.airbyte.config.Geography; | ||
import io.airbyte.config.persistence.ConfigRepository; | ||
import java.io.IOException; | ||
import java.util.UUID; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.junit.jupiter.api.extension.ExtendWith; | ||
import org.mockito.Mock; | ||
import org.mockito.Mockito; | ||
import org.mockito.junit.jupiter.MockitoExtension; | ||
|
||
/** | ||
* Test suite for the {@link RouterService} class. | ||
*/ | ||
@ExtendWith(MockitoExtension.class) | ||
class RouterServiceTest { | ||
|
||
private static final UUID CONNECTION_ID = UUID.randomUUID(); | ||
private static final String US_TASK_QUEUE = "US_TASK_QUEUE"; | ||
private static final String EU_TASK_QUEUE = "EU_TASK_QUEUE"; | ||
|
||
@Mock | ||
private ConfigRepository mConfigRepository; | ||
|
||
@Mock | ||
private TaskQueueMapper mTaskQueueMapper; | ||
|
||
private RouterService routerService; | ||
|
||
@BeforeEach | ||
void init() { | ||
routerService = new RouterService(mConfigRepository, mTaskQueueMapper); | ||
|
||
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE); | ||
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE); | ||
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE); | ||
} | ||
|
||
@Test | ||
void testGetTaskQueue() throws IOException { | ||
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO); | ||
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); | ||
|
||
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.US); | ||
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); | ||
|
||
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.EU); | ||
assertEquals(EU_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID)); | ||
} | ||
|
||
} |
Oops, something went wrong.