Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Revert "fix: refresh actor configuration and state between sync attempts"" #22281

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2214,6 +2214,26 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"
/v1/attempt/save_sync_config:
post:
tags:
- attempt
- internal
summary: For worker to save the AttemptSyncConfig for an attempt.
operationId: saveSyncConfig
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SaveAttemptSyncConfigRequestBody"
required: true
responses:
"200":
description: Successful Operation
content:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"

components:
securitySchemes:
Expand Down Expand Up @@ -5025,6 +5045,31 @@ components:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
AttemptSyncConfig:
type: object
required:
- sourceConfiguration
- destinationConfiguration
properties:
sourceConfiguration:
$ref: "#/components/schemas/SourceConfiguration"
destinationConfiguration:
$ref: "#/components/schemas/DestinationConfiguration"
state:
$ref: "#/components/schemas/ConnectionState"
SaveAttemptSyncConfigRequestBody:
type: object
required:
- jobId
- attemptNumber
- syncConfig
properties:
jobId:
$ref: "#/components/schemas/JobId"
attemptNumber:
$ref: "#/components/schemas/AttemptNumber"
syncConfig:
$ref: "#/components/schemas/AttemptSyncConfig"
InternalOperationResult:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class BootloaderTest {

// ⚠️ This line should change with every new migration to show that you meant to make a new
// migration to the prod database
private static final String CURRENT_MIGRATION_VERSION = "0.40.28.001";
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.40.28.001";
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.40.28.001";

@BeforeEach
void setup() {
Expand Down Expand Up @@ -147,10 +148,10 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals(CURRENT_JOBS_MIGRATION_VERSION, jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
assertEquals(CURRENT_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals(CURRENT_CONFIGS_MIGRATION_VERSION, configsMigrator.getLatestMigration().getVersion().getVersion());

assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
assertEquals(new Version(PROTOCOL_VERSION_123), jobsPersistence.getAirbyteProtocolVersionMin().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
package io.airbyte.commons.server.converters;

import io.airbyte.api.model.generated.ActorDefinitionResourceRequirements;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleData;
import io.airbyte.api.model.generated.ConnectionScheduleDataBasicSchedule;
import io.airbyte.api.model.generated.ConnectionScheduleDataCron;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.Geography;
import io.airbyte.api.model.generated.JobType;
Expand All @@ -22,6 +25,12 @@
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Schedule;
import io.airbyte.config.StandardSync;
import io.airbyte.config.State;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.workers.helper.StateConverter;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

public class ApiPojoConverters {
Expand All @@ -42,6 +51,42 @@ public static io.airbyte.config.ActorDefinitionResourceRequirements actorDefReso
.collect(Collectors.toList()));
}

public static io.airbyte.config.AttemptSyncConfig attemptSyncConfigToInternal(final AttemptSyncConfig attemptSyncConfig) {
if (attemptSyncConfig == null) {
return null;
}

final io.airbyte.config.AttemptSyncConfig internalAttemptSyncConfig = new io.airbyte.config.AttemptSyncConfig()
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration());

final ConnectionState connectionState = attemptSyncConfig.getState();
if (connectionState != null && connectionState.getStateType() != ConnectionStateType.NOT_SET) {
final StateWrapper stateWrapper = StateConverter.toInternal(attemptSyncConfig.getState());
final io.airbyte.config.State state = StateMessageHelper.getState(stateWrapper);
internalAttemptSyncConfig.setState(state);
}

return internalAttemptSyncConfig;
}

public static io.airbyte.api.client.model.generated.AttemptSyncConfig attemptSyncConfigToClient(final io.airbyte.config.AttemptSyncConfig attemptSyncConfig,
final UUID connectionId,
final boolean useStreamCapableState) {
if (attemptSyncConfig == null) {
return null;
}

final State state = attemptSyncConfig.getState();
final Optional<StateWrapper> optStateWrapper = state != null ? StateMessageHelper.getTypedState(
state.getState(), useStreamCapableState) : Optional.empty();

return new io.airbyte.api.client.model.generated.AttemptSyncConfig()
.sourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.destinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
.state(StateConverter.toClient(connectionId, optStateWrapper.orElse(null)));
}

public static ActorDefinitionResourceRequirements actorDefResourceReqsToApi(final io.airbyte.config.ActorDefinitionResourceRequirements actorDefResourceReqs) {
if (actorDefResourceReqs == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package io.airbyte.commons.server.handlers;

import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SaveStatsRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.persistence.job.JobPersistence;
Expand Down Expand Up @@ -63,4 +65,17 @@ public InternalOperationResult saveStats(final SaveStatsRequestBody requestBody)
return new InternalOperationResult().succeeded(true);
}

public InternalOperationResult saveSyncConfig(final SaveAttemptSyncConfigRequestBody requestBody) {
try {
jobPersistence.writeAttemptSyncConfig(
requestBody.getJobId(),
requestBody.getAttemptNumber(),
ApiPojoConverters.attemptSyncConfigToInternal(requestBody.getSyncConfig()));
} catch (final IOException ioe) {
LOGGER.error("IOException when saving AttemptSyncConfig for attempt;", ioe);
return new InternalOperationResult().succeeded(false);
}
return new InternalOperationResult().succeeded(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,18 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.api.model.generated.AttemptSyncConfig;
import io.airbyte.api.model.generated.ConnectionState;
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.GlobalState;
import io.airbyte.api.model.generated.SaveAttemptSyncConfigRequestBody;
import io.airbyte.api.model.generated.SetWorkflowInAttemptRequestBody;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.persistence.job.JobPersistence;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -26,6 +35,7 @@ class AttemptHandlerTest {
JobPersistence jobPersistence;
AttemptHandler handler;

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final long JOB_ID = 10002L;
private static final int ATTEMPT_NUMBER = 1;

Expand All @@ -39,14 +49,14 @@ public void init() {

@Test
void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {
String workflowId = UUID.randomUUID().toString();
final String workflowId = UUID.randomUUID().toString();

final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);

SetWorkflowInAttemptRequestBody requestBody =
final SetWorkflowInAttemptRequestBody requestBody =
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
.processingTaskQueue(PROCESSING_TASK_QUEUE);

Expand All @@ -63,7 +73,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowId() throws Exception {

@Test
void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
String workflowId = UUID.randomUUID().toString();
final String workflowId = UUID.randomUUID().toString();

doThrow(IOException.class).when(jobPersistence).setAttemptTemporalWorkflowInfo(anyLong(), anyInt(),
any(), any());
Expand All @@ -73,7 +83,7 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
final ArgumentCaptor<String> workflowIdCapture = ArgumentCaptor.forClass(String.class);
final ArgumentCaptor<String> queueCapture = ArgumentCaptor.forClass(String.class);

SetWorkflowInAttemptRequestBody requestBody =
final SetWorkflowInAttemptRequestBody requestBody =
new SetWorkflowInAttemptRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).workflowId(workflowId)
.processingTaskQueue(PROCESSING_TASK_QUEUE);

Expand All @@ -88,4 +98,38 @@ void testInternalWorkerHandlerSetsTemporalWorkflowIdThrows() throws Exception {
assertEquals(PROCESSING_TASK_QUEUE, queueCapture.getValue());
}

@Test
void testInternalHandlerSetsAttemptSyncConfig() throws Exception {
final ArgumentCaptor<Integer> attemptNumberCapture = ArgumentCaptor.forClass(Integer.class);
final ArgumentCaptor<Long> jobIdCapture = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<io.airbyte.config.AttemptSyncConfig> attemptSyncConfigCapture =
ArgumentCaptor.forClass(io.airbyte.config.AttemptSyncConfig.class);

final JsonNode sourceConfig = Jsons.jsonNode(Map.of("source_key", "source_val"));
final JsonNode destinationConfig = Jsons.jsonNode(Map.of("destination_key", "destination_val"));
final ConnectionState state = new ConnectionState()
.connectionId(CONNECTION_ID)
.stateType(ConnectionStateType.GLOBAL)
.streamState(null)
.globalState(new GlobalState().sharedState(Jsons.jsonNode(Map.of("state_key", "state_val"))));

final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.destinationConfiguration(destinationConfig)
.sourceConfiguration(sourceConfig)
.state(state);

final SaveAttemptSyncConfigRequestBody requestBody =
new SaveAttemptSyncConfigRequestBody().attemptNumber(ATTEMPT_NUMBER).jobId(JOB_ID).syncConfig(attemptSyncConfig);

assertTrue(handler.saveSyncConfig(requestBody).getSucceeded());

Mockito.verify(jobPersistence).writeAttemptSyncConfig(jobIdCapture.capture(), attemptNumberCapture.capture(), attemptSyncConfigCapture.capture());

final io.airbyte.config.AttemptSyncConfig expectedAttemptSyncConfig = ApiPojoConverters.attemptSyncConfigToInternal(attemptSyncConfig);

assertEquals(ATTEMPT_NUMBER, attemptNumberCapture.getValue());
assertEquals(JOB_ID, jobIdCapture.getValue());
assertEquals(expectedAttemptSyncConfig, attemptSyncConfigCapture.getValue());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private static AttemptRead toAttemptRead(final Attempt a) {
}

private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) {
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, null, timestamps, timestamps, timestamps);
return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, null, status, null, null, timestamps, timestamps, timestamps);
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobDiscoverCatalogConfig;
Expand Down Expand Up @@ -374,7 +375,11 @@ public TemporalResponse<ConnectorJobOutput> submitDiscoverSchema(final UUID jobI
() -> getWorkflowStubWithTaskQueue(DiscoverCatalogWorkflow.class, taskQueue).run(jobRunConfig, launcherConfig, input));
}

public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) {
public TemporalResponse<StandardSyncOutput> submitSync(final long jobId,
final int attempt,
final JobSyncConfig config,
final AttemptSyncConfig attemptConfig,
final UUID connectionId) {
final JobRunConfig jobRunConfig = TemporalWorkflowUtils.createJobRunConfig(jobId, attempt);

final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig()
Expand All @@ -395,11 +400,11 @@ public TemporalResponse<StandardSyncOutput> submitSync(final long jobId, final i
.withNamespaceDefinition(config.getNamespaceDefinition())
.withNamespaceFormat(config.getNamespaceFormat())
.withPrefix(config.getPrefix())
.withSourceConfiguration(config.getSourceConfiguration())
.withDestinationConfiguration(config.getDestinationConfiguration())
.withSourceConfiguration(attemptConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptConfig.getDestinationConfiguration())
.withOperationSequence(config.getOperationSequence())
.withCatalog(config.getConfiguredAirbyteCatalog())
.withState(config.getState())
.withState(attemptConfig.getState())
.withResourceRequirements(config.getResourceRequirements())
.withSourceResourceRequirements(config.getSourceResourceRequirements())
.withDestinationResourceRequirements(config.getDestinationResourceRequirements())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.commons.temporal.scheduling.state.WorkflowState;
import io.airbyte.config.AttemptSyncConfig;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobCheckConnectionConfig;
Expand Down Expand Up @@ -274,26 +275,27 @@ void testSubmitSync() {
final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withDestinationDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());
final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject());
final StandardSyncInput input = new StandardSyncInput()
.withNamespaceDefinition(syncConfig.getNamespaceDefinition())
.withNamespaceFormat(syncConfig.getNamespaceFormat())
.withPrefix(syncConfig.getPrefix())
.withSourceConfiguration(syncConfig.getSourceConfiguration())
.withDestinationConfiguration(syncConfig.getDestinationConfiguration())
.withSourceConfiguration(attemptSyncConfig.getSourceConfiguration())
.withDestinationConfiguration(attemptSyncConfig.getDestinationConfiguration())
.withOperationSequence(syncConfig.getOperationSequence())
.withCatalog(syncConfig.getConfiguredAirbyteCatalog())
.withState(syncConfig.getState());
.withState(attemptSyncConfig.getState());

final IntegrationLauncherConfig destinationLauncherConfig = new IntegrationLauncherConfig()
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_ID)
.withDockerImage(IMAGE_NAME2);

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
discoverCatalogWorkflow.run(JOB_RUN_CONFIG, LAUNCHER_CONFIG, destinationLauncherConfig, input, CONNECTION_ID);
verify(workflowClient).newWorkflowStub(SyncWorkflow.class, TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.SYNC));
}
Expand Down Expand Up @@ -343,15 +345,17 @@ void testforceCancelConnection() {
doReturn(true).when(temporalClient).isWorkflowReachable(any(UUID.class));
when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow);

final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig()
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject());

final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withDestinationDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, attemptSyncConfig, CONNECTION_ID);
temporalClient.forceDeleteWorkflow(CONNECTION_ID);

verify(connectionManagerUtils).deleteWorkflowIfItExist(workflowClient, CONNECTION_ID);
Expand Down
Loading