diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
index 62064bfe32c7a..e9b9fdceea8bb 100644
--- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
+++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java
@@ -153,6 +153,7 @@ public void start() throws IOException {
         () -> {
           MDC.setContextMap(mdc);
           jobCleaner.run();
+          jobPersistence.purgeJobHistory();
         },
         CLEANING_DELAY.toSeconds(),
         CLEANING_DELAY.toSeconds(),
diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
index 70505593f0caa..23d9d4ef3dafe 100644
--- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
+++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java
@@ -31,6 +31,7 @@
 import com.google.common.collect.Sets;
 import io.airbyte.commons.enums.Enums;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
 import io.airbyte.commons.text.Names;
 import io.airbyte.commons.text.Sqls;
 import io.airbyte.commons.version.AirbyteVersion;
@@ -82,6 +83,11 @@

 public class DefaultJobPersistence implements JobPersistence {

+  // Not static because the job history test case overrides these via the constructor.
+  private final int JOB_HISTORY_MINIMUM_AGE_IN_DAYS;
+  private final int JOB_HISTORY_MINIMUM_RECENCY;
+  private final int JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS;
+
   private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJobPersistence.class);
   private static final Set<String> SYSTEM_SCHEMA = Set
       .of("pg_toast", "information_schema", "pg_catalog", "import_backup", "pg_internal",
@@ -119,13 +125,20 @@ public class DefaultJobPersistence implements JobPersistence {
   private final Supplier<Instant> timeSupplier;

   @VisibleForTesting
-  DefaultJobPersistence(Database database, Supplier<Instant> timeSupplier) {
+  DefaultJobPersistence(Database database,
+                        Supplier<Instant> timeSupplier,
+                        int minimumAgeInDays,
+                        int excessiveNumberOfJobs,
+                        int minimumRecencyCount) {
     this.database = new ExceptionWrappingDatabase(database);
     this.timeSupplier = timeSupplier;
+    JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays;
+    JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = excessiveNumberOfJobs;
+    JOB_HISTORY_MINIMUM_RECENCY = minimumRecencyCount;
   }

   public DefaultJobPersistence(Database database) {
-    this(database, Instant::now);
+    this(database, Instant::now, 30, 500, 10);
   }

   @Override
@@ -506,6 +519,27 @@ private List<String> listTables(final String schema) throws IOException {
     }
   }

+  @Override
+  public void purgeJobHistory() {
+    purgeJobHistory(LocalDateTime.now());
+  }
+
+  @VisibleForTesting
+  public void purgeJobHistory(LocalDateTime asOfDate) {
+    try {
+      final String JOB_HISTORY_PURGE_SQL = MoreResources.readResource("job_history_purge.sql");
+      // Postgres cannot take a ? bind inside INTERVAL '?' DAY, so the age cutoff is formatted in with %d instead.
+      final String sql = String.format(JOB_HISTORY_PURGE_SQL, (JOB_HISTORY_MINIMUM_AGE_IN_DAYS - 1));
+      final Integer rows = database.query(ctx -> ctx.execute(sql,
+          asOfDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")), // yyyy = calendar year, not week-based YYYY
+          JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS,
+          JOB_HISTORY_MINIMUM_RECENCY));
+      LOGGER.info("Purged {} job history records.", rows);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private List<String> listAllTables(final String schema) throws IOException {
     if (schema != null) {
       return database.query(context -> context.meta().getSchemas(schema).stream()
diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
index 068c871e1ab3f..73ff78050d53c 100644
--- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
+++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java
@@ -184,4 +184,11 @@ public interface JobPersistence {
    */
   void importDatabase(String airbyteVersion, Map<DatabaseSchema, Stream<JsonNode>> data) throws IOException;

+  /**
+   * Purges job history while ensuring that the latest saved-state information is maintained.
+   *
+   * Any underlying IOException is rethrown as a RuntimeException.
+   */
+  void purgeJobHistory();
+
 }
diff --git a/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql b/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql
new file mode 100644
index 0000000000000..931634f0c4b2f
--- /dev/null
+++ b/airbyte-scheduler/persistence/src/main/resources/job_history_purge.sql
@@ -0,0 +1,50 @@
+DELETE
+FROM jobs
+WHERE jobs.id IN (
+  SELECT jobs.id
+  FROM jobs
+  LEFT JOIN (
+    SELECT scope, COUNT(jobs.id) AS jobCount
+    FROM jobs
+    GROUP BY scope
+  ) counts ON jobs.scope = counts.scope
+  -- the job must be at least MINIMUM_AGE_IN_DAYS old, or its connection must
+  -- hold more than EXCESSIVE_NUMBER_OF_JOBS
+  WHERE (
+    jobs.created_at < (TO_TIMESTAMP(?, 'YYYY-MM-DD') - INTERVAL '%d' DAY)
+    OR counts.jobCount > ?
+  )
+  AND jobs.id NOT IN (
+    -- cannot be the most recent job with saved state
+    SELECT job_id AS latest_job_id_with_state
+    FROM (
+      SELECT jobs.scope,
+        jobs.id AS job_id,
+        jobs.config_type,
+        jobs.created_at,
+        jobs.status,
+        bool_or(attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL) AS outputStateExists,
+        ROW_NUMBER() OVER (PARTITION BY scope ORDER BY jobs.created_at DESC, jobs.id DESC) AS stateRecency
+      FROM jobs
+      LEFT JOIN attempts ON jobs.id = attempts.job_id
+      GROUP BY scope, jobs.id
+      HAVING bool_or(attempts."output" -> 'sync' -> 'state' -> 'state' IS NOT NULL) = TRUE
+      ORDER BY scope, jobs.created_at DESC, jobs.id DESC
+    ) jobs_with_state
+    WHERE stateRecency = 1
+  )
+  AND jobs.id NOT IN (
+    -- cannot be one of the last MINIMUM_RECENCY jobs for that connection/scope
+    SELECT id
+    FROM (
+      SELECT jobs.scope,
+        jobs.id,
+        jobs.created_at,
+        ROW_NUMBER() OVER (PARTITION BY scope ORDER BY jobs.created_at DESC, jobs.id DESC) AS recency
+      FROM jobs
+      GROUP BY scope, jobs.id
+      ORDER BY scope, jobs.created_at DESC, jobs.id DESC
+    ) jobs_by_recency
+    WHERE recency <= ?
+  )
+)
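A note on parameter plumbing: the statement above mixes two templating mechanisms, because Postgres will not accept a bind variable inside INTERVAL '?' DAY. The age cutoff is therefore spliced into the template with String.format (%d), and the remaining three ? placeholders are bound positionally at execution time. Below is a minimal sketch of that ordering, assuming the default retention values from the public constructor (30 days, 500 jobs, keep 10); the abbreviated template string is illustrative only, as the real statement is loaded from job_history_purge.sql.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class PurgeParameterSketch {

  public static void main(String[] args) {
    final int minimumAgeInDays = 30;       // JOB_HISTORY_MINIMUM_AGE_IN_DAYS
    final int excessiveNumberOfJobs = 500; // JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS
    final int minimumRecency = 10;         // JOB_HISTORY_MINIMUM_RECENCY

    // Step 1: %d is resolved at format time, since INTERVAL cannot take a JDBC bind.
    final String template = "... TO_TIMESTAMP(?, 'YYYY-MM-DD') - INTERVAL '%d' DAY ... jobCount > ? ... recency <= ?";
    final String sql = String.format(template, minimumAgeInDays - 1);

    // Step 2: the three ? binds are supplied in order: as-of date, excess cutoff, recency cutoff.
    final String asOfDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
    System.out.printf("sql = %s%nbinds = [%s, %d, %d]%n", sql, asOfDate, excessiveNumberOfJobs, minimumRecency);
  }
}
```

The `minimumAgeInDays - 1` adjustment is deliberate: the cutoff is computed from midnight of the as-of date, so subtracting one day less keeps exactly `minimumAgeInDays` calendar days of history, counting the as-of day itself.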
diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java
index df1637de96ded..e38a97c8bac85 100644
--- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java
+++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java
@@ -38,6 +38,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.text.Sqls;
 import io.airbyte.config.JobConfig;
 import io.airbyte.config.JobConfig.ConfigType;
 import io.airbyte.config.JobGetSpecConfig;
@@ -58,6 +59,8 @@
 import java.nio.file.Path;
 import java.sql.SQLException;
 import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -79,6 +82,8 @@
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.utility.MountableFile;

@@ -163,7 +168,7 @@ public void setup() throws Exception {
     timeSupplier = mock(Supplier.class);
     when(timeSupplier.get()).thenReturn(NOW);

-    jobPersistence = new DefaultJobPersistence(database, timeSupplier);
+    jobPersistence = new DefaultJobPersistence(database, timeSupplier, 30, 500, 10);
   }

   @AfterEach
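The setup change above is what lets the purge tests run with tractable retention windows: production code keeps using the two-argument constructor (which delegates with the 30/500/10 defaults), while tests inject their own knobs through the @VisibleForTesting five-argument constructor and drive purging from a fixed as-of date. A minimal sketch of that pattern follows; the `Database` handle is assumed to come from the test's Testcontainers setup, and since the five-argument constructor is package-private, the sketch assumes it lives in the same package.

```java
package io.airbyte.scheduler.persistence;

import io.airbyte.db.Database;
import java.time.Instant;
import java.time.LocalDateTime;

class RetentionKnobSketch {

  static void purgeWithTestKnobs(Database database) {
    // Shrink the retention knobs so a small fixture can exercise every deletion rule.
    DefaultJobPersistence persistence = new DefaultJobPersistence(
        database,
        Instant::now,
        /* minimumAgeInDays */ 10,
        /* excessiveNumberOfJobs */ 20,
        /* minimumRecencyCount */ 5);

    // The @VisibleForTesting overload takes an explicit as-of date instead of
    // LocalDateTime.now(), keeping assertions deterministic.
    persistence.purgeJobHistory(LocalDateTime.of(2021, 6, 20, 0, 0));
  }
}
```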
@@ -1004,4 +1009,176 @@ void testResetJobCancelled() throws IOException {

   }

+  @Nested
+  @DisplayName("When purging job history")
+  class PurgeJobHistory {
+
+    private Job persistJobForJobHistoryTesting(String scope, JobConfig jobConfig, JobStatus status, LocalDateTime runDate)
+        throws IOException, SQLException {
+      Optional<Long> id = database.query(
+          ctx -> ctx.fetch(
+              "INSERT INTO jobs(config_type, scope, created_at, updated_at, status, config) "
+                  + "SELECT CAST(? AS JOB_CONFIG_TYPE), ?, ?, ?, CAST(? AS JOB_STATUS), CAST(? as JSONB) "
+                  + "RETURNING id ",
+              Sqls.toSqlName(jobConfig.getConfigType()),
+              scope,
+              runDate,
+              runDate,
+              Sqls.toSqlName(status),
+              Jsons.serialize(jobConfig)))
+          .stream()
+          .findFirst()
+          .map(r -> r.getValue("id", Long.class));
+      return jobPersistence.getJob(id.get());
+    }
+
+    private void persistAttemptForJobHistoryTesting(Job job, String logPath, LocalDateTime runDate, boolean shouldHaveState)
+        throws IOException, SQLException {
+      String attemptOutputWithState = "{\n"
+          + "  \"sync\": {\n"
+          + "    \"state\": {\n"
+          + "      \"state\": {\n"
+          + "        \"bookmarks\": {"
+          + "}}}}}";
+      String attemptOutputWithoutState = "{\n"
+          + "  \"sync\": {\n"
+          + "    \"output_catalog\": {"
+          + "}}}";
+      database.query(ctx -> ctx.fetch(
+          "INSERT INTO attempts(job_id, attempt_number, log_path, status, created_at, updated_at, output) "
+              + "VALUES(?, ?, ?, CAST(? AS ATTEMPT_STATUS), ?, ?, CAST(? as JSONB)) RETURNING attempt_number",
+          job.getId(),
+          job.getAttemptsCount(),
+          logPath,
+          Sqls.toSqlName(AttemptStatus.FAILED),
+          runDate,
+          runDate,
+          shouldHaveState ? attemptOutputWithState : attemptOutputWithoutState)
+          .stream()
+          .findFirst()
+          .map(r -> r.get("attempt_number", Integer.class))
+          .orElseThrow(() -> new RuntimeException("This should not happen")));
+    }
+
+    /**
+     * Testing job history deletion is sensitive to exactly how the constants controlling deletion
+     * logic are configured. The test case therefore injects overrides for those constants, testing a
+     * comprehensive set of combinations to make sure the logic is robust to reasonable
+     * configurations. Extreme configurations, such as a zero-day retention period, are not covered.
+     *
+     * Business rules for deletions:
+     * <ol>
+     * <li>A job must be older than X days, or its connection must have an excessive number of
+     * jobs.</li>
+     * <li>A job cannot be one of the last N jobs on its connection (the last N jobs are always
+     * kept).</li>
+     * <li>A job cannot be holding the most recent saved state (the most recent saved state is always
+     * kept).</li>
+     * </ol>
+     *
+     * Testing goal: set up jobs according to the parameters passed in, delete according to the rules,
+     * and make sure the right number of jobs are left. Against one connection/scope:
+     * <ol>
+     * <li>Setup: create a history of jobs that goes back many days (but produces no more than one job
+     * a day).</li>
+     * <li>Setup: the most recent job with state in it should be at least N jobs back.</li>
+     * <li>Assert: ensure that after purging, the right number of jobs are left (and at least the
+     * minimum recency), including the one with the most recent state.</li>
+     * <li>Assert: ensure that after purging, the right number of jobs are left (and at least the
+     * minimum recency), including the X most recent.</li>
+     * <li>Assert: ensure that after purging, all other job history has been deleted.</li>
+     * </ol>
+     *
+     * @param numJobs how many test jobs to generate; make this large enough that all the other
+     *        parameters are fully exercised, for predictable results.
+     * @param tooManyJobs takes the place of DefaultJobPersistence.JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS:
+     *        how many jobs a connection needs before date-based age is ignored when deleting.
+     * @param ageCutoff takes the place of DefaultJobPersistence.JOB_HISTORY_MINIMUM_AGE_IN_DAYS:
+     *        retention period in days for the most recent jobs; anything older gets deleted.
+     * @param recencyCutoff takes the place of DefaultJobPersistence.JOB_HISTORY_MINIMUM_RECENCY:
+     *        retention count; at least this many jobs will be retained after deletion (provided
+     *        enough existed in the first place).
+     * @param lastStatePosition how far back in the list the job with the latest saved state sits.
+     *        This can be manipulated to place the saved-state job inside or prior to the retention
+     *        period.
+     * @param expectedAfterPurge how many matching jobs are expected after deletion, given the input
+     *        parameters. This was calculated by a human based on understanding the requirements.
+     * @param goalOfTestScenario describes the purpose of the test scenario, so failures are easier to
+     *        understand and maintain.
+     */
+    @DisplayName("Should purge older job history but maintain certain more recent ones")
+    @ParameterizedTest
+    // Cols: numJobs, tooManyJobs, ageCutoff, recencyCutoff, lastStatePosition, expectedAfterPurge, goalOfTestScenario
+    @CsvSource({
+        "50,100,10,5,9,10,'Validate age cutoff alone'",
+        "50,100,10,5,13,11,'Validate saved state after age cutoff'",
+        "50,100,10,15,9,15,'Validate recency cutoff alone'",
+        "50,100,10,15,17,16,'Validate saved state after recency cutoff'",
+        "50,20,30,10,9,10,'Validate excess jobs cutoff alone'",
+        "50,20,30,10,25,11,'Validate saved state after excess jobs cutoff'",
+        "50,20,30,20,9,20,'Validate recency cutoff with excess jobs cutoff'",
+        "50,20,30,20,25,21,'Validate saved state after recency and excess jobs cutoff but before age'",
+        "50,20,30,20,35,21,'Validate saved state after recency and excess jobs cutoff and after age'"
+    })
+    void testPurgeJobHistory(int numJobs,
+                             int tooManyJobs,
+                             int ageCutoff,
+                             int recencyCutoff,
+                             int lastStatePosition,
+                             int expectedAfterPurge,
+                             String goalOfTestScenario)
+        throws IOException, SQLException {
+      final String CURRENT_SCOPE = UUID.randomUUID().toString();
+
+      // Decoys - these jobs exist to trip up SQL that fails to partition by scope; they themselves
+      // should never be deleted.
+      final String DECOY_SCOPE = UUID.randomUUID().toString();
+
+      // Reconfigure constants to test various combinations of tuning knobs and make sure all work.
+      DefaultJobPersistence jobPersistence = new DefaultJobPersistence(database, timeSupplier, ageCutoff, tooManyJobs, recencyCutoff);
+
+      LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0);
+
+      // Jobs are created in reverse chronological order: id order is the inverse of old-to-new date
+      // order. The most recent job is allJobs[0], so keeping the 10 most recent means keeping [0-9].
+      // This simplifies the test math, since we don't have to care how many jobs existed in total or
+      // were deleted.
+      List<Job> allJobs = new ArrayList<>();
+      List<Job> decoyJobs = new ArrayList<>();
+      for (int i = 0; i < numJobs; i++) {
+        allJobs.add(persistJobForJobHistoryTesting(CURRENT_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i)));
+        decoyJobs.add(persistJobForJobHistoryTesting(DECOY_SCOPE, SYNC_JOB_CONFIG, JobStatus.FAILED, fakeNow.minusDays(i)));
+      }
+
+      // At least one job should have state. Find the desired job and add state to it.
+      Job lastJobWithState = addStateToJob(allJobs.get(lastStatePosition));
+      addStateToJob(decoyJobs.get(lastStatePosition - 1));
+      addStateToJob(decoyJobs.get(lastStatePosition + 1));
+
+      // An older job with state should also exist, proving the queries pick the most recent saved
+      // state rather than just any saved state.
+      Job olderJobWithState = addStateToJob(allJobs.get(lastStatePosition + 1));
+
+      // Sanity check that the attempt does have saved state, so the purge SQL detects it correctly.
+      assertTrue(lastJobWithState.getAttempts().get(0).getOutput() != null,
+          goalOfTestScenario + " - missing saved state on job that was supposed to have it.");
+
+      // Execute the job history purge and check which jobs are left.
+      jobPersistence.purgeJobHistory(fakeNow);
+      List<Job> afterPurge = jobPersistence.listJobs(ConfigType.SYNC, CURRENT_SCOPE, 9999, 0);
+
+      // Test - contains the expected number of jobs and no more than that.
+      assertEquals(expectedAfterPurge, afterPurge.size(), goalOfTestScenario + " - Incorrect number of jobs remain after deletion.");
+
+      // Test - the most recent jobs are actually the most recent by date (see above, reverse order).
+      for (int i = 0; i < Math.min(ageCutoff, recencyCutoff); i++) {
+        assertEquals(allJobs.get(i).getId(), afterPurge.get(i).getId(), goalOfTestScenario + " - Incorrect sort order after deletion.");
+      }
+
+      // Test - the job with the latest state is always kept, despite being older than some cutoffs.
+      assertTrue(afterPurge.contains(lastJobWithState), goalOfTestScenario + " - Missing last job with saved state after deletion.");
+    }
+
+    private Job addStateToJob(Job job) throws IOException, SQLException {
+      persistAttemptForJobHistoryTesting(job, LOG_PATH.toString(),
+          LocalDateTime.ofEpochSecond(job.getCreatedAtInSecond(), 0, ZoneOffset.UTC), true);
+      return jobPersistence.getJob(job.getId()); // reload the job so it includes its attempts
+    }
+
+  }
+
 }
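Since the expectedAfterPurge column was computed by hand, it can be double-checked mechanically. The helper below is hypothetical (not part of this PR) and assumes the fixture shape the test builds: one job per day per scope, newest at index 0, with the cutoffs applied as in the purge SQL above. It reproduces every row of the @CsvSource table.

```java
// Hypothetical reviewer aid: recompute the expectedAfterPurge column of the @CsvSource rows.
class ExpectedAfterPurgeCheck {

  static int expectedAfterPurge(int numJobs, int tooManyJobs, int ageCutoff,
                                int recencyCutoff, int lastStatePosition) {
    // With one job per day and the newest at index 0, the age cutoff protects indices
    // [0, ageCutoff) - unless the scope holds more than tooManyJobs jobs, in which case
    // age no longer protects anything (rule 1).
    final int keptByAge = numJobs > tooManyJobs ? 0 : Math.min(ageCutoff, numJobs);
    // The recency floor always protects the newest recencyCutoff jobs (rule 2); both
    // protected sets are prefixes of the job list, so their union is the larger prefix.
    int kept = Math.max(keptByAge, Math.min(recencyCutoff, numJobs));
    // The job holding the most recent saved state survives even outside both windows (rule 3).
    if (lastStatePosition >= kept) {
      kept += 1;
    }
    return kept;
  }

  public static void main(String[] args) {
    System.out.println(expectedAfterPurge(50, 100, 10, 5, 9));  // 10: age cutoff alone
    System.out.println(expectedAfterPurge(50, 20, 30, 10, 25)); // 11: excess cutoff plus saved state
    System.out.println(expectedAfterPurge(50, 20, 30, 20, 35)); // 21: recency floor plus saved state
  }
}
```

The strict `numJobs > tooManyJobs` comparison mirrors `counts.jobCount > ?` in the purge SQL: a scope holding exactly the cutoff number of jobs is not yet considered excessive.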