
[BUG] Flint index refresh job continues despite critical dependency unavailable #344

Closed
dai-chen opened this issue May 16, 2024 · 0 comments · Fixed by #346
Labels: 0.5, enhancement (New feature or request)

dai-chen commented May 16, 2024

What is the bug?

The Spark streaming job continues running even if a) the OpenSearch cluster or b) the metadata log index becomes unavailable. As a result, data integrity and system stability may be compromised without any immediate feedback or error handling in the monitoring system.

How can one reproduce the bug?

Steps to reproduce the behavior:

  1. Set up a Spark streaming job by creating a Flint index (see the setup sketch after these steps).
  2. Simulate a failure, such as shutting down the OpenSearch cluster or deleting the metadata log index.
  3. Observe that the Spark streaming job continues to run without failing or raising significant errors.
  4. Check monitoring or logging outputs and notice the lack of appropriate error handling or job termination.
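
A minimal sketch of step 1, assuming a Spark session with the Flint extension enabled. The configuration keys, extension class, table name, and column name below are assumptions based on the Flint documentation rather than taken from this issue; adjust them to your environment.

```scala
// Hypothetical repro setup for step 1; endpoint settings and table/column
// names are illustrative assumptions.
import org.apache.spark.sql.SparkSession

object FlintRefreshRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("flint-refresh-repro")
      // Flint Spark extension and OpenSearch endpoint settings (assumed keys).
      .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
      .config("spark.datasource.flint.host", "localhost")
      .config("spark.datasource.flint.port", "9200")
      .getOrCreate()

    // Creating an auto-refresh index starts the background Spark streaming job
    // that the index monitor watches.
    spark.sql(
      """CREATE SKIPPING INDEX ON mys3.default.http_logs
        |(status VALUE_SET)
        |WITH (auto_refresh = true)""".stripMargin)
  }
}
```

With `auto_refresh = true`, the refresh runs as a long-lived streaming query, which is the job that keeps running in step 3 even after the dependency is gone.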

What is the expected behavior?

The Spark streaming job should either halt or switch to a safe mode when it can no longer communicate with the OpenSearch cluster or access the metadata log index. The failure should trigger alerts or error messages for system administrators, or halt further data processing, so that data corruption or loss is prevented.

Possible solutions:

  1. External system intervention: Implement a mechanism where an external monitoring system observes heartbeat metrics and terminates the streaming job when it detects errors or missing heartbeat metrics.
  2. Internal job termination: Enhance the index monitor so that it terminates the streaming job after a predefined number of consecutive failures to update the heartbeat (see the sketch below). This would allow the job to self-manage its failure state and shut down gracefully if it consistently fails to communicate its status.
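
A minimal sketch of option 2, not the actual FlintSparkIndexMonitor implementation: a scheduled task counts consecutive heartbeat failures and stops the streaming query through Spark's StreamingQueryManager once a threshold is reached. The names `MAX_CONSECUTIVE_FAILURES`, `updateHeartbeat`, and the 60-second interval are illustrative assumptions.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.SparkSession

object IndexMonitorSketch {
  // Assumed threshold; the real monitor could make this configurable.
  private val MAX_CONSECUTIVE_FAILURES = 3
  @volatile private var consecutiveFailures = 0

  def startMonitor(spark: SparkSession, indexName: String)(updateHeartbeat: () => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(() => {
      Try(updateHeartbeat()) match {
        case Success(_) =>
          // Heartbeat succeeded, so transient errors are forgiven.
          consecutiveFailures = 0
        case Failure(e) =>
          consecutiveFailures += 1
          println(s"Heartbeat update failed ($consecutiveFailures/$MAX_CONSECUTIVE_FAILURES): ${e.getMessage}")
          if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES) {
            // Sustained unavailability: stop the streaming job instead of
            // letting it keep running blindly, then stop monitoring.
            spark.streams.active
              .find(_.name == indexName)
              .foreach(_.stop())
            scheduler.shutdown()
          }
      }
    }, 0, 60, TimeUnit.SECONDS)
  }
}
```

Resetting the counter on success means only sustained unavailability (for example, a deleted metadata log index) triggers termination, while transient OpenSearch errors are tolerated.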

Do you have any additional context?

Code: https://github.com/opensearch-project/opensearch-spark/blob/main/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala#L69

Sample log:

24/05/15 12:32:27 INFO FlintSparkIndexMonitor: Scheduler trigger index monitor task for flint_mys3_default_prabarch_p3_index
24/05/15 12:32:27 INFO FlintSparkIndexMonitor: Streaming job is still active
24/05/15 12:32:27 INFO FlintOpenSearchClient: Starting transaction on index flint_mys3_default_prabarch_p3_index and data source mys3
24/05/15 12:32:27 INFO RetryableHttpAsyncClient: Building retryable http async client with options: FlintRetryOptions{maxRetries=3, retryableStatusCodes=429,502, retryableExceptionClassNames=Optional.empty}
24/05/15 12:32:27 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 404
24/05/15 12:32:27 INFO HttpStatusCodeResultPredicate: Status code 404 check result: false
24/05/15 12:32:27 WARN FlintOpenSearchClient: Metadata log index not found .query_execution_request_mys3
24/05/15 12:32:27 ERROR FlintSparkIndexMonitor: Failed to update index log entry
java.lang.IllegalStateException: Metadata log index not found .query_execution_request_mys3
	at org.opensearch.flint.core.storage.FlintOpenSearchClient.startTransaction(FlintOpenSearchClient.java:114) ~[opensearch-spark-standalone_2.12-latest.jar:0.4.0-SNAPSHOT]
	at org.opensearch.flint.core.storage.FlintOpenSearchClient.startTransaction(FlintOpenSearchClient.java:126) ~[opensearch-spark-standalone_2.12-latest.jar:0.4.0-SNAPSHOT]
	at org.opensearch.flint.spark.FlintSparkIndexMonitor.$anonfun$startMonitor$1(FlintSparkIndexMonitor.scala:51) ~[opensearch-spark-standalone_2.12-latest.jar:0.4.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
	at java.lang.Thread.run(Thread.java:840) ~[?:?]
dai-chen added the bug, untriaged, 0.5, 0.4 labels and removed the untriaged, 0.5 labels on May 16, 2024
dai-chen added the enhancement, 0.5 labels and removed the bug, 0.4 labels on May 17, 2024
dai-chen self-assigned this on May 17, 2024