Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove workspace helper from fetchConfigActivity #21048

Merged
merged 5 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.micronaut.context.BeanProvider;
Expand Down Expand Up @@ -73,6 +74,11 @@ public ConnectionApi connectionApi(final ApiClient apiClient) {
return new ConnectionApi(apiClient);
}

@Singleton
public WorkspaceApi workspaceApi(final ApiClient apiClient) {
return new WorkspaceApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import io.airbyte.config.Notification;
import io.airbyte.config.Notification.NotificationType;
import io.airbyte.config.SlackNotificationConfiguration;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.annotations.TemporalActivityStub;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity;
import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity;
import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity;
import io.temporal.workflow.Workflow;
import java.io.IOException;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -25,6 +25,9 @@
@Slf4j
public class ConnectionNotificationWorkflowImpl implements ConnectionNotificationWorkflow {

private static final String GET_BREAKING_CHANGE_TAG = "get_breaking_change";
private static final int GET_BREAKING_CHANGE_VERSION = 1;

@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
private NotifySchemaChangeActivity notifySchemaChangeActivity;
@TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions")
Expand All @@ -35,14 +38,20 @@ public class ConnectionNotificationWorkflowImpl implements ConnectionNotificatio
@Override
public boolean sendSchemaChangeNotification(final UUID connectionId)
throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException {
final StandardSync standardSync = configFetchActivity.getStandardSync(connectionId);
final Optional<SlackNotificationConfiguration> slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId);
if (slackConfig.isPresent()) {
final Notification notification =
new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
.withSlackConfiguration(slackConfig.get());
final SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, standardSync.getBreakingChange());
final int getBreakingChangeVersion =
Workflow.getVersion(GET_BREAKING_CHANGE_TAG, Workflow.DEFAULT_VERSION, GET_BREAKING_CHANGE_VERSION);
if (getBreakingChangeVersion >= GET_BREAKING_CHANGE_VERSION) {
final Optional<Boolean> breakingChange = configFetchActivity.getBreakingChange(connectionId);
final Optional<SlackNotificationConfiguration> slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId);
if (slackConfig.isPresent() && breakingChange.isPresent()) {
final Notification notification =
new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false)
.withSlackConfiguration(slackConfig.get());
final SlackNotificationClient notificationClient = new SlackNotificationClient(notification);
return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, breakingChange.get());
} else {
return false;
}
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@
package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -27,6 +23,9 @@ public interface ConfigFetchActivity {
@ActivityMethod
Optional<ConnectionStatus> getStatus(UUID connectionId);

@ActivityMethod
public Optional<Boolean> getBreakingChange(final UUID connectionId);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -45,8 +44,6 @@ class ScheduleRetrieverOutput {

}

StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException;

/**
* Return how much time to wait before running the next sync. It will query the DB to get the last
* starting time of the latest terminal job (Failed, canceled or successful) and return the amount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.ConnectionRead;
Expand All @@ -19,16 +20,12 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Named;
Expand Down Expand Up @@ -67,43 +64,25 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
private static final long SCHEDULING_NOISE_CONSTANT = 15;

private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final WorkspaceHelper workspaceHelper;
private final WorkspaceApi workspaceApi;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier,
connectionApi);
}

@VisibleForTesting
protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkspaceHelper workspaceHelper,
protected ConfigFetchActivityImpl(final JobPersistence jobPersistence,
final WorkspaceApi workspaceApi,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
this.workspaceApi = workspaceApi;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
this.connectionApi = connectionApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException {
return configRepository.getStandardSync(connectionId);
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) {
Expand Down Expand Up @@ -176,10 +155,12 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRe
}

private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, ConnectionRead connectionRead) {
final UUID workspaceId;
UUID workspaceId;
try {
workspaceId = workspaceHelper.getWorkspaceForConnectionId(connectionRead.getConnectionId());
} catch (JsonValidationException | ConfigNotFoundException e) {
ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionRead.getConnectionId());
final WorkspaceRead workspaceRead = workspaceApi.getWorkspaceByConnectionId(connectionIdRequestBody);
workspaceId = workspaceRead.getWorkspaceId();
} catch (ApiException e) {
// We tolerate exceptions and fail open by doing nothing.
return timeToWait;
}
Expand Down Expand Up @@ -264,6 +245,19 @@ public Optional<ConnectionStatus> getStatus(final UUID connectionId) {
}
}

@Override
public Optional<Boolean> getBreakingChange(final UUID connectionId) {
try {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getBreakingChange());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's breaking change status: ", e);
return Optional.empty();
}
}

private Long getIntervalInSecond(final ConnectionScheduleDataBasicSchedule schedule) {
return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow;
import io.airbyte.config.SlackNotificationConfiguration;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.notification.SlackNotificationClient;
import io.airbyte.validation.json.JsonValidationException;
Expand Down Expand Up @@ -102,7 +101,7 @@ void sendSchemaChangeNotificationNonBreakingChangeTest()

final UUID connectionId = UUID.randomUUID();

when(mConfigFetchActivity.getStandardSync(connectionId)).thenReturn(new StandardSync().withBreakingChange(false));
when(mConfigFetchActivity.getBreakingChange(connectionId)).thenReturn(Optional.of(false));
workflow.sendSchemaChangeNotification(connectionId);

verify(mNotifySchemaChangeActivity, times(1)).notifySchemaChange(any(SlackNotificationClient.class), any(UUID.class), any(boolean.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.WorkspaceApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.api.client.model.generated.ConnectionSchedule;
Expand All @@ -17,10 +18,9 @@
import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.client.model.generated.ConnectionScheduleType;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.api.client.model.generated.WorkspaceRead;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput;
Expand All @@ -46,15 +46,11 @@ class ConfigFetchActivityTest {

private static final Integer SYNC_JOB_MAX_ATTEMPTS = 3;

@Mock
private ConfigRepository mConfigRepository;

@Mock
private JobPersistence mJobPersistence;

@Mock
private WorkspaceHelper mWorkspaceHelper;

private WorkspaceApi mWorkspaceApi;
@Mock
private Job mJob;

Expand Down Expand Up @@ -107,7 +103,7 @@ class ConfigFetchActivityTest {
@BeforeEach
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> Instant.now().getEpochSecond(), mConnectionApi);
}

Expand Down Expand Up @@ -177,7 +173,7 @@ void testDeleted() throws ApiException {
@DisplayName("Test we will wait the required amount of time with legacy config")
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand All @@ -200,7 +196,7 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
void testNotWaitIfLate() throws IOException, ApiException {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -255,7 +251,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, ApiException {
@Test
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
void testBasicScheduleSubsequentRun() throws IOException, ApiException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);
configFetchActivity = new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -283,10 +279,10 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID());
when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID()));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJobPersistence.getLastReplicationJob(connectionId))
Expand All @@ -312,10 +308,10 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID());
when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID()));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Expand All @@ -342,10 +338,11 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf
mockRightNow.set(Calendar.SECOND, 0);
mockRightNow.set(Calendar.MILLISECOND, 0);

when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"));
when(mWorkspaceApi.getWorkspaceByConnectionId(any()))
.thenReturn(new WorkspaceRead().workspaceId(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d")));

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
Expand All @@ -371,7 +368,7 @@ class TestGetMaxAttempt {
void testGetMaxAttempt() {
final int maxAttempt = 15031990;
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
.isEqualTo(maxAttempt);
}
Expand Down