diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/GeographyMapper.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java similarity index 90% rename from airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/GeographyMapper.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java index 1a004e116aab..c7003ef23b1b 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/GeographyMapper.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/TaskQueueMapper.java @@ -10,7 +10,7 @@ * Maps a {@link Geography} to a Temporal Task Queue that should be used to run syncs for the given * Geography. */ -public interface GeographyMapper { +public interface TaskQueueMapper { String getTaskQueue(Geography geography); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapper.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java similarity index 84% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapper.java rename to airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java index 22342c962f8c..fadd2b1e4b0b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapper.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapper.java @@ -6,19 +6,19 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.temporal.TemporalJobType; -import io.airbyte.commons.temporal.scheduling.GeographyMapper; +import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; import io.airbyte.config.Geography; import jakarta.inject.Singleton; import java.util.Map; @Singleton -public class DefaultGeographyMapper implements GeographyMapper { +public class DefaultTaskQueueMapper implements TaskQueueMapper { @VisibleForTesting static final String DEFAULT_SYNC_TASK_QUEUE = TemporalJobType.SYNC.name(); // By default, map every Geography value to the default task queue. - // To override this behavior, define a new GeographyMapper bean with the @Primary annotation. + // To override this behavior, define a new TaskQueueMapper bean with the @Primary annotation. @VisibleForTesting static final Map GEOGRAPHY_TASK_QUEUE_MAP = Map.of( Geography.AUTO, DEFAULT_SYNC_TASK_QUEUE, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java index df3321e126f4..14c911aecb9f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/RouterService.java @@ -4,7 +4,7 @@ package io.airbyte.workers.temporal.scheduling; -import io.airbyte.commons.temporal.scheduling.GeographyMapper; +import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; import io.airbyte.config.Geography; import io.airbyte.config.persistence.ConfigRepository; import jakarta.inject.Singleton; @@ -21,11 +21,11 @@ public class RouterService { private final ConfigRepository configRepository; - private final GeographyMapper geographyMapper; + private final TaskQueueMapper taskQueueMapper; - public RouterService(final ConfigRepository configRepository, final GeographyMapper geographyMapper) { + public RouterService(final ConfigRepository configRepository, final TaskQueueMapper taskQueueMapper) { this.configRepository = configRepository; - this.geographyMapper = geographyMapper; + this.taskQueueMapper = taskQueueMapper; } /** @@ -34,7 +34,7 @@ public RouterService(final ConfigRepository configRepository, final GeographyMap */ public String getTaskQueue(final UUID connectionId) throws IOException { final Geography geography = configRepository.getGeographyForConnection(connectionId); - return geographyMapper.getTaskQueue(geography); + return taskQueueMapper.getTaskQueue(geography); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java index a2e902b017fb..95dd69e976b6 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RouteToSyncTaskQueueActivityImpl.java @@ -14,7 +14,9 @@ import jakarta.inject.Singleton; import java.io.IOException; import java.util.Map; +import lombok.extern.slf4j.Slf4j; +@Slf4j @Singleton public class RouteToSyncTaskQueueActivityImpl implements RouteToSyncTaskQueueActivity { @@ -34,6 +36,7 @@ public RouteToSyncTaskQueueOutput route(final RouteToSyncTaskQueueInput input) { return new RouteToSyncTaskQueueOutput(taskQueueForConnectionId); } catch (final IOException e) { + log.warn("Encountered an error while attempting to route connection {} to a task queue: \n{}", input.getConnectionId(), e); throw new RetryableException(e); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapperTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java similarity index 67% rename from airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapperTest.java rename to airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java index 05d1ad6c1124..81560faaca6e 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultGeographyMapperTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/DefaultTaskQueueMapperTest.java @@ -6,34 +6,34 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.commons.temporal.scheduling.GeographyMapper; +import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; import io.airbyte.config.Geography; import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; -class DefaultGeographyMapperTest { +class DefaultTaskQueueMapperTest { @Test void testGetTaskQueue() { // By default, every Geography should map to the default SYNC task queue - final GeographyMapper mapper = new DefaultGeographyMapper(); + final TaskQueueMapper mapper = new DefaultTaskQueueMapper(); - assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO)); - assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US)); - assertEquals(DefaultGeographyMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.AUTO)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.US)); + assertEquals(DefaultTaskQueueMapper.DEFAULT_SYNC_TASK_QUEUE, mapper.getTaskQueue(Geography.EU)); } /** * If this test fails, it likely means that a new value was added to the {@link Geography} enum. A - * new entry must be added to {@link DefaultGeographyMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this + * new entry must be added to {@link DefaultTaskQueueMapper#GEOGRAPHY_TASK_QUEUE_MAP} to get this * test to pass. */ @Test void testAllGeographiesHaveAMapping() { final Set allGeographies = Arrays.stream(Geography.values()).collect(Collectors.toSet()); - final Set mappedGeographies = DefaultGeographyMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet(); + final Set mappedGeographies = DefaultTaskQueueMapper.GEOGRAPHY_TASK_QUEUE_MAP.keySet(); assertEquals(allGeographies, mappedGeographies); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java index 04e7833ef4e8..bf5a0c718f4f 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/RouterServiceTest.java @@ -6,7 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.commons.temporal.scheduling.GeographyMapper; +import io.airbyte.commons.temporal.scheduling.TaskQueueMapper; import io.airbyte.config.Geography; import io.airbyte.config.persistence.ConfigRepository; import java.io.IOException; @@ -32,17 +32,17 @@ class RouterServiceTest { private ConfigRepository mConfigRepository; @Mock - private GeographyMapper mGeographyMapper; + private TaskQueueMapper mTaskQueueMapper; private RouterService routerService; @BeforeEach void init() { - routerService = new RouterService(mConfigRepository, mGeographyMapper); + routerService = new RouterService(mConfigRepository, mTaskQueueMapper); - Mockito.when(mGeographyMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE); - Mockito.when(mGeographyMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE); - Mockito.when(mGeographyMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE); + Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.AUTO)).thenReturn(US_TASK_QUEUE); + Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.US)).thenReturn(US_TASK_QUEUE); + Mockito.when(mTaskQueueMapper.getTaskQueue(Geography.EU)).thenReturn(EU_TASK_QUEUE); } @Test