Skip to content

Commit

Permalink
rename GeographyMapper to TaskQueueMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
pmossman committed Oct 20, 2022
1 parent 05518c9 commit 53045c1
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given
* Geography.
*/
public interface GeographyMapper {
public interface TaskQueueMapper {

String getTaskQueue(Geography geography);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.temporal.TemporalJobType;
import io.airbyte.commons.temporal.scheduling.GeographyMapper;
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import jakarta.inject.Singleton;
import java.util.Map;

@Singleton
public class DefaultGeographyMapper implements GeographyMapper {
public class DefaultTaskQueueMapper implements TaskQueueMapper {

@VisibleForTesting
static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name();

// By default, map every Geography value to the default task queue.
// To override this behavior, define a new GeographyMapper bean with the @Primary annotation.
// To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation.
@VisibleForTesting
static final Map<Geography, String> GEOGRAPHY_TASK_QUEUE_MAP = Map.of(
Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.workers.temporal.scheduling;

import io.airbyte.commons.temporal.scheduling.GeographyMapper;
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import jakarta.inject.Singleton;
Expand All @@ -21,11 +21,11 @@
public class RouterService {

private final ConfigRepository configRepository;
private final GeographyMapper geographyMapper;
private final TaskQueueMapper taskQueueMapper;

public RouterService(final ConfigRepository configRepository, final GeographyMapper geographyMapper) {
public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) {
this.configRepository = configRepository;
this.geographyMapper = geographyMapper;
this.taskQueueMapper = taskQueueMapper;
}

/**
Expand All @@ -34,7 +34,7 @@ public RouterService(final ConfigRepository configRepository, final GeographyMap
*/
public String getTaskQueue(final UUID connectionId) throws IOException {
final Geography geography = configRepository.getGeographyForConnection(connectionId);
return geographyMapper.getTaskQueue(geography);
return taskQueueMapper.getTaskQueue(geography);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity {

Expand All @@ -34,6 +36,7 @@ public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) {

return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId);
} catch (final IOException e) {
log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e);
throw new RetryableException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,34 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.temporal.scheduling.GeographyMapper;
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

class DefaultGeographyMapperTest {
class DefaultTaskQueueMapperTest {

@Test
void testGetTaskQueue() {
// By default, every Geography should map to the default SYNC task queue
final GeographyMapper mapper = new DefaultGeographyMapper();
final TaskQueueMapper mapper = new DefaultTaskQueueMapper();

assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO));
assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US));
assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU));
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO));
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US));
assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU));
}

/**
* If this test fails, it likely means that a new value was added to the {@link Geography} enum. A
* new entry must be added to {@link DefaultGeographyMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this
* new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this
* test to pass.
*/
@Test
void testAllGeographiesHaveAMapping() {
final Set<Geography> allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet());
final Set<Geography> mappedGeographies = DefaultGeographyMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet();
final Set<Geography> mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet();

assertEquals(allGeographies, mappedGeographies);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.commons.temporal.scheduling.GeographyMapper;
import io.airbyte.commons.temporal.scheduling.TaskQueueMapper;
import io.airbyte.config.Geography;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
Expand All @@ -32,17 +32,17 @@ class RouterServiceTest {
private ConfigRepository mConfigRepository;

@Mock
private GeographyMapper mGeographyMapper;
private TaskQueueMapper mTaskQueueMapper;

private RouterService routerService;

@BeforeEach
void init() {
routerService = new RouterService(mConfigRepository, mGeographyMapper);
routerService = new RouterService(mConfigRepository, mTaskQueueMapper);

Mockito.when(mGeographyMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE);
Mockito.when(mGeographyMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE);
Mockito.when(mGeographyMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE);
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE);
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE);
Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE);
}

@Test
Expand Down

0 comments on commit 53045c1

Please sign in to comment.