Skip to content

Commit

Permalink
Implement patch API for datasources (#2273) (#2329)
Browse files Browse the repository at this point in the history
* Implement patch API for datasources



* Change patch implementation to Map



* Fix up, everything complete except unit test



* Revise PR to use existing functions



* Remove unused utility function



* Add tests



* Add back line



* fix build issue



* Fix tests and add in rst



* Register patch



* Add imports



* Patch



* Fix integration test



* Update IT



* Add tests



* Fix test



* Fix tests and increase code cov



* Add more coverage to impl



* Fix test and jacoco passing



* Test fix



* Add docs



---------


(cherry picked from commit f835112)

Signed-off-by: Derek Ho <dxho@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 3d1a376 commit 4c151fe
Show file tree
Hide file tree
Showing 15 changed files with 580 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.datasource;

import java.util.Map;
import java.util.Set;
import org.opensearch.sql.datasource.model.DataSource;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
Expand Down Expand Up @@ -56,12 +57,19 @@ public interface DataSourceService {
void createDataSource(DataSourceMetadata metadata);

/**
* Updates {@link DataSource} corresponding to dataSourceMetadata.
* Updates {@link DataSource} corresponding to dataSourceMetadata (all fields needed).
*
* @param dataSourceMetadata {@link DataSourceMetadata}.
*/
void updateDataSource(DataSourceMetadata dataSourceMetadata);

/**
* Patches {@link DataSource} corresponding to the given name (only fields to be changed needed).
*
* @param dataSourceData
*/
void patchDataSource(Map<String, Object> dataSourceData);

/**
* Deletes {@link DataSource} corresponding to the DataSource name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ public DataSource getDataSource(String dataSourceName) {
@Override
public void updateDataSource(DataSourceMetadata dataSourceMetadata) {}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {}

@Override
public void deleteDataSource(String dataSourceName) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.CONNECTOR_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;

import java.io.IOException;
import java.util.Map;
import lombok.Getter;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;

public class PatchDataSourceActionRequest extends ActionRequest {

@Getter private Map<String, Object> dataSourceData;

/** Constructor of UpdateDataSourceActionRequest from StreamInput. */
public PatchDataSourceActionRequest(StreamInput in) throws IOException {
super(in);
}

public PatchDataSourceActionRequest(Map<String, Object> dataSourceData) {
this.dataSourceData = dataSourceData;
}

@Override
public ActionRequestValidationException validate() {
if (this.dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError(
"Not allowed to update datasource with name : " + DEFAULT_DATASOURCE_NAME);
return exception;
} else if (this.dataSourceData.get(CONNECTOR_FIELD) != null) {
ActionRequestValidationException exception = new ActionRequestValidationException();
exception.addValidationError("Not allowed to update connector for datasource");
return exception;
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.model.transport;

import java.io.IOException;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

@RequiredArgsConstructor
public class PatchDataSourceActionResponse extends ActionResponse {

@Getter private final String result;

public PatchDataSourceActionResponse(StreamInput in) throws IOException {
super(in);
result = in.readString();
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
streamOutput.writeString(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@
import static org.opensearch.core.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.core.rest.RestStatus.NOT_FOUND;
import static org.opensearch.core.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.rest.RestRequest.Method.DELETE;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestRequest.Method.PUT;
import static org.opensearch.rest.RestRequest.Method.*;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
Expand All @@ -32,18 +30,8 @@
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.datasources.exceptions.ErrorMessage;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.CreateDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.DeleteDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.GetDataSourceActionResponse;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.UpdateDataSourceActionResponse;
import org.opensearch.sql.datasources.transport.TransportCreateDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportDeleteDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportGetDataSourceAction;
import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction;
import org.opensearch.sql.datasources.model.transport.*;
import org.opensearch.sql.datasources.transport.*;
import org.opensearch.sql.datasources.utils.Scheduler;
import org.opensearch.sql.datasources.utils.XContentParserUtils;

Expand Down Expand Up @@ -98,6 +86,17 @@ public List<Route> routes() {
*/
new Route(PUT, BASE_DATASOURCE_ACTION_URL),

/*
* PATCH datasources
* Request body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionRequest]
* Response body:
* Ref
* [org.opensearch.sql.plugin.transport.datasource.model.PatchDataSourceActionResponse]
*/
new Route(PATCH, BASE_DATASOURCE_ACTION_URL),

/*
* DELETE datasources
* Request body: Ref
Expand All @@ -122,6 +121,8 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
return executeUpdateRequest(restRequest, nodeClient);
case DELETE:
return executeDeleteRequest(restRequest, nodeClient);
case PATCH:
return executePatchRequest(restRequest, nodeClient);
default:
return restChannel ->
restChannel.sendResponse(
Expand Down Expand Up @@ -216,6 +217,34 @@ public void onFailure(Exception e) {
}));
}

private RestChannelConsumer executePatchRequest(RestRequest restRequest, NodeClient nodeClient)
throws IOException {
Map<String, Object> dataSourceData = XContentParserUtils.toMap(restRequest.contentParser());
return restChannel ->
Scheduler.schedule(
nodeClient,
() ->
nodeClient.execute(
TransportPatchDataSourceAction.ACTION_TYPE,
new PatchDataSourceActionRequest(dataSourceData),
new ActionListener<>() {
@Override
public void onResponse(
PatchDataSourceActionResponse patchDataSourceActionResponse) {
restChannel.sendResponse(
new BytesRestResponse(
RestStatus.OK,
"application/json; charset=UTF-8",
patchDataSourceActionResponse.getResult()));
}

@Override
public void onFailure(Exception e) {
handleException(e, restChannel);
}
}));
}

private RestChannelConsumer executeDeleteRequest(RestRequest restRequest, NodeClient nodeClient) {

String dataSourceName = restRequest.param("dataSourceName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
package org.opensearch.sql.datasources.service;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.*;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSource;
Expand Down Expand Up @@ -100,6 +96,19 @@ public void updateDataSource(DataSourceMetadata dataSourceMetadata) {
}
}

@Override
public void patchDataSource(Map<String, Object> dataSourceData) {
if (!dataSourceData.get(NAME_FIELD).equals(DEFAULT_DATASOURCE_NAME)) {
DataSourceMetadata dataSourceMetadata =
getRawDataSourceMetadata((String) dataSourceData.get(NAME_FIELD));
replaceOldDatasourceMetadata(dataSourceData, dataSourceMetadata);
updateDataSource(dataSourceMetadata);
} else {
throw new UnsupportedOperationException(
"Not allowed to update default datasource :" + DEFAULT_DATASOURCE_NAME);
}
}

@Override
public void deleteDataSource(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down Expand Up @@ -136,6 +145,35 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

/**
* Replaces the fields in the map of the given metadata.
*
* @param dataSourceData
* @param metadata {@link DataSourceMetadata}.
*/
private void replaceOldDatasourceMetadata(
Map<String, Object> dataSourceData, DataSourceMetadata metadata) {

for (String key : dataSourceData.keySet()) {
switch (key) {
// Name and connector should not be modified
case DESCRIPTION_FIELD:
metadata.setDescription((String) dataSourceData.get(DESCRIPTION_FIELD));
break;
case ALLOWED_ROLES_FIELD:
metadata.setAllowedRoles((List<String>) dataSourceData.get(ALLOWED_ROLES_FIELD));
break;
case PROPERTIES_FIELD:
Map<String, String> properties = new HashMap<>(metadata.getProperties());
properties.putAll(((Map<String, String>) dataSourceData.get(PROPERTIES_FIELD)));
break;
case NAME_FIELD:
case CONNECTOR_FIELD:
break;
}
}
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.datasources.transport;

import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionRequest;
import org.opensearch.sql.datasources.model.transport.PatchDataSourceActionResponse;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

public class TransportPatchDataSourceAction
extends HandledTransportAction<PatchDataSourceActionRequest, PatchDataSourceActionResponse> {

public static final String NAME = "cluster:admin/opensearch/ql/datasources/patch";
public static final ActionType<PatchDataSourceActionResponse> ACTION_TYPE =
new ActionType<>(NAME, PatchDataSourceActionResponse::new);

private DataSourceService dataSourceService;

/**
* TransportPatchDataSourceAction action for updating datasource.
*
* @param transportService transportService.
* @param actionFilters actionFilters.
* @param dataSourceService dataSourceService.
*/
@Inject
public TransportPatchDataSourceAction(
TransportService transportService,
ActionFilters actionFilters,
DataSourceServiceImpl dataSourceService) {
super(
TransportPatchDataSourceAction.NAME,
transportService,
actionFilters,
PatchDataSourceActionRequest::new);
this.dataSourceService = dataSourceService;
}

@Override
protected void doExecute(
Task task,
PatchDataSourceActionRequest request,
ActionListener<PatchDataSourceActionResponse> actionListener) {
try {
dataSourceService.patchDataSource(request.getDataSourceData());
String responseContent =
new JsonResponseFormatter<String>(PRETTY) {
@Override
protected Object buildJsonObject(String response) {
return response;
}
}.format("Updated DataSource with name " + request.getDataSourceData().get(NAME_FIELD));
actionListener.onResponse(new PatchDataSourceActionResponse(responseContent));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
Loading

0 comments on commit 4c151fe

Please sign in to comment.