Support recovery for index with external scheduler #717
Conversation
EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD.readFrom(reader)

def externalSchedulerIntervalThreshold(): String = {
  val value = EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD.readFrom(reader)
  if (value.trim.isEmpty) FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL else value
}
Is the default value case already covered above by createWithDefault?
It might be a bug - the conf can be an empty string, and createWithDefault will not trigger in that case.
if so, should we fix the bug?
override def readFrom(reader: ConfigReader): String = {
readOptionKeyString(reader)
.orElse(readFromConf())
.getOrElse(defaultValue)
}
If the Spark configuration value is an empty string, readOptionKeyString returns Some("") as expected, and Option.getOrElse therefore returns the empty string. Changing the behavior to override an empty string with the default value means the empty-string use case is disallowed entirely.
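A small standalone sketch of that behavior (values below are illustrative only, not taken from the codebase):

// getOrElse only applies the default when the Option is None, so an explicit "" survives
val configured: Option[String] = Some("")   // user explicitly set the conf to ""
val defaultValue = "5 minutes"              // illustrative default
val raw = configured.getOrElse(defaultValue)                 // ""
// the trim check in externalSchedulerIntervalThreshold() maps "" back to the default
val effective = if (raw.trim.isEmpty) defaultValue else raw  // "5 minutes"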
FlintSparkIndexRefresh
  .create(indexName, index.get)
  .start(spark, flintSparkConf)
flintIndexMetadataService.updateIndexMetadata(indexName, updatedIndex.metadata())
Is this only for the external scheduler?
For recover index, we need to persist the scheduler_mode field for backward compatibility in both the internal and external cases.
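A rough, hypothetical sketch of that idea (the helper name and signature below are illustrative, not the actual Flint API): on recovery the persisted options are upgraded to always carry an explicit scheduler_mode, regardless of mode.

// Hypothetical helper: ensure scheduler_mode is always present before persisting metadata
def withSchedulerMode(
    options: Map[String, String],
    externalSchedulerEnabled: Boolean): Map[String, String] =
  options.updated(
    "scheduler_mode",
    if (externalSchedulerEnabled) "external" else "internal")

// withSchedulerMode(Map("auto_refresh" -> "true"), externalSchedulerEnabled = false)
//   => Map("auto_refresh" -> "true", "scheduler_mode" -> "internal")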
FlintSparkIndexRefresh
  .create(updatedIndex.name(), updatedIndex)
  .validate(spark)
The original logic is guarded by the translog; is it still required? @dai-chen
}.keySet

if (changedOptions.isEmpty) {
  throw new IllegalArgumentException("No options updated")
}
The previous exception message "auto_refresh option must be updated" was more understandable.
It's no longer the case, since this PR supports a single ALTER to switch scheduler_mode while the auto_refresh option should not change.
throw new IllegalArgumentException(
  s"Altering index to ${refreshMode} refresh only allows options: ${allowedOptionNames}")
  s"$context only allows changing: $allowedOptions. Invalid options: $invalidOptions")
- Is L477-L509 refactored logic?
- Why do we change the error message?
Yes, the whole validateUpdateAllowed is refactored to support switching scheduler_mode in a single ALTER, while the auto_refresh option should not change.
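For context, a minimal sketch of that validation idea (the helper name and signature are hypothetical, not the actual FlintSpark method):

def validateChangedOptions(
    oldOptions: Map[String, String],
    newOptions: Map[String, String],
    allowedOptions: Set[String],
    context: String): Unit = {
  // keys whose values differ between the old and new option maps
  val changedOptions = (oldOptions.keySet ++ newOptions.keySet)
    .filter(key => oldOptions.get(key) != newOptions.get(key))
  if (changedOptions.isEmpty) {
    throw new IllegalArgumentException("No options updated")
  }
  // only whitelisted keys (e.g. scheduler_mode) may change; auto_refresh may not
  val invalidOptions = changedOptions -- allowedOptions
  if (invalidOptions.nonEmpty) {
    throw new IllegalArgumentException(
      s"$context only allows changing: $allowedOptions. Invalid options: $invalidOptions")
  }
}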
is there a test for this case?
Added more ITs around the validation logic: 03d4c47
Some("10 minutes"), | ||
None), | ||
( | ||
"throw exception when interval below threshold but mode is external", |
Not related to this PR, but how does a user configure the external scheduler? Is there any doc that mentions it?
I only found the external-scheduler definition, but there is no guidance on how a user configures it. Could you create an issue to cover it?
The configuration is here: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md#configurations
spark.flint.job.externalScheduler.enabled: default is false. Enables the external scheduler for Flint auto refresh to schedule refresh jobs outside of Spark.
spark.flint.job.externalScheduler.interval: default is 5 minutes. A string refresh interval for the external scheduler to trigger index refresh.
Do you mean we need a separate section to explain this in detail?
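For reference, a minimal sketch of setting the two confs on a Spark session (the app name and values are illustrative only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("flint-external-scheduler-example")  // hypothetical app name
  .config("spark.flint.job.externalScheduler.enabled", "true")
  .config("spark.flint.job.externalScheduler.interval", "5 minutes")
  .getOrCreate()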
My question is: if a user sets spark.flint.job.externalScheduler.enabled = true, does the user need additional configuration to run the external scheduler?
Thx!
* Support recovery for index with external scheduler
* Improve default option update logic
* Resolve comments
* Add index metrics
* Remove debugging log and refactor updateSchedulerMode
* refactor metrics with aop
* Add more IT

Signed-off-by: Louis Chu <clingzhi@amazon.com>
(cherry picked from commit a345373)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
- Support recovery for index with external scheduler
- Support ALTER scheduler_mode between internal and external in a single statement
- Add index-level metrics for refresh query, with dimensions (clientId, dataSource, indexName)
- Support external scheduler with index names that include "-"
- Optimize build.sbt on dependency package compile

Issues Resolved
#762
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.