Skip to content

Commit

Permalink
Adding time based filter to exclude recently created tables from jobs (
Browse files Browse the repository at this point in the history
…#163)

## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

[Issue](https://github.com/linkedin/openhouse/issues/#nnn)

Summary of the changes made in this pull request:
JobsScheduler currently also considers recently created tables for
running maintenance jobs. This has two concerns:
1. Newly created tables seldom require maintenance, so launching jobs
for them is unnecessary.
2. Some accounts have a table-creation process followed by immediate
deletion, which leads to race conditions: the jobs scheduler `gets` a
table, but the table may be deleted during Spark job execution, causing
job failures. Testing flows from downstream systems are one example that
can lead to this case.

Solution:
Only consider tables that have spent significant time in the OpenHouse
catalog.

Implementation:
Added an integer `cutoffHours` option to the JobsScheduler cmd options,
which specifies the number of hours that must have elapsed since a
table's creation before it is considered for job runs.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [x] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
  • Loading branch information
rohitkum2506 authored Aug 20, 2024
1 parent 03d84db commit 24b3382
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.AllArgsConstructor;
Expand All @@ -38,6 +39,7 @@
@AllArgsConstructor
public class TablesClient {
private static final int REQUEST_TIMEOUT_SECONDS = 60;
private final int cutOffHours;
private final RetryTemplate retryTemplate;
private final TableApi tableApi;
private final DatabaseApi databaseApi;
Expand Down Expand Up @@ -98,6 +100,7 @@ protected GetTableResponseBody getTable(TableMetadata tableMetadata) {
public boolean canRunDataLayoutStrategyGeneration(TableMetadata tableMetadata) {
GetTableResponseBody response = getTable(tableMetadata);
return response != null
&& checkCreationTimeEligibility(response)
&& isPrimaryTable(response)
&& (response.getTimePartitioning() != null || response.getClustering() != null);
}
Expand All @@ -109,7 +112,8 @@ && isPrimaryTable(response)
* @return true if the table can run data compaction, false otherwise
*/
public boolean canRunDataCompaction(TableMetadata tableMetadata) {
return isPrimaryTable(tableMetadata);
GetTableResponseBody response = getTable(tableMetadata);
return response != null && checkCreationTimeEligibility(response) && isPrimaryTable(response);
}

/**
Expand All @@ -119,7 +123,8 @@ public boolean canRunDataCompaction(TableMetadata tableMetadata) {
* @return true if the table can expire snapshots, false otherwise
*/
public boolean canExpireSnapshots(TableMetadata tableMetadata) {
return isPrimaryTable(tableMetadata);
GetTableResponseBody response = getTable(tableMetadata);
return response != null && checkCreationTimeEligibility(response) && isPrimaryTable(response);
}

/**
Expand All @@ -131,20 +136,46 @@ public boolean canExpireSnapshots(TableMetadata tableMetadata) {
public boolean canRunRetention(TableMetadata tableMetadata) {
GetTableResponseBody response = getTable(tableMetadata);

if (response == null || !isPrimaryTable(response)) {
if (response == null || !checkCreationTimeEligibility(response) || !isPrimaryTable(response)) {
return false;
}
Optional<RetentionConfig> config = getTableRetention(response);
return config.isPresent();
}

private boolean isPrimaryTable(@NonNull GetTableResponseBody response) {
return GetTableResponseBody.TableTypeEnum.PRIMARY_TABLE == response.getTableType();
/**
* Checks if staged deletion task can be run on given table
*
* @param tableMetadata
* @return
*/
public boolean canRunStagedDataDeletion(@NonNull TableMetadata tableMetadata) {
GetTableResponseBody response = getTable(tableMetadata);
return response != null && checkCreationTimeEligibility(response);
}

private boolean isPrimaryTable(@NonNull TableMetadata tableMetadata) {
/**
* Checks if orphan files deletion task can be run on given table
*
* @param tableMetadata
* @return
*/
public boolean canRunOrphanFileDeletion(@NonNull TableMetadata tableMetadata) {
GetTableResponseBody response = getTable(tableMetadata);
return response != null && isPrimaryTable(response);
return response != null && checkCreationTimeEligibility(response);
}

private boolean checkCreationTimeEligibility(@NonNull GetTableResponseBody response) {
if (response.getCreationTime() == null) {
// no creationTime assigned to table, by default we pick these tables for maintenance
return true;
}
return response.getCreationTime()
< System.currentTimeMillis() - TimeUnit.HOURS.toMillis(cutOffHours);
}

private boolean isPrimaryTable(@NonNull GetTableResponseBody response) {
return GetTableResponseBody.TableTypeEnum.PRIMARY_TABLE == response.getTableType();
}

public List<TableMetadata> getTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class TablesClientFactory {
private final String basePath;
private final DatabaseTableFilter filter;
private final int cutOffHours;
private final @Nullable String token;
protected final FsStorageProvider fsStorageProvider;

Expand All @@ -42,12 +43,12 @@ private TablesClient create(RetryTemplate retryTemplate) {

public TablesClient create(RetryTemplate retryTemplate, TableApi tableApi, DatabaseApi dbApi) {
return new TablesClient(
retryTemplate, tableApi, dbApi, filter, new StorageClient(fsStorageProvider));
retryTemplate, tableApi, dbApi, filter, cutOffHours, new StorageClient(fsStorageProvider));
}

@VisibleForTesting
public TablesClient create(
RetryTemplate retryTemplate, TableApi tableApi, DatabaseApi dbApi, StorageClient client) {
return new TablesClient(retryTemplate, tableApi, dbApi, filter, client);
return new TablesClient(retryTemplate, tableApi, dbApi, filter, cutOffHours, client);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
public class JobsScheduler {
private static final int TASKS_WAIT_TIMEOUT_HOURS = 12;
private static final int DEFAULT_MAX_NUM_CONCURRENT_JOBS = 40;
private static final int DEFAULT_TABLE_CREATION_CUTOFF_HOUR = 72;
private static final Map<String, Class<? extends OperationTask>> OPERATIONS_REGISTRY =
new HashMap<>();
private static final Meter METER = OtelConfig.getMeter(JobsScheduler.class.getName());
Expand Down Expand Up @@ -275,6 +276,13 @@ protected static CommandLine parseArgs(String[] args) {
.longOpt("tableFilter")
.desc("Regexp for filtering tables, defaults to .*")
.build());
options.addOption(
Option.builder(null)
.required(false)
.hasArg()
.longOpt("cutoffHours")
.desc("Time in hour for filtering older tables, defaults to 72")
.build());
options.addOption(
Option.builder(null)
.required(false)
Expand Down Expand Up @@ -355,9 +363,13 @@ protected static TablesClientFactory getTablesClientFactory(CommandLine cmdLine)
cmdLine.getOptionValue("storageType", null),
cmdLine.getOptionValue("storageUri", null),
cmdLine.getOptionValue("rootPath", null));
int cutoffHours =
Integer.parseInt(
cmdLine.getOptionValue(
"cutoffHours", String.valueOf(DEFAULT_TABLE_CREATION_CUTOFF_HOUR)));

return new TablesClientFactory(
cmdLine.getOptionValue("tablesURL"), filter, token, hdfsStorageProvider);
cmdLine.getOptionValue("tablesURL"), filter, cutoffHours, token, hdfsStorageProvider);
}

protected static JobsClientFactory getJobsClientFactory(CommandLine cmdLine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ protected List<String> getArgs() {

@Override
protected boolean shouldRun() {
return true;
return tablesClient.canRunOrphanFileDeletion(getMetadata());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ protected List<String> getArgs() {

@Override
protected boolean shouldRun() {
return true;
return tablesClient.canRunStagedDataDeletion(getMetadata());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.fs.Path;
Expand All @@ -50,6 +51,8 @@ public class TablesClientTest {
private final String testTableCreator = "test_table_creator";
private final String testOrphanDirectoryName = "test_orphan_directory_name";
private final String testTableNamePartitioned = "test_table_name_partitioned";
private final String testTableNameOlder = "test_table_name_older";
private final String testTableNameNewer = "test_table_name_newer";
private final String testTableNameClustered = "test_table_name_clustered";
private final String testPartitionColumnName = "test_partition_column_name";
private final String testReplicaTableName = "test_replica_table_name";
Expand All @@ -61,6 +64,7 @@ public class TablesClientTest {
new TablesClientFactory(
"base_path",
DatabaseTableFilter.of(".*", ".*"),
1,
null,
ParameterizedHdfsStorageProvider.of("hadoop", "hdfs://localhost/", "/jobs/openhouse/"));
private TableApi apiMock;
Expand Down Expand Up @@ -114,7 +118,6 @@ void testGetTables() {
(Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
Mono<GetTableResponseBody> partitionedTableResponseMock =
(Mono<GetTableResponseBody>) Mockito.mock(Mono.class);

Mockito.when(responseMock.block(any(Duration.class))).thenReturn(allTablesResponseBodyMock);
Mockito.when(dbResponseMock.block(any(Duration.class)))
.thenReturn(allDatabasesResponseBodyMock);
Expand All @@ -128,12 +131,15 @@ void testGetTables() {
.thenReturn(unPartitionedTableResponseMock);
Mockito.when(apiMock.getTableV1(testDbName, testTableName))
.thenReturn(partitionedTableResponseMock);

List<TableMetadata> tableMetadataList = client.getTables();
Assertions.assertEquals(
Arrays.asList(
TableMetadata.builder().dbName(testDbName).tableName(testTableName).build(),
TableMetadata.builder().dbName(testDbName).tableName(testTableNamePartitioned).build()),
client.getTables());
tableMetadataList);
for (TableMetadata tableMetadata : tableMetadataList) {
Assertions.assertFalse(tableMetadata.getTableName().contains(testTableNameOlder));
}
Mockito.verify(unPartitionedTableResponseMock, Mockito.times(1)).block(any(Duration.class));
Mockito.verify(partitionedTableResponseMock, Mockito.times(1)).block(any(Duration.class));
Mockito.verify(responseMock, Mockito.times(1)).block(any(Duration.class));
Expand Down Expand Up @@ -274,6 +280,36 @@ void testCanExpireSnapshots() {
TableMetadata.builder().dbName(testDbName).tableName(testReplicaTableName).build()));
}

@Test
void testCanRunOrphanFilesDeletion() {
GetTableResponseBody olderTableResponseBodyMock =
createTableResponseBodyMockWithCreationTime(
testDbName, testTableNameOlder, testPartitionColumnName, testRetentionTTLDays, 2);
Mono<GetTableResponseBody> olderTableResponseMock =
(Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
Mockito.when(olderTableResponseMock.block(any(Duration.class)))
.thenReturn(olderTableResponseBodyMock);
Mockito.when(apiMock.getTableV1(testDbName, testTableNameOlder))
.thenReturn(olderTableResponseMock);

GetTableResponseBody newerTableResponseBodyMock =
createTableResponseBodyMockWithCreationTime(
testDbName, testTableNameNewer, testPartitionColumnName, testRetentionTTLDays, 0);
Mono<GetTableResponseBody> newerTableResponseMock =
(Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
Mockito.when(newerTableResponseMock.block(any(Duration.class)))
.thenReturn(newerTableResponseBodyMock);
Mockito.when(apiMock.getTableV1(testDbName, testTableNameNewer))
.thenReturn(newerTableResponseMock);

Assertions.assertTrue(
client.canRunOrphanFileDeletion(
TableMetadata.builder().dbName(testDbName).tableName(testTableNameOlder).build()));
Assertions.assertFalse(
client.canRunOrphanFileDeletion(
TableMetadata.builder().dbName(testDbName).tableName(testTableNameNewer).build()));
}

@Test
void testCanRunRetention() {
GetTableResponseBody partitionedTableResponseBodyMock =
Expand All @@ -293,6 +329,20 @@ void testCanRunRetention() {
.tableName(testTableNamePartitioned)
.build()));

GetTableResponseBody olderTableResponseBodyMock =
createTableResponseBodyMockWithCreationTime(
testDbName, testTableNameOlder, testPartitionColumnName, testRetentionTTLDays, 2);
Mono<GetTableResponseBody> olderTableResponseMock =
(Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
Mockito.when(olderTableResponseMock.block(any(Duration.class)))
.thenReturn(olderTableResponseBodyMock);
Mockito.when(apiMock.getTableV1(testDbName, testTableNameOlder))
.thenReturn(olderTableResponseMock);
// Retention skipped for a table recently created despite retention config being set.
Assertions.assertTrue(
client.canRunRetention(
TableMetadata.builder().dbName(testDbName).tableName(testTableNameOlder).build()));

GetTableResponseBody primaryTableResponseBodyMock =
createUnpartitionedTableResponseBodyMock(testDbName, testTableName);
Mono<GetTableResponseBody> responseMock = (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
Expand All @@ -319,6 +369,20 @@ void testCanRunRetention() {
.dbName(testDbName)
.tableName(testTableNamePartitioned)
.build()));

GetTableResponseBody newerTableResponseBodyMock =
createTableResponseBodyMockWithCreationTime(
testDbName, testTableNameOlder, testPartitionColumnName, testRetentionTTLDays, 0);
Mono<GetTableResponseBody> newerTableResponseMock =
(Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
Mockito.when(newerTableResponseMock.block(any(Duration.class)))
.thenReturn(newerTableResponseBodyMock);
Mockito.when(apiMock.getTableV1(testDbName, testTableNameOlder))
.thenReturn(newerTableResponseMock);
// Retention skipped for a table recently created despite retention config being set.
Assertions.assertFalse(
client.canRunRetention(
TableMetadata.builder().dbName(testDbName).tableName(testTableNameOlder).build()));
}

@Test
Expand Down Expand Up @@ -667,6 +731,15 @@ private GetTableResponseBody createPartitionedTableResponseBodyMock(
return responseBody;
}

private GetTableResponseBody createTableResponseBodyMockWithCreationTime(
String dbName, String tableName, String partitionColummName, int ttlDays, int creationTime) {
GetTableResponseBody responseBody =
createPartitionedTableResponseBodyMock(dbName, tableName, partitionColummName, ttlDays);
Mockito.when(responseBody.getCreationTime())
.thenReturn(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(creationTime));
return responseBody;
}

private GetTableResponseBody createPartitionedTableWithPatternResponseBodyMock(
String dbName,
String tableName,
Expand Down

0 comments on commit 24b3382

Please sign in to comment.