Skip to content

Commit

Permalink
fix: refresh actor configuration and state between sync attempts (#21629
Browse files Browse the repository at this point in the history
)

* add AttemptSyncConfig, move info out of JobSyncConfig

* get build working

* add db migration

* load config when building attempts

* persist AttemptSyncConfig

* it compiles

* fix job persistence test

* implement submitSync with attempt config

* fix TemporalClientTest

* reorganizing some code

* add GenerateInputActivity test

* verify AttemptSyncConfig is persisted

* add test for persistence changes

* add test for getAttemptByNumber

* use apis rather than direct db access

* fix compatibility with master

* copy update

* fix tests for allowed hosts addition

* remove debug logging

* fix: handle when state is not set on the connection

* fix: handle unset state (on the server this time)

* set state type when converting to internal representation
  • Loading branch information
pedroslopez authored Feb 1, 2023
1 parent 219b6e1 commit d22dfe8
Show file tree
Hide file tree
Showing 38 changed files with 902 additions and 142 deletions.
45 changes: 45 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2214,6 +2214,26 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"
/v1/attempt/save_sync_config:
post:
tags:
- attempt
- internal
summary: For worker to save the AttemptSyncConfig for an attempt.
operationId: saveSyncConfig
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SaveAttemptSyncConfigRequestBody"
required: true
responses:
"200":
description: Successful Operation
content:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"

components:
securitySchemes:
Expand Down Expand Up @@ -5025,6 +5045,31 @@ components:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
AttemptSyncConfig:
type: object
required:
- sourceConfiguration
- destinationConfiguration
properties:
sourceConfiguration:
$ref: "#/components/schemas/SourceConfiguration"
destinationConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
state:
$ref: "#/components/schemas/ConnectionState"
SaveAttemptSyncConfigRequestBody:
type: object
required:
- jobId
- attemptNumber
- syncConfig
properties:
jobId:
$ref: "#/components/schemas/JobId"
attemptNumber:
$ref: "#/components/schemas/AttemptNumber"
syncConfig:
$ref: "#/components/schemas/AttemptSyncConfig"
InternalOperationResult:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_MIGRATION_VERSION = "0.40.28.001";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.40.28.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.40.28.001";

@BeforeEach
void setup() {
Expand Down Expand Up @@ -147,10 +148,10 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals(CURRENT_JOBS_MIGRATION_VERSION, jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
assertEquals(CURRENT_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals(CURRENT_CONFIGS_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());

assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
package io.airbyte.commons.server.converters;

import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleData;
import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.Geography;
import io.airbyte.api.model.generated.JobType;
Expand All @@ -22,6 +25,12 @@
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.State;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.workers.helper.StateConverter;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

public class ApiPojoConverters {
Expand All @@ -42,6 +51,42 @@ public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefReso
.collect(Collectors.toList()));
}

public static io.airbyte.config.AttemptSyncConfig attemptSyncConfigToInternal(final AttemptSyncConfig attemptSyncConfig) {
if (attemptSyncConfig == null) {
return null;
}

final io.airbyte.config.AttemptSyncConfig internalAttemptSyncConfig = new io.airbyte.config.AttemptSyncConfig()
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration());

final ConnectionState connectionState = attemptSyncConfig.getState();
if (connectionState != null && connectionState.getStateType() != ConnectionStateType.NOT_SET) {
final StateWrapper stateWrapper = StateConverter.toInternal(attemptSyncConfig.getState());
final io.airbyte.config.State state = StateMessageHelper.getState(stateWrapper);
internalAttemptSyncConfig.setState(state);
}

return internalAttemptSyncConfig;
}

public static io.airbyte.api.client.model.generated.AttemptSyncConfig attemptSyncConfigToClient(final io.airbyte.config.AttemptSyncConfig attemptSyncConfig,
final UUID connectionId,
final boolean useStreamCapableState) {
if (attemptSyncConfig == null) {
return null;
}

final State state = attemptSyncConfig.getState();
final Optional<StateWrapper> optStateWrapper = state != null ? StateMessageHelper.getTypedState(
state.getState(), useStreamCapableState) : Optional.empty();

return new io.airbyte.api.client.model.generated.AttemptSyncConfig()
.sourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.destinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
.state(StateConverter.toClient(connectionId, optStateWrapper.orElse(null)));
}

public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package io.airbyte.commons.server.handlers;

import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SaveStatsRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.JobPersistence;
Expand Down Expand Up @@ -63,4 +65,17 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody)
return new InternalOperationResult().succeeded(true);
}

public InternalOperationResult saveSyncConfig(final SaveAttemptSyncConfigRequestBody requestBody) {
try {
jobPersistence.writeAttemptSyncConfig(
requestBody.getJobId(),
requestBody.getAttemptNumber(),
ApiPojoConverters.attemptSyncConfigToInternal(requestBody.getSyncConfig()));
} catch (final IOException ioe) {
LOGGER.error("IOException when saving AttemptSyncConfig for attempt;", ioe);
return new InternalOperationResult().succeeded(false);
}
return new InternalOperationResult().succeeded(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.GlobalState;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.persistence.job.JobPersistence;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -26,6 +35,7 @@ class AttemptHandlerTest {
JobPersistence jobPersistence;
AttemptHandler handler;

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final long JOB_ID = 10002L;
private static final int ATTEMPT_NUMBER = 1;

Expand All @@ -39,14 +49,14 @@ public void init() {

@Test
void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {
String workflowId = UUID.randomUUID().toString();
final String workflowId = UUID.randomUUID().toString();

final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);

SetWorkflowInAttemptRequestBody requestBody =
final SetWorkflowInAttemptRequestBody requestBody =
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
.processingTaskQueue(PROCESSING_TASK_QUEUE);

Expand All @@ -63,7 +73,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {

@Test
void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
String workflowId = UUID.randomUUID().toString();
final String workflowId = UUID.randomUUID().toString();

doThrow(IOException.class).when(jobPersistence).setAttemptTemporalWorkflowInfo(anyLong(), anyInt(),
any(), any());
Expand All @@ -73,7 +83,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);

SetWorkflowInAttemptRequestBody requestBody =
final SetWorkflowInAttemptRequestBody requestBody =
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
.processingTaskQueue(PROCESSING_TASK_QUEUE);

Expand All @@ -88,4 +98,38 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
assertEquals(PROCESSING_TASK_QUEUE, queueCapture.getValue());
}

@Test
void testInternalHandlerSetsAttemptSyncConfig() throws Exception {
final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<io.airbyte.config.AttemptSyncConfig> attemptSyncConfigCapture =
ArgumentCaptor.forClass(io.airbyte.config.AttemptSyncConfig.class);

final JsonNode sourceConfig = Jsons.jsonNode(Map.of("source_key", "source_val"));
final JsonNode destinationConfig = Jsons.jsonNode(Map.of("destination_key", "destination_val"));
final ConnectionState state = new ConnectionState()
.connectionId(CONNECTION_ID)
.stateType(ConnectionStateType.GLOBAL)
.streamState(null)
.globalState(new GlobalState().sharedState(Jsons.jsonNode(Map.of("state_key", "state_val"))));

final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.destinationConfiguration(destinationConfig)
.sourceConfiguration(sourceConfig)
.state(state);

final SaveAttemptSyncConfigRequestBody requestBody =
new SaveAttemptSyncConfigRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).syncConfig(attemptSyncConfig);

assertTrue(handler.saveSyncConfig(requestBody).getSucceeded());

Mockito.verify(jobPersistence).writeAttemptSyncConfig(jobIdCapture.capture(), attemptNumberCapture.capture(), attemptSyncConfigCapture.capture());

final io.airbyte.config.AttemptSyncConfig expectedAttemptSyncConfig = ApiPojoConverters.attemptSyncConfigToInternal(attemptSyncConfig);

assertEquals(ATTEMPT_NUMBER, attemptNumberCapture.getValue());
assertEquals(JOB_ID, jobIdCapture.getValue());
assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) {
}

private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) {
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps);
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, null, status, null, null, timestamps, timestamps, timestamps);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobDiscoverCatalogConfig;
Expand Down Expand Up @@ -372,7 +373,11 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
() -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input));
}

public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) {
public TemporalResponse<StandardSyncOutput> submitSync(final long jobId,
final int attempt,
final JobSyncConfig config,
final AttemptSyncConfig attemptConfig,
final UUID connectionId) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);

final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig()
Expand All @@ -393,11 +398,11 @@ public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final i
.withNamespaceDefinition(config.getNamespaceDefinition())
.withNamespaceFormat(config.getNamespaceFormat())
.withPrefix(config.getPrefix())
.withSourceConfiguration(config.getSourceConfiguration())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withSourceConfiguration(attemptConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptConfig.getDestinationConfiguration())
.withOperationSequence(config.getOperationSequence())
.withCatalog(config.getConfiguredAirbyteCatalog())
.withState(config.getState())
.withState(attemptConfig.getState())
.withResourceRequirements(config.getResourceRequirements())
.withSourceResourceRequirements(config.getSourceResourceRequirements())
.withDestinationResourceRequirements(config.getDestinationResourceRequirements())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobCheckConnectionConfig;
Expand Down Expand Up @@ -271,26 +272,27 @@ void testSubmitSync() {
final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withDestinationDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());
final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject());
final StandardSyncInput input = new StandardSyncInput()
.withNamespaceDefinition(syncConfig.getNamespaceDefinition())
.withNamespaceFormat(syncConfig.getNamespaceFormat())
.withPrefix(syncConfig.getPrefix())
.withSourceConfiguration(syncConfig.getSourceConfiguration())
.withDestinationConfiguration(syncConfig.getDestinationConfiguration())
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
.withOperationSequence(syncConfig.getOperationSequence())
.withCatalog(syncConfig.getConfiguredAirbyteCatalog())
.withState(syncConfig.getState());
.withState(attemptSyncConfig.getState());

final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_ID)
.withDockerImage(IMAGE_NAME2);

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID);
verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC));
}
Expand Down Expand Up @@ -340,15 +342,17 @@ void testforceCancelConnection() {
doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class));
when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow);

final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject());

final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withDestinationDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
temporalClient.forceDeleteWorkflow(CONNECTION_ID);

verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);
Expand Down
Loading

0 comments on commit d22dfe8

Please sign in to comment.