Skip to content

Commit

Permalink
abstract recordHeartbeat method
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <seankao@amazon.com>
  • Loading branch information
seankao-az committed Jun 14, 2024
1 parent 9664595 commit 346e344
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,11 @@ default <T> OptimisticTransaction<T> startTransaction(String indexName) {
* @return optional metadata log
*/
Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName);

/**
* Record heartbeat timestamp for index streaming job.
*
* @param indexName index name
*/
void recordHeartbeat(String indexName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction;
import org.opensearch.flint.core.metadata.log.FlintMetadataLog;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
import org.opensearch.flint.core.metadata.log.FlintMetadataLogService;
import org.opensearch.flint.core.metadata.log.OptimisticTransaction;

Expand Down Expand Up @@ -55,6 +56,14 @@ public Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(Str
return getIndexMetadataLog(indexName, false);
}

@Override
public void recordHeartbeat(String indexName) {
startTransaction(indexName)
.initialLog(latest -> latest.state() == IndexState$.MODULE$.REFRESHING())
.finalLog(latest -> latest) // timestamp will update automatically
.commit(latest -> null);
}

private Optional<FlintMetadataLog<FlintMetadataLogEntry>> getIndexMetadataLog(String indexName, boolean initIfNotExist) {
LOG.info("Getting metadata log for index " + indexName + " and data source " + dataSourceName);
try (IRestHighLevelClient client = createOpenSearchClient()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,7 @@ class FlintSparkIndexMonitor(
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService
.startTransaction(indexName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
flintMetadataLogService.recordHeartbeat(indexName)
} else {
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,23 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers {
val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex)
metadataLog.isPresent shouldBe false
}

test("should update timestamp when record heartbeat") {
val refreshingLogEntry = flintMetadataLogEntry.copy(state = REFRESHING)
createLatestLogEntry(refreshingLogEntry)
val updateTimeBeforeHeartbeat =
latestLogEntry(testLatestId).get("lastUpdateTime").get.asInstanceOf[Long]
flintMetadataLogService.recordHeartbeat(testFlintIndex)
latestLogEntry(testLatestId)
.get("lastUpdateTime")
.get
.asInstanceOf[Long] should be > updateTimeBeforeHeartbeat
}

test("should fail when record heartbeat if index not refreshing") {
createLatestLogEntry(flintMetadataLogEntry)
the[IllegalStateException] thrownBy {
flintMetadataLogService.recordHeartbeat(testFlintIndex)
}
}
}

0 comments on commit 346e344

Please sign in to comment.