Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Define DbRetentionJob(Jdbi, DbRetentionConfig) #2549

Merged
merged 6 commits into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,7 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
// Add scheduled jobs to lifecycle.
if (config.hasDbRetentionPolicy()) {
// Add job to apply retention policy to database.
env.lifecycle()
.manage(
new DbRetentionJob(
jdbi,
config.getDbRetention().getFrequencyMins(),
config.getDbRetention().getNumberOfRowsPerBatch(),
config.getDbRetention().getRetentionDays()));
env.lifecycle().manage(new DbRetentionJob(jdbi, config.getDbRetention()));
}
}

Expand Down
18 changes: 13 additions & 5 deletions api/src/main/java/marquez/jobs/DbRetentionConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@
import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;

import javax.validation.constraints.Positive;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.NoArgsConstructor;
import lombok.Value;

/** Configuration for {@link DbRetentionJob}. */
public final class DbRetentionConfig {
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Value
public class DbRetentionConfig {
public static final int DEFAULT_FREQUENCY_MINS = 15;

@Getter @Setter private int frequencyMins = DEFAULT_FREQUENCY_MINS;
@Getter @Setter private int numberOfRowsPerBatch = DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
@Getter @Setter private int retentionDays = DEFAULT_RETENTION_DAYS;
@Builder.Default @Getter @Positive int frequencyMins = DEFAULT_FREQUENCY_MINS;
@Builder.Default @Getter @Positive int numberOfRowsPerBatch = DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
@Builder.Default @Getter @Positive int retentionDays = DEFAULT_RETENTION_DAYS;
}
18 changes: 6 additions & 12 deletions api/src/main/java/marquez/jobs/DbRetentionJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package marquez.jobs;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.util.concurrent.AbstractScheduledService;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
Expand Down Expand Up @@ -43,21 +41,17 @@ public class DbRetentionJob extends AbstractScheduledService implements Managed
* of {@code retentionDays}.
*/
public DbRetentionJob(
@NonNull final Jdbi jdbi,
final int frequencyMins,
final int numberOfRowsPerBatch,
final int retentionDays) {
checkArgument(frequencyMins > 0, "'frequencyMins' must be > 0");
checkArgument(numberOfRowsPerBatch > 0, "'numberOfRowsPerBatch' must be > 0");
checkArgument(retentionDays > 0, "'retentionDays' must be > 0");
this.numberOfRowsPerBatch = numberOfRowsPerBatch;
this.retentionDays = retentionDays;
@NonNull final Jdbi jdbi, @NonNull final DbRetentionConfig dbRetentionConfig) {
this.numberOfRowsPerBatch = dbRetentionConfig.getNumberOfRowsPerBatch();
this.retentionDays = dbRetentionConfig.getRetentionDays();

// Open connection.
this.jdbi = jdbi;

// Define fixed schedule with no delay.
this.fixedRateScheduler =
Scheduler.newFixedRateSchedule(NO_DELAY, Duration.ofMinutes(frequencyMins));
Scheduler.newFixedRateSchedule(
NO_DELAY, Duration.ofMinutes(dbRetentionConfig.getFrequencyMins()));
}

@Override
Expand Down
125 changes: 125 additions & 0 deletions api/src/test/java/marquez/jobs/DbRetentionConfigTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.jobs;

import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;
import static marquez.jobs.DbRetentionConfig.DEFAULT_FREQUENCY_MINS;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import org.junit.jupiter.api.Test;

/** The test suite for {@link DbRetentionConfig}. */
public class DbRetentionConfigTest {
private static final Validator VALIDATOR =
Validation.buildDefaultValidatorFactory().getValidator();

@Test
public void testNewDbRetentionConfig_withDefaultsOnly() {
final DbRetentionConfig configWithDefaults = new DbRetentionConfig();

assertThat(configWithDefaults.getFrequencyMins()).isEqualTo(DEFAULT_FREQUENCY_MINS);
assertThat(configWithDefaults.getNumberOfRowsPerBatch())
.isEqualTo(DEFAULT_NUMBER_OF_ROWS_PER_BATCH);
assertThat(configWithDefaults.getRetentionDays()).isEqualTo(DEFAULT_RETENTION_DAYS);
}

@Test
public void testNewDbRetentionConfig_overrideFrequencyMins() {
final int frequencyMinsOverride = 5;
final DbRetentionConfig configWithFrequencyMinsOverride =
DbRetentionConfig.builder().frequencyMins(frequencyMinsOverride).build();

// No constraint violations.
final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithFrequencyMinsOverride);
assertThat(violations).isEmpty();

assertThat(configWithFrequencyMinsOverride.getFrequencyMins()).isEqualTo(frequencyMinsOverride);
assertThat(configWithFrequencyMinsOverride.getNumberOfRowsPerBatch())
.isEqualTo(DEFAULT_NUMBER_OF_ROWS_PER_BATCH);
assertThat(configWithFrequencyMinsOverride.getRetentionDays())
.isEqualTo(DEFAULT_RETENTION_DAYS);
}

@Test
public void testNewDbRetentionConfig_overrideNumberOfRowsPerBatch() {
final int numberOfRowsPerBatchOverride = 25;
final DbRetentionConfig configWithNumberOfRowsPerBatchOverride =
DbRetentionConfig.builder().numberOfRowsPerBatch(numberOfRowsPerBatchOverride).build();

// No constraint violations.
final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNumberOfRowsPerBatchOverride);
assertThat(violations).isEmpty();

assertThat(configWithNumberOfRowsPerBatchOverride.getFrequencyMins())
.isEqualTo(DEFAULT_FREQUENCY_MINS);
assertThat(configWithNumberOfRowsPerBatchOverride.getNumberOfRowsPerBatch())
.isEqualTo(numberOfRowsPerBatchOverride);
assertThat(configWithNumberOfRowsPerBatchOverride.getRetentionDays())
.isEqualTo(DEFAULT_RETENTION_DAYS);
}

@Test
public void testNewDbRetentionConfig_overrideRetentionDays() {
final int retentionDaysOverride = 14;
final DbRetentionConfig configWithNumberOfRowsPerBatchOverride =
DbRetentionConfig.builder().retentionDays(retentionDaysOverride).build();

// No constraint violations.
final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNumberOfRowsPerBatchOverride);
assertThat(violations).isEmpty();

assertThat(configWithNumberOfRowsPerBatchOverride.getFrequencyMins())
.isEqualTo(DEFAULT_FREQUENCY_MINS);
assertThat(configWithNumberOfRowsPerBatchOverride.getNumberOfRowsPerBatch())
.isEqualTo(DEFAULT_NUMBER_OF_ROWS_PER_BATCH);
assertThat(configWithNumberOfRowsPerBatchOverride.getRetentionDays())
.isEqualTo(retentionDaysOverride);
}

@Test
public void testNewDbRetentionConfig_negativeFrequencyMins() {
final int negativeFrequencyMins = -5;

final DbRetentionConfig configWithNegativeFrequencyMins =
DbRetentionConfig.builder().frequencyMins(negativeFrequencyMins).build();

final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNegativeFrequencyMins);
assertThat(violations).hasSize(1);
}

@Test
public void testNewDbRetentionConfig_negativeNumberOfRowsPerBatch() {
final int negativeNumberOfRowsPerBatch = -25;

final DbRetentionConfig configWithNegativeNumberOfRowsPerBatch =
DbRetentionConfig.builder().numberOfRowsPerBatch(negativeNumberOfRowsPerBatch).build();

final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNegativeNumberOfRowsPerBatch);
assertThat(violations).hasSize(1);
}

@Test
public void testNewDbRetentionConfig_negativeRetentionDays() {
final int negativeRetentionDays = -14;

final DbRetentionConfig configWithNegativeRetentionDays =
DbRetentionConfig.builder().retentionDays(negativeRetentionDays).build();

final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNegativeRetentionDays);
assertThat(violations).hasSize(1);
}
}