Skip to content

Commit

Permalink
Sweep old scheduler code (#13400)
Browse files Browse the repository at this point in the history
* sweep all scheduler application code and new-scheduler conditional logic

* remove airbyte-scheduler from deployments and docs

* format

* remove 'v2' from github actions

* add back scheduler in delete deployment command

* remove scheduler parameters from helm chart values

* add back job cleaner + test and add comment

* remove now-unused env vars from code and docs

* format

* remove feature flags from web backend connection handler as it is no longer needed

* remove feature flags from config api as it is now longer needed

* remove feature flags input from config api test

* format + shorter url

* remove scheduler parameters from helm chart readme
  • Loading branch information
lmossman authored Jun 6, 2022
1 parent 0342699 commit 73034c6
Show file tree
Hide file tree
Showing 81 changed files with 226 additions and 3,800 deletions.
2 changes: 0 additions & 2 deletions .bumpversion.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ serialize =

[bumpversion:file:airbyte-metrics/reporter/Dockerfile]

[bumpversion:file:airbyte-scheduler/app/Dockerfile]

[bumpversion:file:airbyte-server/Dockerfile]

[bumpversion:file:airbyte-webapp/package.json]
Expand Down
4 changes: 0 additions & 4 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ SENTRY_DSN="https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6


### APPLICATIONS ###
# Scheduler #
# Relevant to scaling.
SUBMITTER_NUM_THREADS=10

# Worker #
# Relevant to scaling.
MAX_SYNC_WORKERS=5
Expand Down
30 changes: 15 additions & 15 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,11 @@ jobs:
label: ${{ needs.start-platform-build-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-platform-build-runner.outputs.ec2-instance-id }}

## Kube Acceptance Tests (with scheduler v2 - both temporal changes and container orchestrator)
## Kube Acceptance Tests
# Docker acceptance tests run as part of the build job.
# In case of self-hosted EC2 errors, remove this block.
start-kube-acceptance-test-runner-v2:
name: "Platform: Start Scheduler V2 Kube Acceptance Test Runner"
start-kube-acceptance-test-runner:
name: "Platform: Start Kube Acceptance Test Runner"
needs:
- changes
- find_valid_pat
Expand All @@ -548,11 +548,11 @@ jobs:
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
github-token: ${{ needs.find_valid_pat.outputs.pat }}
kube-acceptance-test-v2:
name: "Platform: Acceptance Tests (Kube v2)"
kube-acceptance-test:
name: "Platform: Acceptance Tests (Kube)"
# In case of self-hosted EC2 errors, removed the `needs` line and switch back to running on ubuntu-latest.
needs: start-kube-acceptance-test-runner-v2 # required to start the main job when the runner is ready
runs-on: ${{ needs.start-kube-acceptance-test-runner-v2.outputs.label }} # run the job on the newly created runner
needs: start-kube-acceptance-test-runner # required to start the main job when the runner is ready
runs-on: ${{ needs.start-kube-acceptance-test-runner.outputs.label }} # run the job on the newly created runner
environment: more-secrets
timeout-minutes: 90
steps:
Expand Down Expand Up @@ -631,17 +631,17 @@ jobs:
name: Kubernetes Logs
path: /tmp/kubernetes_logs/*
# In case of self-hosted EC2 errors, remove this block.
stop-kube-acceptance-test-runner-v2:
stop-kube-acceptance-test-runner:
name: "Platform: Stop Kube Acceptance Test EC2 Runner"
timeout-minutes: 10
needs:
- start-kube-acceptance-test-runner-v2 # required to get output from the start-runner job
- kube-acceptance-test-v2 # required to wait when the main job is done
- start-kube-acceptance-test-runner # required to get output from the start-runner job
- kube-acceptance-test # required to wait when the main job is done
- find_valid_pat
runs-on: ubuntu-latest
# Always is required to stop the runner even if the previous job has errors. However always() runs even if the previous step is skipped.
# Thus, we check for skipped here.
if: ${{ always() && needs.start-kube-acceptance-test-runner-v2.result != 'skipped'}}
if: ${{ always() && needs.start-kube-acceptance-test-runner.result != 'skipped'}}
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
Expand All @@ -654,8 +654,8 @@ jobs:
with:
mode: stop
github-token: ${{ needs.find_valid_pat.outputs.pat }}
label: ${{ needs.start-kube-acceptance-test-runner-v2.outputs.label }}
ec2-instance-id: ${{ needs.start-kube-acceptance-test-runner-v2.outputs.ec2-instance-id }}
label: ${{ needs.start-kube-acceptance-test-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-kube-acceptance-test-runner.outputs.ec2-instance-id }}

notify-failure-slack-channel:
name: "Notify Slack Channel on Build Failures"
Expand All @@ -665,7 +665,7 @@ jobs:
- frontend-build
- octavia-cli-build
- platform-build
- kube-acceptance-test-v2
- kube-acceptance-test
if: ${{ failure() && github.ref == 'refs/heads/master' }}
steps:
- name: Publish to OSS Build Failure Slack Channel
Expand All @@ -689,7 +689,7 @@ jobs:
- frontend-build
- octavia-cli-build
- platform-build
- kube-acceptance-test-v2
- kube-acceptance-test
if: success()
steps:
- name: Get Previous Workflow Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ void testBootloaderAppBlankDb() throws Exception {
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);

val mockedSecretMigrator = mock(SecretMigrator.class);

Expand Down Expand Up @@ -157,7 +156,6 @@ void testBootloaderAppRunSecretMigration() throws Exception {
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);

final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.copySecrets(true)
Expand Down Expand Up @@ -302,7 +300,6 @@ void testPostLoadExecutionExecutes() throws Exception {
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);

val mockedSecretMigrator = mock(SecretMigrator.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@
@Slf4j
public class EnvVariableFeatureFlags implements FeatureFlags {

@Override
public boolean usesNewScheduler() {
// TODO: sweep this method along with the scheduler
log.info("New Scheduler: true (post-migration)");

// After migrating all OSS users onto the new temporal scheduler, this should always return true.
return true;
}

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
*/
public interface FeatureFlags {

boolean usesNewScheduler();

boolean autoDisablesFailingConnections();

boolean exposeSecretsInExport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,18 +449,6 @@ public interface Configs {
*/
Set<Integer> getTemporalWorkerPorts();

// Scheduler
/**
* Define how and how often the Scheduler sweeps its local disk for old configs. Multiple variables
* are involved here. Please see {@link WorkspaceRetentionConfig} for more info.
*/
WorkspaceRetentionConfig getWorkspaceRetentionConfig();

/**
* Define the maximum number of concurrent jobs the Scheduler schedules. Defaults to 5.
*/
String getSubmitterNumThreads();

// Container Orchestrator
/**
* Define if Airbyte should use the container orchestrator. Internal-use only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ public class EnvConfigs implements Configs {
public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS";
public static final String SYNC_JOB_MAX_TIMEOUT_DAYS = "SYNC_JOB_MAX_TIMEOUT_DAYS";
private static final String CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED = "CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
public static final String MAX_SPEC_WORKERS = "MAX_SPEC_WORKERS";
public static final String MAX_CHECK_WORKERS = "MAX_CHECK_WORKERS";
public static final String MAX_DISCOVER_WORKERS = "MAX_DISCOVER_WORKERS";
Expand All @@ -81,7 +78,6 @@ public class EnvConfigs implements Configs {
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS";
public static final String JOB_KUBE_NAMESPACE = "JOB_KUBE_NAMESPACE";
private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS";
public static final String JOB_MAIN_CONTAINER_CPU_REQUEST = "JOB_MAIN_CONTAINER_CPU_REQUEST";
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
Expand Down Expand Up @@ -156,9 +152,6 @@ public class EnvConfigs implements Configs {
private static final String DEFAULT_JOB_KUBE_SOCAT_IMAGE = "alpine/socat:1.7.4.3-r0";
private static final String DEFAULT_JOB_KUBE_BUSYBOX_IMAGE = "busybox:1.28";
private static final String DEFAULT_JOB_KUBE_CURL_IMAGE = "curlimages/curl:7.83.1";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000;

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
Expand Down Expand Up @@ -791,21 +784,6 @@ public Set<Integer> getTemporalWorkerPorts() {
return Arrays.stream(ports.split(",")).map(Integer::valueOf).collect(Collectors.toSet());
}

// Scheduler
@Override
public WorkspaceRetentionConfig getWorkspaceRetentionConfig() {
final long minDays = getEnvOrDefault(MINIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS);
final long maxDays = getEnvOrDefault(MAXIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS);
final long maxSizeMb = getEnvOrDefault(MAXIMUM_WORKSPACE_SIZE_MB, DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB);

return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb);
}

@Override
public String getSubmitterNumThreads() {
return getEnvOrDefault(SUBMITTER_NUM_THREADS, "5");
}

@Override
public boolean getContainerOrchestratorEnabled() {
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_ENABLED, false, Boolean::valueOf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
public enum MetricEmittingApps implements MetricEmittingApp {

METRICS_REPORTER("metrics-reporter"),
SCHEDULER("scheduler"),
WORKER("worker");

private String applicationName;
Expand Down
15 changes: 0 additions & 15 deletions airbyte-scheduler/app/Dockerfile

This file was deleted.

72 changes: 0 additions & 72 deletions airbyte-scheduler/app/build.gradle

This file was deleted.

7 changes: 0 additions & 7 deletions airbyte-scheduler/app/readme.md

This file was deleted.

This file was deleted.

Loading

0 comments on commit 73034c6

Please sign in to comment.