Skip to content

Commit

Permalink
Revert "Disable broken connections when feature flag is on (#19730)"
Browse files Browse the repository at this point in the history
This reverts commit f07cef8.
  • Loading branch information
terencecho committed Dec 2, 2022
1 parent 054933b commit e17d1c2
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 286 deletions.
2 changes: 0 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2935,8 +2935,6 @@ components:
$ref: "#/components/schemas/CatalogDiff"
breakingChange:
type: boolean
connectionStatus:
$ref: "#/components/schemas/ConnectionStatus"
SourceSearch:
type: object
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.lang.CloseableShutdownHook;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.temporal.ConnectionManagerUtils;
Expand Down Expand Up @@ -214,8 +213,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
final TrackingClient trackingClient = TrackingClientSingleton.get();
final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);

final EnvVariableFeatureFlags envVariableFeatureFlags = new EnvVariableFeatureFlags();

final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl());
final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs);
final JobErrorReporter jobErrorReporter =
Expand Down Expand Up @@ -289,8 +286,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
eventRunner,
connectionsHandler,
envVariableFeatureFlags);
connectionsHandler);

final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.ConnectionStatus;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId;
Expand All @@ -29,7 +27,6 @@
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.NonBreakingChangesPreference;
import io.airbyte.api.model.generated.SourceCoreConfig;
import io.airbyte.api.model.generated.SourceDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead;
Expand All @@ -42,7 +39,6 @@
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.ErrorCode;
import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult;
Expand Down Expand Up @@ -99,7 +95,6 @@ public class SchedulerHandler {
private final JobPersistence jobPersistence;
private final JobConverter jobConverter;
private final EventRunner eventRunner;
private final EnvVariableFeatureFlags envVariableFeatureFlags;

public SchedulerHandler(final ConfigRepository configRepository,
final SecretsRepositoryReader secretsRepositoryReader,
Expand All @@ -109,8 +104,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final EventRunner eventRunner,
final ConnectionsHandler connectionsHandler,
final EnvVariableFeatureFlags envVariableFeatureFlags) {
final ConnectionsHandler connectionsHandler) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -120,8 +114,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
jobPersistence,
eventRunner,
new JobConverter(workerEnvironment, logConfigs),
connectionsHandler,
envVariableFeatureFlags);
connectionsHandler);
}

@VisibleForTesting
Expand All @@ -133,8 +126,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final EventRunner eventRunner,
final JobConverter jobConverter,
final ConnectionsHandler connectionsHandler,
final EnvVariableFeatureFlags envVariableFeatureFlags) {
final ConnectionsHandler connectionsHandler) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -144,7 +136,6 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.eventRunner = eventRunner;
this.jobConverter = jobConverter;
this.connectionsHandler = connectionsHandler;
this.envVariableFeatureFlags = envVariableFeatureFlags;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -369,32 +360,16 @@ private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discovered
throws JsonValidationException, ConfigNotFoundException, IOException {
final Optional<io.airbyte.api.model.generated.AirbyteCatalog> catalogUsedToMakeConfiguredCatalog = connectionsHandler
.getConnectionAirbyteCatalog(discoverSchemaRequestBody.getConnectionId());
final ConnectionRead connectionRead = connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId());
final io.airbyte.api.model.generated.@NotNull AirbyteCatalog currentAirbyteCatalog =
connectionRead.getSyncCatalog();
connectionsHandler.getConnection(discoverSchemaRequestBody.getConnectionId()).getSyncCatalog();
CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
boolean containsBreakingChange = containsBreakingChange(diff);
ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(discoverSchemaRequestBody.getConnectionId());
ConnectionStatus connectionStatus;
if (shouldDisableConnection(containsBreakingChange, connectionRead.getNonBreakingChangesPreference(), diff)) {
connectionStatus = ConnectionStatus.INACTIVE;
} else {
connectionStatus = ConnectionStatus.ACTIVE;
}
updateObject.status(connectionStatus);
connectionsHandler.updateConnection(updateObject);
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange).connectionStatus(connectionStatus);

}

private boolean shouldDisableConnection(boolean containsBreakingChange, NonBreakingChangesPreference preference, CatalogDiff diff) {
if (!envVariableFeatureFlags.autoDetectSchema()) {
return false;
}
discoveredSchema.catalogDiff(diff).breakingChange(containsBreakingChange);

return containsBreakingChange || (preference == NonBreakingChangesPreference.DISABLE && !diff.getTransforms().isEmpty());
}

private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<StandardCheckConnectionOutput> response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,6 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
*/
diff = refreshedCatalog.get().getCatalogDiff();
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange());
connection.setStatus(refreshedCatalog.get().getConnectionStatus());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
Expand Down
Loading

0 comments on commit e17d1c2

Please sign in to comment.