From edb86e345ee48793621ec0efbb57ac48887630ef Mon Sep 17 00:00:00 2001
From: Malini Mahalakshmi Venkatachari <maluchari@gmail.com>
Date: Wed, 28 Aug 2024 10:34:22 -0700
Subject: [PATCH] Add time filtering for stats collection. (#178)

## Summary

Extending PR #163 to TableStatsCollector. It is okay to skip stats
collection for recently created tables for a span of 3 days.

## Changes

- [ ] Client-facing API Changes
- [ X] Internal API Changes
- [ ] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [ ] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [X] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.
---
 .../openhouse/jobs/client/TablesClient.java   | 12 ++++++++
 .../tasks/TableStatsCollectionTask.java       |  2 +-
 .../jobs/clients/TablesClientTest.java        | 30 +++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java
index 4777ecd8..2e6b63b4 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java
@@ -140,6 +140,17 @@ public boolean canRunRetention(TableMetadata tableMetadata) {
     return config.isPresent();
   }
 
+  /**
+   * Checks if stats collection can be executed on the input table.
+   *
+   * @param tableMetadata table metadata
+   * @return true if the stats collection can happen, false otherwise
+   */
+  public boolean canRunTableStatsCollection(TableMetadata tableMetadata) {
+    GetTableResponseBody response = getTable(tableMetadata);
+    return response != null && checkCreationTimeEligibility(response);
+  }
+
   /**
    * Checks if staged deletion task can be run on given table
    *
@@ -291,6 +302,7 @@ protected TableMetadata mapTableResponseToTableMetadata(GetTableResponseBody res
             .dbName(responseBody.getDatabaseId())
             .tableName(responseBody.getTableId())
             .build();
+
     String creator = getTable(metadata).getTableCreator();
     return TableMetadata.builder()
         .creator(creator)
diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableStatsCollectionTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableStatsCollectionTask.java
index 94a9b221..2c18c89d 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableStatsCollectionTask.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/TableStatsCollectionTask.java
@@ -34,6 +34,6 @@ protected List<String> getArgs() {
 
   @Override
   protected boolean shouldRun() {
-    return true;
+    return tablesClient.canRunTableStatsCollection(getMetadata());
   }
 }
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/clients/TablesClientTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/clients/TablesClientTest.java
index e6d8c753..42b096e8 100644
--- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/clients/TablesClientTest.java
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/clients/TablesClientTest.java
@@ -463,6 +463,36 @@ void testCanRunDataCompaction() {
             TableMetadata.builder().dbName(testDbName).tableName(testReplicaTableName).build()));
   }
 
+  @Test
+  void testCanRunTableStatsCollection() {
+    GetTableResponseBody olderTableResponseBodyMock =
+        createTableResponseBodyMockWithCreationTime(
+            testDbName, testTableNameOlder, testPartitionColumnName, testRetentionTTLDays, 2);
+    Mono<GetTableResponseBody> olderTableResponseMock =
+        (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
+    Mockito.when(olderTableResponseMock.block(any(Duration.class)))
+        .thenReturn(olderTableResponseBodyMock);
+    Mockito.when(apiMock.getTableV1(testDbName, testTableNameOlder))
+        .thenReturn(olderTableResponseMock);
+
+    GetTableResponseBody newerTableResponseBodyMock =
+        createTableResponseBodyMockWithCreationTime(
+            testDbName, testTableNameNewer, testPartitionColumnName, testRetentionTTLDays, 0);
+    Mono<GetTableResponseBody> newerTableResponseMock =
+        (Mono<GetTableResponseBody>) Mockito.mock(Mono.class);
+    Mockito.when(newerTableResponseMock.block(any(Duration.class)))
+        .thenReturn(newerTableResponseBodyMock);
+    Mockito.when(apiMock.getTableV1(testDbName, testTableNameNewer))
+        .thenReturn(newerTableResponseMock);
+
+    Assertions.assertTrue(
+        client.canRunTableStatsCollection(
+            TableMetadata.builder().dbName(testDbName).tableName(testTableNameOlder).build()));
+    Assertions.assertFalse(
+        client.canRunTableStatsCollection(
+            TableMetadata.builder().dbName(testDbName).tableName(testTableNameNewer).build()));
+  }
+
   @Test
   void testPartitionedTableNullPoliciesGetRetentionConfig() {
     GetTableResponseBody partitionedTableResponseBodyMock =