Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sweep old scheduler code #13400

Merged
merged 18 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .bumpversion.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ serialize =

[bumpversion:file:airbyte-metrics/reporter/Dockerfile]

[bumpversion:file:airbyte-scheduler/app/Dockerfile]

[bumpversion:file:airbyte-server/Dockerfile]

[bumpversion:file:airbyte-webapp/package.json]
Expand Down
4 changes: 0 additions & 4 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ SENTRY_DSN="https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6


### APPLICATIONS ###
# Scheduler #
# Relevant to scaling.
SUBMITTER_NUM_THREADS=10

# Worker #
# Relevant to scaling.
MAX_SYNC_WORKERS=5
Expand Down
30 changes: 15 additions & 15 deletions .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -521,11 +521,11 @@ jobs:
label: ${{ needs.start-platform-build-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-platform-build-runner.outputs.ec2-instance-id }}

## Kube Acceptance Tests (with scheduler v2 - both temporal changes and container orchestrator)
## Kube Acceptance Tests
# Docker acceptance tests run as part of the build job.
# In case of self-hosted EC2 errors, remove this block.
start-kube-acceptance-test-runner-v2:
name: "Platform: Start Scheduler V2 Kube Acceptance Test Runner"
start-kube-acceptance-test-runner:
name: "Platform: Start Kube Acceptance Test Runner"
needs:
- changes
- find_valid_pat
Expand All @@ -548,11 +548,11 @@ jobs:
aws-access-key-id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
github-token: ${{ needs.find_valid_pat.outputs.pat }}
kube-acceptance-test-v2:
name: "Platform: Acceptance Tests (Kube v2)"
kube-acceptance-test:
name: "Platform: Acceptance Tests (Kube)"
# In case of self-hosted EC2 errors, removed the `needs` line and switch back to running on ubuntu-latest.
needs: start-kube-acceptance-test-runner-v2 # required to start the main job when the runner is ready
runs-on: ${{ needs.start-kube-acceptance-test-runner-v2.outputs.label }} # run the job on the newly created runner
needs: start-kube-acceptance-test-runner # required to start the main job when the runner is ready
runs-on: ${{ needs.start-kube-acceptance-test-runner.outputs.label }} # run the job on the newly created runner
environment: more-secrets
timeout-minutes: 90
steps:
Expand Down Expand Up @@ -631,17 +631,17 @@ jobs:
name: Kubernetes Logs
path: /tmp/kubernetes_logs/*
# In case of self-hosted EC2 errors, remove this block.
stop-kube-acceptance-test-runner-v2:
stop-kube-acceptance-test-runner:
name: "Platform: Stop Kube Acceptance Test EC2 Runner"
timeout-minutes: 10
needs:
- start-kube-acceptance-test-runner-v2 # required to get output from the start-runner job
- kube-acceptance-test-v2 # required to wait when the main job is done
- start-kube-acceptance-test-runner # required to get output from the start-runner job
- kube-acceptance-test # required to wait when the main job is done
- find_valid_pat
runs-on: ubuntu-latest
# Always is required to stop the runner even if the previous job has errors. However always() runs even if the previous step is skipped.
# Thus, we check for skipped here.
if: ${{ always() && needs.start-kube-acceptance-test-runner-v2.result != 'skipped'}}
if: ${{ always() && needs.start-kube-acceptance-test-runner.result != 'skipped'}}
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v1
Expand All @@ -654,8 +654,8 @@ jobs:
with:
mode: stop
github-token: ${{ needs.find_valid_pat.outputs.pat }}
label: ${{ needs.start-kube-acceptance-test-runner-v2.outputs.label }}
ec2-instance-id: ${{ needs.start-kube-acceptance-test-runner-v2.outputs.ec2-instance-id }}
label: ${{ needs.start-kube-acceptance-test-runner.outputs.label }}
ec2-instance-id: ${{ needs.start-kube-acceptance-test-runner.outputs.ec2-instance-id }}

notify-failure-slack-channel:
name: "Notify Slack Channel on Build Failures"
Expand All @@ -665,7 +665,7 @@ jobs:
- frontend-build
- octavia-cli-build
- platform-build
- kube-acceptance-test-v2
- kube-acceptance-test
if: ${{ failure() && github.ref == 'refs/heads/master' }}
steps:
- name: Publish to OSS Build Failure Slack Channel
Expand All @@ -689,7 +689,7 @@ jobs:
- frontend-build
- octavia-cli-build
- platform-build
- kube-acceptance-test-v2
- kube-acceptance-test
if: success()
steps:
- name: Get Previous Workflow Status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ void testBootloaderAppBlankDb() throws Exception {
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);

val mockedSecretMigrator = mock(SecretMigrator.class);

Expand Down Expand Up @@ -157,7 +156,6 @@ void testBootloaderAppRunSecretMigration() throws Exception {
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);

final JsonSecretsProcessor jsonSecretsProcessor = JsonSecretsProcessor.builder()
.copySecrets(true)
Expand Down Expand Up @@ -302,7 +300,6 @@ void testPostLoadExecutionExecutes() throws Exception {
when(mockedConfigs.getJobsDatabaseInitializationTimeoutMs()).thenReturn(60000L);

val mockedFeatureFlags = mock(FeatureFlags.class);
when(mockedFeatureFlags.usesNewScheduler()).thenReturn(false);

val mockedSecretMigrator = mock(SecretMigrator.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@
@Slf4j
public class EnvVariableFeatureFlags implements FeatureFlags {

@Override
public boolean usesNewScheduler() {
// TODO: sweep this method along with the scheduler
log.info("New Scheduler: true (post-migration)");

// After migrating all OSS users onto the new temporal scheduler, this should always return true.
return true;
}

@Override
public boolean autoDisablesFailingConnections() {
log.info("Auto Disable Failing Connections: " + Boolean.parseBoolean(System.getenv("AUTO_DISABLE_FAILING_CONNECTIONS")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
*/
public interface FeatureFlags {

boolean usesNewScheduler();

boolean autoDisablesFailingConnections();

boolean exposeSecretsInExport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,18 +449,6 @@ public interface Configs {
*/
Set<Integer> getTemporalWorkerPorts();

// Scheduler
/**
* Define how and how often the Scheduler sweeps its local disk for old configs. Multiple variables
* are involved here. Please see {@link WorkspaceRetentionConfig} for more info.
*/
WorkspaceRetentionConfig getWorkspaceRetentionConfig();

/**
* Define the maximum number of concurrent jobs the Scheduler schedules. Defaults to 5.
*/
String getSubmitterNumThreads();

// Container Orchestrator
/**
* Define if Airbyte should use the container orchestrator. Internal-use only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ public class EnvConfigs implements Configs {
public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS";
public static final String SYNC_JOB_MAX_TIMEOUT_DAYS = "SYNC_JOB_MAX_TIMEOUT_DAYS";
private static final String CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED = "CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
public static final String MAX_SPEC_WORKERS = "MAX_SPEC_WORKERS";
public static final String MAX_CHECK_WORKERS = "MAX_CHECK_WORKERS";
public static final String MAX_DISCOVER_WORKERS = "MAX_DISCOVER_WORKERS";
Expand All @@ -81,7 +78,6 @@ public class EnvConfigs implements Configs {
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String TEMPORAL_HISTORY_RETENTION_IN_DAYS = "TEMPORAL_HISTORY_RETENTION_IN_DAYS";
public static final String JOB_KUBE_NAMESPACE = "JOB_KUBE_NAMESPACE";
private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS";
public static final String JOB_MAIN_CONTAINER_CPU_REQUEST = "JOB_MAIN_CONTAINER_CPU_REQUEST";
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
Expand Down Expand Up @@ -156,9 +152,6 @@ public class EnvConfigs implements Configs {
private static final String DEFAULT_JOB_KUBE_SOCAT_IMAGE = "alpine/socat:1.7.4.3-r0";
private static final String DEFAULT_JOB_KUBE_BUSYBOX_IMAGE = "busybox:1.28";
private static final String DEFAULT_JOB_KUBE_CURL_IMAGE = "curlimages/curl:7.83.1";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
private static final int DEFAULT_DATABASE_INITIALIZATION_TIMEOUT_MS = 60 * 1000;

public static final long DEFAULT_MAX_SPEC_WORKERS = 5;
Expand Down Expand Up @@ -791,21 +784,6 @@ public Set<Integer> getTemporalWorkerPorts() {
return Arrays.stream(ports.split(",")).map(Integer::valueOf).collect(Collectors.toSet());
}

// Scheduler
@Override
public WorkspaceRetentionConfig getWorkspaceRetentionConfig() {
final long minDays = getEnvOrDefault(MINIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS);
final long maxDays = getEnvOrDefault(MAXIMUM_WORKSPACE_RETENTION_DAYS, DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS);
final long maxSizeMb = getEnvOrDefault(MAXIMUM_WORKSPACE_SIZE_MB, DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB);

return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb);
}

@Override
public String getSubmitterNumThreads() {
return getEnvOrDefault(SUBMITTER_NUM_THREADS, "5");
}

@Override
public boolean getContainerOrchestratorEnabled() {
return getEnvOrDefault(CONTAINER_ORCHESTRATOR_ENABLED, false, Boolean::valueOf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
public enum MetricEmittingApps implements MetricEmittingApp {

METRICS_REPORTER("metrics-reporter"),
SCHEDULER("scheduler"),
WORKER("worker");

private String applicationName;
Expand Down
15 changes: 0 additions & 15 deletions airbyte-scheduler/app/Dockerfile

This file was deleted.

72 changes: 0 additions & 72 deletions airbyte-scheduler/app/build.gradle

This file was deleted.

7 changes: 0 additions & 7 deletions airbyte-scheduler/app/readme.md

This file was deleted.

This file was deleted.

Loading