Skip to content

Commit

Permalink
[Feature] Flint query scheduler part1 - integrate job scheduler plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger committed Jul 17, 2024
1 parent 0c2e1da commit 91bd2bc
Show file tree
Hide file tree
Showing 20 changed files with 845 additions and 35 deletions.
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
Expand Down
20 changes: 18 additions & 2 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ singleCompoundStatement
;

beginEndCompoundBlock
: BEGIN compoundBody END
: beginLabel? BEGIN compoundBody END endLabel?
;

compoundBody
Expand All @@ -68,6 +68,14 @@ singleStatement
: statement SEMICOLON* EOF
;

beginLabel
: multipartIdentifier COLON
;

endLabel
: multipartIdentifier
;

singleExpression
: namedExpression EOF
;
Expand Down Expand Up @@ -174,6 +182,8 @@ statement
| ALTER TABLE identifierReference
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE) #alterClusterBy
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
Expand Down Expand Up @@ -853,13 +863,17 @@ identifierComment

relationPrimary
: identifierReference temporalClause?
sample? tableAlias #tableName
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
;

optionsClause
: WITH options=propertyList
;

inlineTable
: VALUES expression (COMMA expression)* tableAlias
;
Expand Down Expand Up @@ -1572,6 +1586,7 @@ ansiNonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NULLS
| NUMERIC
| OF
Expand Down Expand Up @@ -1920,6 +1935,7 @@ nonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NOT
| NULL
| NULLS
Expand Down
2 changes: 2 additions & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ repositories {


dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
implementation project(':protocol')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

public class OpenSearchAsyncQueryScheduler {
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
"async-query-scheduler-index-mapping.yml";
private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME =
"async-query-scheduler-index-settings.yml";
private static final Logger LOG = LogManager.getLogger();

Check warning on line 46 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L46

Added line #L46 was not covered by tests
private Client client;
private ClusterService clusterService;

public OpenSearchAsyncQueryScheduler() {
LOG.info("OpenSearchAsyncQueryScheduler initialized");
}

Check warning on line 52 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L50-L52

Added lines #L50 - L52 were not covered by tests

public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;

Check warning on line 56 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L55-L56

Added lines #L55 - L56 were not covered by tests
OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
OpenSearchRefreshIndexJob.getJobRunnerInstance();
openSearchRefreshIndexJob.setClusterService(clusterService);
openSearchRefreshIndexJob.setThreadPool(threadPool);
openSearchRefreshIndexJob.setClient(client);
}

Check warning on line 62 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L58-L62

Added lines #L58 - L62 were not covered by tests

public static ScheduledJobRunner getJobRunner() {
return OpenSearchRefreshIndexJob.getJobRunnerInstance();

Check warning on line 65 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L65

Added line #L65 was not covered by tests
}

public void scheduleJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
createAsyncQuerySchedulerIndex();

Check warning on line 70 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L70

Added line #L70 was not covered by tests
}
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
indexRequest.id(request.getName());
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

Check warning on line 75 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L72-L75

Added lines #L72 - L75 were not covered by tests
ActionFuture<IndexResponse> indexResponseActionFuture;
IndexResponse indexResponse;

indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), null));
indexResponseActionFuture = client.index(indexRequest);
indexResponse = indexResponseActionFuture.actionGet();

Check warning on line 81 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L79-L81

Added lines #L79 - L81 were not covered by tests

if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
LOG.info("Job : {} successfully created", request.getName());

Check warning on line 84 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L84

Added line #L84 was not covered by tests
} else {
throw new RuntimeException(
"Schedule job failed with result : " + indexResponse.getResult().getLowercase());

Check warning on line 87 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L86-L87

Added lines #L86 - L87 were not covered by tests
}
LOG.info(indexRequest.source());
}

Check warning on line 90 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L89-L90

Added lines #L89 - L90 were not covered by tests

public void unscheduleJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
throw new RuntimeException("Index does not exist.");

Check warning on line 94 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L94

Added line #L94 was not covered by tests
}
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME).id(request.getName());
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
builder.field("enabled", false);
builder.endObject();
indexRequest.source(builder);

Check warning on line 102 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L96-L102

Added lines #L96 - L102 were not covered by tests
}

ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
IndexResponse indexResponse = indexResponseActionFuture.actionGet();

Check warning on line 106 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L105-L106

Added lines #L105 - L106 were not covered by tests

if (indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
LOG.info("Job : {} successfully unscheduled", request.getName());

Check warning on line 109 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L109

Added line #L109 was not covered by tests
} else {
throw new RuntimeException(
"Unschedule job failed with result : " + indexResponse.getResult().getLowercase());

Check warning on line 112 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L111-L112

Added lines #L111 - L112 were not covered by tests
}
}

Check warning on line 114 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L114

Added line #L114 was not covered by tests

public void removeJob(OpenSearchRefreshIndexJobRequest request) {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
throw new RuntimeException("Index does not exist.");

Check warning on line 118 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L118

Added line #L118 was not covered by tests
}
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, request.getName());
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet();

Check warning on line 123 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L120-L123

Added lines #L120 - L123 were not covered by tests

if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) {
LOG.info("Job : {} successfully deleted", request.getName());

Check warning on line 126 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L126

Added line #L126 was not covered by tests
} else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
throw new RuntimeException("Job : " + request.getName() + " doesn't exist");

Check warning on line 128 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L128

Added line #L128 was not covered by tests
} else {
throw new RuntimeException(
"Remove job failed with result : " + deleteResponse.getResult().getLowercase());

Check warning on line 131 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L130-L131

Added lines #L130 - L131 were not covered by tests
}
}

Check warning on line 133 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L133

Added line #L133 was not covered by tests

public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
throw new RuntimeException("Index does not exist.");

Check warning on line 137 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L137

Added line #L137 was not covered by tests
}
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME).id(request.getName());
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();

Check warning on line 142 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L139-L142

Added lines #L139 - L142 were not covered by tests
if (request.getSchedule() != null) {
builder.field("schedule", request.getSchedule());

Check warning on line 144 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L144

Added line #L144 was not covered by tests
}
builder.endObject();
indexRequest.source(builder);

Check warning on line 147 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L146-L147

Added lines #L146 - L147 were not covered by tests
}

ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
IndexResponse indexResponse = indexResponseActionFuture.actionGet();

Check warning on line 151 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L150-L151

Added lines #L150 - L151 were not covered by tests

if (indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
LOG.info("Job : {} successfully updated", request.getName());

Check warning on line 154 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L154

Added line #L154 was not covered by tests
} else {
throw new RuntimeException(
"Update job failed with result : " + indexResponse.getResult().getLowercase());

Check warning on line 157 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L156-L157

Added lines #L156 - L157 were not covered by tests
}
}

Check warning on line 159 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L159

Added line #L159 was not covered by tests

private void createAsyncQuerySchedulerIndex() {
try {
InputStream mappingFileStream =

Check warning on line 163 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L163

Added line #L163 was not covered by tests
OpenSearchAsyncQueryScheduler.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
InputStream settingsFileStream =

Check warning on line 167 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L165-L167

Added lines #L165 - L167 were not covered by tests
OpenSearchAsyncQueryScheduler.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
createIndexRequest.mapping(
IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
createIndexRequest.settings(
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture =
client.admin().indices().create(createIndexRequest);
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();

Check warning on line 178 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L169-L178

Added lines #L169 - L178 were not covered by tests

if (createIndexResponse.isAcknowledged()) {
LOG.info("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);

Check warning on line 181 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L181

Added line #L181 was not covered by tests
} else {
throw new RuntimeException("Index creation is not acknowledged.");

Check warning on line 183 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L183

Added line #L183 was not covered by tests
}
} catch (Throwable e) {
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
throw new RuntimeException(

Check warning on line 187 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L185-L187

Added lines #L185 - L187 were not covered by tests
"Internal server error while creating "
+ SCHEDULER_INDEX_NAME
+ " index: "
+ e.getMessage(),

Check warning on line 191 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L191

Added line #L191 was not covered by tests
e);
}
}

Check warning on line 194 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L193-L194

Added lines #L193 - L194 were not covered by tests

public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
OpenSearchRefreshIndexJobRequest.Builder builder =

Check warning on line 198 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L197-L198

Added lines #L197 - L198 were not covered by tests
new OpenSearchRefreshIndexJobRequest.Builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Check warning on line 201 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L200-L201

Added lines #L200 - L201 were not covered by tests

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();

Check warning on line 205 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L204-L205

Added lines #L204 - L205 were not covered by tests
switch (fieldName) {
case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
builder.withJobName(parser.text());
break;

Check warning on line 209 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L208-L209

Added lines #L208 - L209 were not covered by tests
case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
builder.withEnabled(parser.booleanValue());
break;

Check warning on line 212 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L211-L212

Added lines #L211 - L212 were not covered by tests
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
builder.withEnabledTime(parseInstantValue(parser));
break;

Check warning on line 215 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L214-L215

Added lines #L214 - L215 were not covered by tests
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
builder.withLastUpdateTime(parseInstantValue(parser));
break;

Check warning on line 218 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L217-L218

Added lines #L217 - L218 were not covered by tests
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
builder.withSchedule(ScheduleParser.parse(parser));
break;

Check warning on line 221 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L220-L221

Added lines #L220 - L221 were not covered by tests
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
builder.withLockDurationSeconds(parser.longValue());
break;

Check warning on line 224 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L223-L224

Added lines #L223 - L224 were not covered by tests
case OpenSearchRefreshIndexJobRequest.JITTER:
builder.withJitter(parser.doubleValue());
break;

Check warning on line 227 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L226-L227

Added lines #L226 - L227 were not covered by tests
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());

Check warning on line 229 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L229

Added line #L229 was not covered by tests
}
}
return builder.build();

Check warning on line 232 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L231-L232

Added lines #L231 - L232 were not covered by tests
};
}

private static Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;

Check warning on line 238 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L238

Added line #L238 was not covered by tests
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());

Check warning on line 241 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L241

Added line #L241 was not covered by tests
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;

Check warning on line 244 in async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java

View check run for this annotation

Codecov / codecov/patch

async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java#L243-L244

Added lines #L243 - L244 were not covered by tests
}
}
Loading

0 comments on commit 91bd2bc

Please sign in to comment.