Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the block logic and block after the job creation #10597

Merged
merged 2 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -490,10 +490,6 @@ public void testManualSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
Expand All @@ -512,10 +508,6 @@ public void testCancelSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

Expand Down Expand Up @@ -548,10 +540,6 @@ public void testIncrementalSync() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
LOGGER.info("Beginning testIncrementalSync() sync 1");
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -655,10 +643,6 @@ public void testMultipleSchemasAndTablesSync() throws Exception {
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
assertSourceAndDestinationDbInSync(false);
Expand Down Expand Up @@ -786,10 +770,6 @@ public void testCheckpointing() throws Exception {
.destinationSyncMode(destinationSyncMode));
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null).getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Expand Down Expand Up @@ -878,10 +858,6 @@ public void testBackpressure() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Expand Down Expand Up @@ -959,10 +935,6 @@ public void testFailureTimeout() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, Collections.emptyList(), catalog, null)
.getConnectionId();
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -1001,11 +973,6 @@ public void testDowntimeDuringSync() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Thread.sleep(5000);
Expand Down Expand Up @@ -1045,10 +1012,6 @@ public void testCancelSyncWithInterruption() throws Exception {
final UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForJob(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

Expand Down Expand Up @@ -1083,10 +1046,6 @@ public void testCuttingOffPodBeforeFilesTransfer() throws Exception {
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -1131,10 +1090,6 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");
// Avoid Race condition with the new scheduler
if (featureFlags.usesNewScheduler()) {
waitForTemporalWorkflow(connectionId);
}

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
Expand Down Expand Up @@ -1556,17 +1511,4 @@ public enum Type {
DESTINATION
}

private static void waitForTemporalWorkflow(final UUID connectionId) {
/*
* do { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e);
* } } while
* (temporalClient.isWorkflowRunning(temporalClient.getConnectionManagerName(connectionId)));
*/
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Value;
Expand Down Expand Up @@ -220,6 +224,22 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) {
signalRequest.add(connectionManagerWorkflow::run, input);

WorkflowClient.start(connectionManagerWorkflow::run, input);

try {
CompletableFuture.supplyAsync(() -> {
try {
do {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} while (!isWorkflowRunning(getConnectionManagerName(connectionId)));
} catch (InterruptedException e) {}

return null;
}).get(60, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to create a new connection manager workflow", e);
} catch (TimeoutException e) {
log.error("Can't create a new connection manager workflow due to timeout", e);
}
}

public void deleteConnection(final UUID connectionId) {
Expand Down Expand Up @@ -446,37 +466,18 @@ <T> TemporalResponse<T> execute(final JobRunConfig jobRunConfig, final Supplier<
}

/**
* Check if a workflow is currently running. It is using the temporal pagination (see:
* https://temporalio.slack.com/archives/CTRCR8RBP/p1638926310308200)
* Check if a workflow is currently running. Running means that it is query-able, thus we check that
* we can properly launch a query
*/
public boolean isWorkflowRunning(final String workflowName) {
ByteString token;
ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();
do {
final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest =
service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest);
final long matchingWorkflowCount = listOpenWorkflowExecutionsRequest.getExecutionsList().stream()
.filter((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId().equals(workflowName)))
.count();
if (matchingWorkflowCount != 0) {
return true;
}
token = listOpenWorkflowExecutionsRequest.getNextPageToken();

openWorkflowExecutionsRequest =
ListOpenWorkflowExecutionsRequest.newBuilder()
.setNamespace(client.getOptions().getNamespace())
.setNextPageToken(token)
.setMaximumPageSize(MAXIMUM_SEARCH_PAGE_SIZE)
.build();

} while (token != null && token.size() > 0);
try {
ConnectionManagerWorkflow connectionManagerWorkflow = getExistingWorkflow(ConnectionManagerWorkflow.class, workflowName);
connectionManagerWorkflow.getState();

return false;
return true;
} catch (Exception e) {
return false;
}
}

@VisibleForTesting
Expand Down