From 346e344363df1b48b70543483702742a41ebc506 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Fri, 14 Jun 2024 13:17:07 -0700 Subject: [PATCH] abstract recordHeartbeat method Signed-off-by: Sean Kao --- .../metadata/log/FlintMetadataLogService.java | 7 +++++++ .../FlintOpenSearchMetadataLogService.java | 9 +++++++++ .../flint/spark/FlintSparkIndexMonitor.scala | 6 +----- .../flint/core/FlintMetadataLogITSuite.scala | 19 +++++++++++++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java index 1c6f42a6a..a356a456f 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogService.java @@ -39,4 +39,11 @@ default OptimisticTransaction startTransaction(String indexName) { * @return optional metadata log */ Optional> getIndexMetadataLog(String indexName); + + /** + * Record heartbeat timestamp for index streaming job. + * + * @param indexName index name + */ + void recordHeartbeat(String indexName); } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java index a49e45829..15b294c8b 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchMetadataLogService.java @@ -17,6 +17,7 @@ import org.opensearch.flint.core.metadata.log.DefaultOptimisticTransaction; import org.opensearch.flint.core.metadata.log.FlintMetadataLog; import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry; +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$; import org.opensearch.flint.core.metadata.log.FlintMetadataLogService; import org.opensearch.flint.core.metadata.log.OptimisticTransaction; @@ -55,6 +56,14 @@ public Optional> getIndexMetadataLog(Str return getIndexMetadataLog(indexName, false); } + @Override + public void recordHeartbeat(String indexName) { + startTransaction(indexName) + .initialLog(latest -> latest.state() == IndexState$.MODULE$.REFRESHING()) + .finalLog(latest -> latest) // timestamp will update automatically + .commit(latest -> null); + } + private Optional> getIndexMetadataLog(String indexName, boolean initIfNotExist) { LOG.info("Getting metadata log for index " + indexName + " and data source " + dataSourceName); try (IRestHighLevelClient client = createOpenSearchClient()) { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 34cae88b5..815dfa71a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -156,11 +156,7 @@ class FlintSparkIndexMonitor( try { if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active") - flintMetadataLogService - .startTransaction(indexName) - .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest) // timestamp will update automatically - .commit(_ => {}) + flintMetadataLogService.recordHeartbeat(indexName) } else { logError("Streaming job is not active. Cancelling monitor task") stopMonitor(indexName) diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala index f79d3617b..f8a8c2164 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintMetadataLogITSuite.scala @@ -78,4 +78,23 @@ class FlintMetadataLogITSuite extends OpenSearchTransactionSuite with Matchers { val metadataLog = flintMetadataLogService.getIndexMetadataLog(testFlintIndex) metadataLog.isPresent shouldBe false } + + test("should update timestamp when record heartbeat") { + val refreshingLogEntry = flintMetadataLogEntry.copy(state = REFRESHING) + createLatestLogEntry(refreshingLogEntry) + val updateTimeBeforeHeartbeat = + latestLogEntry(testLatestId).get("lastUpdateTime").get.asInstanceOf[Long] + flintMetadataLogService.recordHeartbeat(testFlintIndex) + latestLogEntry(testLatestId) + .get("lastUpdateTime") + .get + .asInstanceOf[Long] should be > updateTimeBeforeHeartbeat + } + + test("should fail when record heartbeat if index not refreshing") { + createLatestLogEntry(flintMetadataLogEntry) + the[IllegalStateException] thrownBy { + flintMetadataLogService.recordHeartbeat(testFlintIndex) + } + } }