Skip to content

Commit

Permalink
update doc
Browse files Browse the repository at this point in the history
Signed-off-by: Peng Huo <penghuo@gmail.com>
  • Loading branch information
penghuo committed Mar 13, 2024
1 parent 05cd665 commit 4cc0d5e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/user/interfaces/asyncqueryinterface.rst
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ Async Query Cancellation API
======================================
If security plugin is enabled, this API can only be invoked by users with permission ``cluster:admin/opensearch/ql/jobs/delete``.

Limitation: Flint index creation can not be cancelled. User could submit ALTER statement to stop auto refresh query.
Limitation: Flint index creation statement with auto_refresh = true can not be cancelled. User could submit ALTER statement to stop auto refresh query.
- flint index creation statement include, CREATE SKIPPING INDEX / CREATE INDEX / CREATE MATERIALIZED VIEW

HTTP URI: ``_plugins/_async_query/{queryId}``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public class RefreshQueryHandler extends BatchQueryHandler {
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public RefreshQueryHandler(EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
StateStore stateStore,
LeaseManager leaseManager) {
public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
StateStore stateStore,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataReader = flintIndexMetadataReader;
this.stateStore = stateStore;
Expand All @@ -57,8 +58,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
}

@Override
public DispatchQueryResponse submit(DispatchQueryRequest dispatchQueryRequest,
DispatchQueryContext context) {
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
String indexName =
context.getIndexQueryDetails() == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,17 @@ public DispatchQueryResponse dispatch(DispatchQueryRequest dispatchQueryRequest)
} else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())
&& indexQueryDetails.isAutoRefresh()) {
asyncQueryHandler =
new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
new StreamingQueryHandler(
emrServerlessClient, jobExecutionResponseReader, leaseManager);
} else if (IndexQueryActionType.REFRESH.equals(indexQueryDetails.getIndexQueryActionType())) {
// manual refresh should be handled by batch handler
asyncQueryHandler =
new RefreshQueryHandler(emrServerlessClient, jobExecutionResponseReader,
flintIndexMetadataReader, stateStore, leaseManager);
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
stateStore,
leaseManager);
}
}
return asyncQueryHandler.submit(dispatchQueryRequest, contextBuilder.build());
Expand Down Expand Up @@ -120,14 +125,18 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
queryHandler = createIndexDMLHandler(emrServerlessClient);
} else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
queryHandler =
new RefreshQueryHandler(emrServerlessClient, jobExecutionResponseReader,
flintIndexMetadataReader, stateStore, leaseManager);
new RefreshQueryHandler(
emrServerlessClient,
jobExecutionResponseReader,
flintIndexMetadataReader,
stateStore,
leaseManager);
} else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
queryHandler =
new StreamingQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
} else {
queryHandler = new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader,
leaseManager);
queryHandler =
new BatchQueryHandler(emrServerlessClient, jobExecutionResponseReader, leaseManager);
}
return queryHandler.cancelJob(asyncQueryJobMetadata);
}
Expand Down

0 comments on commit 4cc0d5e

Please sign in to comment.