Skip to content

Commit

Permalink
Do not wait the end of a reset to return an update (#17591)
Browse files Browse the repository at this point in the history
* Directly save after update

* Fix existing tests

* Rm unused

* Format

* Add test t

* Format
  • Loading branch information
benmoriceau authored Oct 5, 2022
1 parent 49cb336 commit b4fd1ea
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public interface ConnectionManagerWorkflow {
@SignalMethod
void resetConnection();

@SignalMethod
void resetConnectionAndSkipNextScheduling();

/**
* If an activity fails the workflow will be stuck. This signal activity can be used to retry the
* activity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
@Deprecated
private final boolean resetWithScheduling = false;
private boolean doneWaiting = false;
private boolean skipSchedulingNextWorkflow = false;

public void setRunning(final boolean running) {
final ChangedStateEvent event = new ChangedStateEvent(
Expand Down Expand Up @@ -128,6 +129,14 @@ public void setDoneWaiting(final boolean doneWaiting) {
this.doneWaiting = doneWaiting;
}

public void setSkipSchedulingNextWorkflow(final boolean skipSchedulingNextWorkflow) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.SKIP_SCHEDULING_NEXT_WORKFLOW,
skipSchedulingNextWorkflow);
stateChangedListener.addEvent(id, event);
this.skipSchedulingNextWorkflow = skipSchedulingNextWorkflow;
}

// TODO: bmoric -> This is noisy when inpecting the list of event, it should be just a single reset
// event.
public void reset() {
Expand All @@ -141,6 +150,7 @@ public void reset() {
this.setSuccess(false);
this.setQuarantined(false);
this.setDoneWaiting(false);
this.setSkipSchedulingNextWorkflow(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ enum StateField {
CANCELLED_FOR_RESET,
RESET_WITH_SCHEDULING,
DONE_WAITING,
SKIP_SCHEDULING_NEXT_WORKFLOW,
}

@Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOE
private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException, JsonValidationException, ConfigNotFoundException {
final ManualOperationResult resetConnectionResult = eventRunner.resetConnection(
connectionId,
configRepository.getAllStreamsForConnection(connectionId));
configRepository.getAllStreamsForConnection(connectionId),
false);

return readJobFromResult(resetConnectionResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import io.airbyte.server.scheduler.EventRunner;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.helper.ProtocolConverters;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -422,12 +421,9 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web
if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET) {
streamsToReset = configRepository.getAllStreamsForConnection(connectionId);
}
ManualOperationResult manualOperationResult = eventRunner.synchronousResetConnection(
eventRunner.resetConnection(
connectionId,
streamsToReset);
verifyManualOperationResult(manualOperationResult);
manualOperationResult = eventRunner.startNewManualSync(connectionId);
verifyManualOperationResult(manualOperationResult);
streamsToReset, true);

// return updated connectionRead after reset
return connectionsHandler.getConnection(connectionId);
Expand All @@ -437,12 +433,6 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web
return updatedConnectionRead;
}

private void verifyManualOperationResult(final ManualOperationResult manualOperationResult) throws IllegalStateException {
if (manualOperationResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualOperationResult.getFailingReason().get());
}
}

private List<UUID> createOperations(final WebBackendConnectionCreate webBackendConnectionCreate)
throws JsonValidationException, ConfigNotFoundException, IOException {
if (webBackendConnectionCreate.getOperations() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ public interface EventRunner {

ManualOperationResult startNewCancellation(final UUID connectionId);

ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset);

ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset);
ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset, final boolean runSyncImmediately);

void deleteConnection(final UUID connectionId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
}

@Override
public ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
return temporalClient.resetConnection(connectionId, streamsToReset);
}

@Override
public ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
return temporalClient.synchronousResetConnection(connectionId, streamsToReset);
public ManualOperationResult resetConnection(final UUID connectionId,
final List<StreamDescriptor> streamsToReset,
final boolean runSyncImmediately) {
return temporalClient.resetConnection(connectionId, streamsToReset, runSyncImmediately);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,15 +623,15 @@ void testResetConnection() throws IOException, JsonValidationException, ConfigNo
when(configRepository.getAllStreamsForConnection(connectionId))
.thenReturn(streamDescriptors);

when(eventRunner.resetConnection(connectionId, streamDescriptors))
when(eventRunner.resetConnection(connectionId, streamDescriptors, false))
.thenReturn(manualOperationResult);

doReturn(new JobInfoRead())
.when(jobConverter).getJobInfoRead(any());

schedulerHandler.resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));

verify(eventRunner).resetConnection(connectionId, streamDescriptors);
verify(eventRunner).resetConnection(connectionId, streamDescriptors, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -642,7 +643,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio
when(configRepository.getAllStreamsForConnection(expected.getConnectionId())).thenReturn(connectionStreams);

final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build();
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.resetConnection(any(), any(), anyBoolean())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);
Expand All @@ -654,8 +655,7 @@ void testUpdateConnectionWithUpdatedSchemaLegacy() throws JsonValidationExceptio
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(eventRunner);
orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(), connectionStreams);
orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId());
orderVerifier.verify(eventRunner, times(1)).resetConnection(connectionId.getConnectionId(), connectionStreams, true);
}

@Test
Expand Down Expand Up @@ -708,7 +708,7 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep
when(connectionsHandler.getConnection(expected.getConnectionId())).thenReturn(connectionRead);

final ManualOperationResult successfulResult = ManualOperationResult.builder().jobId(Optional.empty()).failingReason(Optional.empty()).build();
when(eventRunner.synchronousResetConnection(any(), any())).thenReturn(successfulResult);
when(eventRunner.resetConnection(any(), any(), anyBoolean())).thenReturn(successfulResult);
when(eventRunner.startNewManualSync(any())).thenReturn(successfulResult);

final WebBackendConnectionRead result = wbHandler.webBackendUpdateConnection(updateBody);
Expand All @@ -720,12 +720,12 @@ void testUpdateConnectionWithUpdatedSchemaPerStream() throws JsonValidationExcep
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(eventRunner);
orderVerifier.verify(eventRunner, times(1)).synchronousResetConnection(connectionId.getConnectionId(),
orderVerifier.verify(eventRunner, times(1)).resetConnection(connectionId.getConnectionId(),
List.of(new io.airbyte.protocol.models.StreamDescriptor().withName("addStream"),
new io.airbyte.protocol.models.StreamDescriptor().withName("updateStream"),
new io.airbyte.protocol.models.StreamDescriptor().withName("configUpdateStream"),
new io.airbyte.protocol.models.StreamDescriptor().withName("removeStream")));
orderVerifier.verify(eventRunner, times(1)).startNewManualSync(connectionId.getConnectionId());
new io.airbyte.protocol.models.StreamDescriptor().withName("removeStream")),
true);
}

@Test
Expand Down Expand Up @@ -776,7 +776,7 @@ void testUpdateConnectionNoStreamsToReset() throws JsonValidationException, Conf
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(1)).updateConnection(any());
final InOrder orderVerifier = inOrder(eventRunner);
orderVerifier.verify(eventRunner, times(0)).synchronousResetConnection(eq(connectionId.getConnectionId()), any());
orderVerifier.verify(eventRunner, times(0)).resetConnection(eq(connectionId.getConnectionId()), any(), anyBoolean());
orderVerifier.verify(eventRunner, times(0)).startNewManualSync(connectionId.getConnectionId());
}

Expand Down Expand Up @@ -819,8 +819,7 @@ void testUpdateConnectionWithSkipReset() throws JsonValidationException, ConfigN
verify(schedulerHandler, times(0)).syncConnection(connectionId);
verify(connectionsHandler, times(0)).getDiff(any(), any());
verify(connectionsHandler, times(1)).updateConnection(any());
verify(eventRunner, times(0)).synchronousResetConnection(any(), any());
verify(eventRunner, times(0)).startNewManualSync(any());
verify(eventRunner, times(0)).resetConnection(any(), any(), eq(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,9 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
Optional.of(jobId), Optional.empty());
}

public ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
public ManualOperationResult resetConnection(final UUID connectionId,
final List<StreamDescriptor> streamsToReset,
final boolean syncImmediatelyAfter) {
log.info("reset sync request");

try {
Expand All @@ -402,7 +404,11 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
final long oldJobId = connectionManagerUtils.getCurrentJobId(client, connectionId);

try {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection);
if (syncImmediatelyAfter) {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnectionAndSkipNextScheduling);
} else {
connectionManagerUtils.signalWorkflowAndRepairIfNecessary(client, connectionId, workflow -> workflow::resetConnection);
}
} catch (final DeletedWorkflowException e) {
log.error("Can't reset a deleted workflow", e);
return new ManualOperationResult(
Expand Down Expand Up @@ -439,36 +445,6 @@ private Optional<Long> getNewJobId(final UUID connectionId, final long oldJobId)
}
}

/**
* This is launching a reset and wait for the reset to be performed.
*
* The way to do so is to wait for the jobId to change, either to a new job id or the default id
* that signal that a workflow is waiting to be submitted
*/
public ManualOperationResult synchronousResetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
final ManualOperationResult resetResult = resetConnection(connectionId, streamsToReset);
if (resetResult.getFailingReason().isPresent()) {
return resetResult;
}

final long resetJobId = resetResult.getJobId().get();
do {
try {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to reset a sync for: " + connectionId),
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (connectionManagerUtils.getCurrentJobId(client, connectionId) == resetJobId);

log.info("End of reset");

return new ManualOperationResult(
Optional.empty(),
Optional.of(resetJobId), Optional.empty());
}

/**
* This should be in the class {@li}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,18 @@ public void resetConnection() {
}
}

@Override
public void resetConnectionAndSkipNextScheduling() {
if (workflowState.isDoneWaiting()) {
workflowState.setCancelledForReset(true);
workflowState.setSkipSchedulingNextWorkflow(true);
cancellableSyncWorkflow.cancel();
} else {
workflowState.setSkipScheduling(true);
workflowState.setSkipSchedulingNextWorkflow(true);
}
}

@Override
public void retryFailedActivity() {
workflowState.setRetryFailedActivity(true);
Expand Down Expand Up @@ -537,6 +549,9 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn
workflowInternalState.getFailures().clear();
workflowInternalState.setPartialSuccess(null);
final boolean isDeleted = workflowState.isDeleted();
if (workflowState.isSkipSchedulingNextWorkflow()) {
connectionUpdaterInput.setSkipScheduling(true);
}
workflowState.reset();
if (!isDeleted) {
Workflow.continueAsNew(connectionUpdaterInput);
Expand Down
Loading

0 comments on commit b4fd1ea

Please sign in to comment.