Skip to content

Commit

Permalink
Add Geography support to RouterService (#17902)
Browse files Browse the repository at this point in the history
* router service uses geography in database instead of env var

move geography map to a helper that can be overridden with a separate implementation in cloud

format

pmd

fix import

move geography mapper interface to airbyte-commons-temporal

* add DefaultGeographyMapper back in

* remove all args constructor and extranneous import

* rename GeographyMapper to TaskQueueMapper
  • Loading branch information
pmossman authored Oct 20, 2022
1 parent e232ffa commit 4e236b5
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.scheduling;

import io.airbyte.config.Geography;

/**
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given
* Geography.
*/
public interface TaskQueueMapper {

String getTaskQueue(Geography geography);

}
Original file line number Diff line number Diff line change
Expand Up @@ -579,16 +579,6 @@ public interface Configs {
*/
boolean shouldRunConnectionManagerWorkflows();

// Worker - Control Plane configs

/**
* TEMPORARY: Define a set of connection IDs that should run in Airbyte's MVP Data Plane. - This
* should only be set on Control-plane workers, since those workers decide which Data Plane task
* queue to use based on connectionId. - Will be removed in favor of the Routing Service in the
* future. Internal-use only.
*/
Set<String> connectionIdsForMvpDataPlane();

// Worker - Data Plane configs

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ public class EnvConfigs implements Configs {

// Worker - Control plane configs
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
private static final String CONNECTION_IDS_FOR_MVP_DATA_PLANE = "CONNECTION_IDS_FOR_MVP_DATA_PLANE";

// Worker - Data Plane configs
private static final String DATA_SYNC_TASK_QUEUES = "DATA_SYNC_TASK_QUEUES";
Expand Down Expand Up @@ -947,17 +946,6 @@ public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

// Worker - Control plane

@Override
public Set<String> connectionIdsForMvpDataPlane() {
final var connectionIds = getEnvOrDefault(CONNECTION_IDS_FOR_MVP_DATA_PLANE, "");
if (connectionIds.isEmpty()) {
return new HashSet<>();
}
return Arrays.stream(connectionIds.split(",")).collect(Collectors.toSet());
}

// Worker - Data plane

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Geography;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -1122,4 +1123,12 @@ public ConfiguredAirbyteCatalog getConfiguredCatalogForConnection(final UUID con
return standardSync.getCatalog();
}

public Geography getGeographyForConnection(final UUID connectionId) throws IOException {
return database.query(ctx -> ctx.select(CONNECTION.GEOGRAPHY)
.from(CONNECTION)
.where(CONNECTION.ID.eq(connectionId))
.limit(1))
.fetchOneInto(Geography.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.Geography;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
Expand Down Expand Up @@ -491,4 +492,13 @@ void testGetDestinationAndDefinitionsFromDestinationIds() throws IOException {
assertThat(actual).hasSameElementsAs(expected);
}

@Test
void testGetGeographyForConnection() throws IOException {
final StandardSync sync = MockData.standardSyncs().get(0);
final Geography expected = sync.getGeography();
final Geography actual = configRepository.getGeographyForConnection(sync.getConnectionId());

assertEquals(expected, actual);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import jakarta.inject.Singleton;
import java.util.Map;

@Singleton
public class DefaultTaskQueueMapper implements TaskQueueMapper {

@VisibleForTesting
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name();

// By default, map every Geography value to the default task queue.
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation.
@VisibleForTesting
static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of(
Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE,
Geography.US, DEFAULT_SYNC_TASK_QUEUE,
Geography.EU, DEFAULT_SYNC_TASK_QUEUE);

@Override
public String getTaskQueue(final Geography geography) {
if (GEOGRAPHY_TASK_QUEUE_MAP.containsKey(geography)) {
return GEOGRAPHY_TASK_QUEUE_MAP.get(geography);
}

throw new IllegalArgumentException(String.format("Unexpected geography %s", geography));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;

/**
* Decides which Task Queue should be used for a given connection's sync operations, based on the
* configured {@link Geography}
*/
@Singleton
@Slf4j
public class RouterService {

private final ConfigRepository configRepository;
private final TaskQueueMapper taskQueueMapper;

public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) {
this.configRepository = configRepository;
this.taskQueueMapper = taskQueueMapper;
}

/**
* Given a connectionId, look up the connection's configured {@link Geography} in the config DB and
* use it to determine which Task Queue should be used for this connection's sync.
*/
public String getTaskQueue(final UUID connectionId) throws IOException {
final Geography geography = configRepository.getGeographyForConnection(connectionId);
return taskQueueMapper.getTaskQueue(geography);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
import static io.airbyte.workers.temporal.trace.TemporalTraceConstants.Tags.CONNECTION_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.workers.temporal.sync.RouterService;
import io.airbyte.workers.temporal.scheduling.RouterService;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity {

Expand All @@ -27,9 +31,14 @@ public RouteToSyncTaskQueueActivityImpl(final RouterService routerService) {
public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId()));

final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());
try {
final String taskQueueForConnectionId = routerService.getTaskQueue(input.getConnectionId());

return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
} catch (final IOException e) {
log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e);
throw new RetryableException(e);
}
}

}

This file was deleted.

1 change: 0 additions & 1 deletion airbyte-workers/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ airbyte:
sync:
task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC}
plane:
connection-ids-mvp: ${CONNECTION_IDS_FOR_MVP_DATA_PLANE:}
service-account:
credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:}
email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

class DefaultTaskQueueMapperTest {

@Test
void testGetTaskQueue() {
// By default, every Geography should map to the default SYNC task queue
final TaskQueueMapper mapper = new DefaultTaskQueueMapper();

assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO));
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US));
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU));
}

/**
* If this test fails, it likely means that a new value was added to the {@link Geography} enum. A
* new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this
* test to pass.
*/
@Test
void testAllGeographiesHaveAMapping() {
final Set<Geography> allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet());
final Set<Geography> mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet();

assertEquals(allGeographies, mappedGeographies);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.scheduling;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
* Test suite for the {@link RouterService} class.
*/
@ExtendWith(MockitoExtension.class)
class RouterServiceTest {

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final String US_TASK_QUEUE = "US_TASK_QUEUE";
private static final String EU_TASK_QUEUE = "EU_TASK_QUEUE";

@Mock
private ConfigRepository mConfigRepository;

@Mock
private TaskQueueMapper mTaskQueueMapper;

private RouterService routerService;

@BeforeEach
void init() {
routerService = new RouterService(mConfigRepository, mTaskQueueMapper);

Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE);
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE);
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE);
}

@Test
void testGetTaskQueue() throws IOException {
Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.AUTO);
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID));

Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.US);
assertEquals(US_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID));

Mockito.when(mConfigRepository.getGeographyForConnection(CONNECTION_ID)).thenReturn(Geography.EU);
assertEquals(EU_TASK_QUEUE, routerService.getTaskQueue(CONNECTION_ID));
}

}
Loading

0 comments on commit 4e236b5

Please sign in to comment.