Skip to content

Commit

Permalink
Fallback to internal scheduler when index creation failed
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger committed Oct 31, 2024
1 parent 950009b commit d7d8934
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
private val flintMetadataCacheWriter = FlintMetadataCacheWriterBuilder.build(flintSparkConf)

private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
AsyncQuerySchedulerBuilder.build(spark, flintSparkConf.flintOptions())
}

override protected val flintMetadataLogService: FlintMetadataLogService = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.flint.config.FlintSparkConf;
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.core.FlintOptions;

Expand All @@ -28,11 +30,27 @@ public enum AsyncQuerySchedulerAction {
REMOVE
}

public static AsyncQueryScheduler build(FlintOptions options) {
/**
 * Static entry point: creates a fresh builder and delegates to {@code doBuild}.
 *
 * @param sparkSession the SparkSession forwarded to {@code doBuild}
 * @param options      the FlintOptions forwarded to {@code doBuild}
 * @return the scheduler produced by {@code doBuild}
 */
public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) {
    AsyncQuerySchedulerBuilder builder = new AsyncQuerySchedulerBuilder();
    return builder.doBuild(sparkSession, options);
}

/**
* Builds an AsyncQueryScheduler based on the provided options.
*
* @param sparkSession The SparkSession to be used.
* @param options The FlintOptions containing configuration details.
* @return An instance of AsyncQueryScheduler.
*/
protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) {
String className = options.getCustomAsyncQuerySchedulerClass();

if (className.isEmpty()) {
return new OpenSearchAsyncQueryScheduler(options);
OpenSearchAsyncQueryScheduler scheduler = createOpenSearchAsyncQueryScheduler(options);
// Check if the scheduler has access to the required index. Disable the external scheduler otherwise.
if (!hasAccessToSchedulerIndex(scheduler)){
setExternalSchedulerEnabled(sparkSession, false);
}
return scheduler;
}

// Attempts to instantiate AsyncQueryScheduler using reflection
Expand All @@ -45,4 +63,16 @@ public static AsyncQueryScheduler build(FlintOptions options) {
throw new RuntimeException("Failed to instantiate AsyncQueryScheduler: " + className, e);
}
}

/**
 * Creates the OpenSearch-backed scheduler for the given options.
 * Declared as a separate protected method so a subclass can supply a different instance.
 *
 * @param options the FlintOptions passed to the scheduler constructor
 * @return a new OpenSearchAsyncQueryScheduler
 */
protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) {
return new OpenSearchAsyncQueryScheduler(options);
}

/**
 * Reports whether the given scheduler can access its scheduler index by delegating to
 * the scheduler's own {@code hasAccessToSchedulerIndex()} check.
 * Declared as a separate protected method so a subclass can override the result.
 *
 * @param scheduler the scheduler to query
 * @return the value returned by {@code scheduler.hasAccessToSchedulerIndex()}
 */
protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) {
return scheduler.hasAccessToSchedulerIndex();
}

/**
 * Writes the external-scheduler flag into the SQL configuration of the given session.
 *
 * @param sparkSession the session whose SQLContext configuration is updated
 * @param enabled      the flag value; stored in its string form ("true" / "false")
 */
protected void setExternalSchedulerEnabled(SparkSession sparkSession, boolean enabled) {
    String confKey = FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED().key();
    String confValue = Boolean.toString(enabled);
    sparkSession.sqlContext().setConf(confKey, confValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -55,6 +56,11 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
private static final ObjectMapper mapper = new ObjectMapper();
private final FlintOptions flintOptions;

/**
 * Test-only constructor: creates a scheduler whose FlintOptions are built from an empty map.
 */
@VisibleForTesting
public OpenSearchAsyncQueryScheduler() {
    this(new FlintOptions(ImmutableMap.of()));
}

/**
 * Creates a scheduler that keeps the given options for later use.
 *
 * @param options the FlintOptions stored in this scheduler
 */
public OpenSearchAsyncQueryScheduler(FlintOptions options) {
this.flintOptions = options;
}
Expand Down Expand Up @@ -124,6 +130,26 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) {
}
}

/**
 * Checks whether the current setup has access to the scheduler index.
 *
 * <p>This method creates a client and ensures that the scheduler index exists. If both
 * operations succeed, the current setup has the permissions needed to access the scheduler
 * index. The client is used only for this probe and is closed before the method returns.
 *
 * @return {@code true} if the client was created and the index check succeeded;
 *         {@code false} if any step failed (the failure is logged at error level)
 * @see #createClient()
 * @see #ensureIndexExists(IRestHighLevelClient)
 */
public boolean hasAccessToSchedulerIndex() {
    // try-with-resources closes the probe client on every path so it is not leaked.
    try (IRestHighLevelClient client = createClient()) {
        ensureIndexExists(client);
        return true;
    } catch (Throwable e) {
        // NOTE(review): Throwable is kept so that any failure results in a "no access" answer;
        // this also swallows JVM Errors — confirm that is intended.
        LOG.error("Failed to ensure index exists", e);
        return false;
    }
}
private void ensureIndexExists(IRestHighLevelClient client) {
try {
if (!client.doesIndexExist(new GetIndexRequest(SCHEDULER_INDEX_NAME), RequestOptions.DEFAULT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,70 @@

package org.opensearch.flint.core.scheduler;

import org.apache.spark.FlintSuite;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder;
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler;

import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AsyncQuerySchedulerBuilderTest {
@Mock
private SparkSession sparkSession;

@Mock
private SQLContext sqlContext;

private AsyncQuerySchedulerBuilderForLocalTest testBuilder;

/**
 * Initializes the {@code @Mock} fields and stubs the mocked session to return the mocked SQLContext.
 */
@Before
public void setUp() {
// The annotated mocks must be created before they can be stubbed below.
MockitoAnnotations.openMocks(this);
// Route sparkSession.sqlContext() to the mock so tests can verify setConf calls on it.
when(sparkSession.sqlContext()).thenReturn(sqlContext);
}

/**
 * With an empty custom scheduler class name and an accessible scheduler index, the builder
 * returns the OpenSearch scheduler and never writes to the Spark SQL configuration.
 */
@Test
public void testBuildWithEmptyClassNameAndAccessibleIndex() {
    FlintOptions options = mock(FlintOptions.class);
    when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
    OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);

    // build(...) is static: invoke it on the class instead of through the testBuilder field,
    // which is never assigned and only works because static calls ignore the target reference.
    AsyncQueryScheduler scheduler =
        AsyncQuerySchedulerBuilderForLocalTest.build(mockScheduler, true, sparkSession, options);

    assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler);
    verify(sqlContext, never()).setConf(anyString(), anyString());
}

@Test
public void testBuildWithEmptyClassName() {
public void testBuildWithEmptyClassNameAndInaccessibleIndex() {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);

AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options);
AsyncQueryScheduler scheduler = testBuilder.build(mockScheduler, false, sparkSession, options);
assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler);
verify(sqlContext).setConf("spark.flint.job.externalScheduler.enabled", "false");
}

@Test
public void testBuildWithCustomClassName() {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");
when(options.getCustomAsyncQuerySchedulerClass())
.thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");

AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options);
AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(sparkSession, options);
assertTrue(scheduler instanceof AsyncQuerySchedulerForLocalTest);
}

Expand All @@ -41,7 +77,7 @@ public void testBuildWithInvalidClassName() {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName");

AsyncQuerySchedulerBuilder.build(options);
AsyncQuerySchedulerBuilder.build(sparkSession, options);
}

public static class AsyncQuerySchedulerForLocalTest implements AsyncQueryScheduler {
Expand All @@ -65,4 +101,35 @@ public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
// Custom implementation
}
}

/**
 * Scheduler subclass for local tests whose index-access check always succeeds.
 */
public static class OpenSearchAsyncQuerySchedulerForLocalTest extends OpenSearchAsyncQueryScheduler {
// Always reports access, replacing the parent's check.
@Override
public boolean hasAccessToSchedulerIndex() {
return true;
}
}

/**
 * Builder subclass for tests: lets a test inject the scheduler instance and the outcome of the
 * index-access check. A {@code null} override falls back to the parent implementation.
 */
public static class AsyncQuerySchedulerBuilderForLocalTest extends AsyncQuerySchedulerBuilder {
    private final OpenSearchAsyncQueryScheduler schedulerOverride;
    private final Boolean accessOverride;

    /**
     * @param mockScheduler scheduler to hand out instead of creating one; may be {@code null}
     * @param mockHasAccess forced result of the access check; may be {@code null}
     */
    public AsyncQuerySchedulerBuilderForLocalTest(OpenSearchAsyncQueryScheduler mockScheduler, Boolean mockHasAccess) {
        this.schedulerOverride = mockScheduler;
        this.accessOverride = mockHasAccess;
    }

    /** Returns the injected scheduler when present, otherwise defers to the parent. */
    @Override
    protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) {
        if (schedulerOverride == null) {
            return super.createOpenSearchAsyncQueryScheduler(options);
        }
        return schedulerOverride;
    }

    /** Returns the forced access result when present, otherwise defers to the parent. */
    @Override
    protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) {
        if (accessOverride == null) {
            return super.hasAccessToSchedulerIndex(scheduler);
        }
        return accessOverride;
    }

    /**
     * Creates a test builder with the given overrides and runs {@code doBuild} on it.
     *
     * @return the scheduler produced by {@code doBuild}
     */
    public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) {
        AsyncQuerySchedulerBuilderForLocalTest builder =
            new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess);
        return builder.doBuild(sparkSession, options);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.FlintConfigEntry
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand All @@ -26,6 +26,10 @@ trait FlintSuite extends SharedSparkSession {
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
.set("spark.sql.extensions", classOf[FlintSparkExtensions].getName)
// Override scheduler class for unit testing
.set(
FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key,
"org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.{FlintSuite, SparkConf}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

Expand All @@ -49,6 +50,8 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit

override def beforeAll(): Unit = {
super.beforeAll()
// Revoke override in FlintSuite on IT
conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)

// Replace executor to avoid impact on IT.
// TODO: Currently no IT test scheduler so no need to restore it back.
Expand Down

0 comments on commit d7d8934

Please sign in to comment.