Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds reprovision API to support updating search pipelines, ingest pipelines index settings #804

Merged
merged 31 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
773fbb8
Initial commit, Adds ReprovisionWorkflowTransportAction, reprovision …
joshpalis Jun 5, 2024
a3bd6d1
Merge branch 'main' into reprovision
joshpalis Jun 10, 2024
4216247
Initial reprovisiontransportaction implementation, Added UpdateIndexS…
joshpalis Jun 12, 2024
0d7ecbd
Merge branch 'main' into reprovision
joshpalis Jun 12, 2024
202072c
Merge branch 'main' into reprovision
joshpalis Jul 23, 2024
8530df0
Implements Update index Step to support updating index settings, modi…
joshpalis Jul 24, 2024
66e8c81
Improves workflow node comparision
joshpalis Jul 25, 2024
928ecb5
Merge branch 'main' into reprovision
joshpalis Jul 25, 2024
da978b2
Adding comments
joshpalis Jul 25, 2024
3aa18c7
Merge branch 'main' into reprovision
joshpalis Jul 29, 2024
bfecdeb
Fixing tests, adding javadocs
joshpalis Jul 29, 2024
a297214
Adding changelog
joshpalis Jul 29, 2024
166bcf8
Updating parse utils, RestCreateWorkflowAction, CreateWorkflowTranspo…
joshpalis Jul 31, 2024
f26a573
Adding update step and get resource step tests
joshpalis Jul 31, 2024
bfb7a10
Adding check for filtered setting list size
joshpalis Jul 31, 2024
cc6ddd1
Addign reprovision workflow transport action tests
joshpalis Aug 1, 2024
839d41c
Adding tests for reprovision sequence creation
joshpalis Aug 1, 2024
578e3f2
Addressing comments
joshpalis Aug 2, 2024
411cd47
Changing GetResourceStep to WorkflowDataStep
joshpalis Aug 2, 2024
3753aec
Addressing PR comments
joshpalis Aug 2, 2024
57c39bf
Fixing state check for reprovision transport action
joshpalis Aug 3, 2024
7016ce1
Adding state eror check to reprovision transport action to remove err…
joshpalis Aug 3, 2024
ce3d016
removing error check from flowframeworkindices handler
joshpalis Aug 3, 2024
ad8ee5b
Adding check for no updated settings
joshpalis Aug 4, 2024
6517669
refactor reprovision sequence creation
joshpalis Aug 4, 2024
112e05f
Fixing workflowrequest serialization
joshpalis Aug 4, 2024
d59c9e1
Addressing PR comments
joshpalis Aug 5, 2024
fed530b
Moving flattenSettings method to ParseUtils, added flatten settings t…
joshpalis Aug 5, 2024
aa2ce8a
updating workflowrequest
joshpalis Aug 5, 2024
8ffa29d
fixing workflowrequest
joshpalis Aug 5, 2024
56ce0ef
spotlessApply
joshpalis Aug 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,11 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x)
### Features
- Support editing of certain workflow fields on a provisioned workflow ([#757](https://github.com/opensearch-project/flow-framework/pull/757))
- Add allow_delete parameter to Deprovision API ([#763](https://github.com/opensearch-project/flow-framework/pull/763))
- Adds reprovision API to support updating search pipelines, ingest pipelines index settings ([#804](https://github.com/opensearch-project/flow-framework/pull/804))

### Enhancements
- Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750))

### Bug Fixes
- Handle Not Found exceptions as successful deletions for agents and models ([#805](https://github.com/opensearch-project/flow-framework/pull/805))
- Wrap CreateIndexRequest mappings in _doc key as required ([#809](https://github.com/opensearch-project/flow-framework/pull/809))
- Have FlowFrameworkException status recognized by ExceptionsHelper ([#811](https://github.com/opensearch-project/flow-framework/pull/811))

### Infrastructure
### Documentation
### Maintenance
### Refactoring
- Improve Template and WorkflowState builders ([#778](https://github.com/opensearch-project/flow-framework/pull/778))
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ dependencies {
implementation "jakarta.json.bind:jakarta.json.bind-api:3.0.1"
implementation "org.glassfish:jakarta.json:2.0.1"
implementation "org.eclipse:yasson:3.0.3"
implementation "com.google.code.gson:gson:2.10.1"

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowAction;
import org.opensearch.flowframework.transport.ReprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
Expand Down Expand Up @@ -170,7 +172,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowStepAction.INSTANCE, GetWorkflowStepTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class),
new ActionHandler<>(ReprovisionWorkflowAction.INSTANCE, ReprovisionWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ private CommonValue() {}
public static final String WORKFLOW_STEP = "workflow_step";
/** The param name for default use case, used by the create workflow API */
public static final String USE_CASE = "use_case";
/** The param name for reprovisioning, used by the create workflow API */
public static final String REPROVISION_WORKFLOW = "reprovision";

/*
* Constants associated with plugin configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.opensearch.flowframework.workflow.RegisterRemoteModelStep;
import org.opensearch.flowframework.workflow.ReindexStep;
import org.opensearch.flowframework.workflow.UndeployModelStep;
import org.opensearch.flowframework.workflow.UpdateIndexStep;
import org.opensearch.flowframework.workflow.UpdateIngestPipelineStep;
import org.opensearch.flowframework.workflow.UpdateSearchPipelineStep;

import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -43,29 +46,39 @@
public enum WorkflowResources {

/** Workflow steps for creating/deleting a connector and associated created resource */
CREATE_CONNECTOR(CreateConnectorStep.NAME, WorkflowResources.CONNECTOR_ID, DeleteConnectorStep.NAME),
CREATE_CONNECTOR(CreateConnectorStep.NAME, null, DeleteConnectorStep.NAME, WorkflowResources.CONNECTOR_ID),
/** Workflow steps for registering/deleting a remote model and associated created resource */
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_REMOTE_MODEL(RegisterRemoteModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local model and associated created resource */
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_MODEL(RegisterLocalCustomModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local sparse encoding model and associated created resource */
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_SPARSE_ENCODING_MODEL(RegisterLocalSparseEncodingModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a local OpenSearch provided pretrained model and associated created resource */
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, WorkflowResources.MODEL_ID, DeleteModelStep.NAME),
REGISTER_LOCAL_PRETRAINED_MODEL(RegisterLocalPretrainedModelStep.NAME, null, DeleteModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for registering/deleting a model group and associated created resource */
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, WorkflowResources.MODEL_GROUP_ID, NoOpStep.NAME),
REGISTER_MODEL_GROUP(RegisterModelGroupStep.NAME, null, NoOpStep.NAME, WorkflowResources.MODEL_GROUP_ID),
/** Workflow steps for deploying/undeploying a model and associated created resource */
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
DEPLOY_MODEL(DeployModelStep.NAME, null, UndeployModelStep.NAME, WorkflowResources.MODEL_ID),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteIngestPipelineStep.NAME),
CREATE_INGEST_PIPELINE(
CreateIngestPipelineStep.NAME,
UpdateIngestPipelineStep.NAME,
DeleteIngestPipelineStep.NAME,
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteSearchPipelineStep.NAME),
CREATE_SEARCH_PIPELINE(
CreateSearchPipelineStep.NAME,
UpdateSearchPipelineStep.NAME,
DeleteSearchPipelineStep.NAME,
WorkflowResources.PIPELINE_ID
),
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
CREATE_INDEX(CreateIndexStep.NAME, UpdateIndexStep.NAME, DeleteIndexStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
REINDEX(ReindexStep.NAME, null, NoOpStep.NAME, WorkflowResources.INDEX_NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);
REGISTER_AGENT(RegisterAgentStep.NAME, null, DeleteAgentStep.NAME, WorkflowResources.AGENT_ID);

/** Connector Id for a remote model connector */
public static final String CONNECTOR_ID = "connector_id";
Expand All @@ -80,34 +93,37 @@
/** Agent Id */
public static final String AGENT_ID = "agent_id";

private final String workflowStep;
private final String resourceCreated;
private final String createStep;
private final String updateStep;
private final String deprovisionStep;
private final String resourceCreated;

private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
WorkflowResources(String createStep, String updateStep, String deprovisionStep, String resourceCreated) {
this.createStep = createStep;
this.updateStep = updateStep;
this.deprovisionStep = deprovisionStep;
this.resourceCreated = resourceCreated;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
* Returns the create step for the given enum Constant
* @return the create step of this data.
*/
public String getWorkflowStep() {
return workflowStep;
public String getCreateStep() {
return createStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
* Returns the updateStep for the given enum Constant
* @return the updateStep of this data.
*/
public String getResourceCreated() {
return resourceCreated;
public String getUpdateStep() {
return updateStep;
}

/**
Expand All @@ -118,6 +134,14 @@
return deprovisionStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
Expand All @@ -127,7 +151,9 @@
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
if (workflowStep.equals(mapping.getCreateStep())
|| workflowStep.equals(mapping.getDeprovisionStep())
|| workflowStep.equals(mapping.getUpdateStep())) {
return mapping.getResourceCreated();
}
}
Expand All @@ -145,7 +171,7 @@
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
Expand All @@ -154,6 +180,24 @@
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the update step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to update
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getUpdateStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getCreateStep().equals(workflowStep)) {
return mapping.getUpdateStep();
}
}
}
logger.error("Unable to find update step for step: {}", workflowStep);
throw new FlowFrameworkException("Unable to find update step for step: " + workflowStep, RestStatus.BAD_REQUEST);

Check warning on line 198 in src/main/java/org/opensearch/flowframework/common/WorkflowResources.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/common/WorkflowResources.java#L197-L198

Added lines #L197 - L198 were not covered by tests
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ public void updateResourceInStateIndex(
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
"ctx._source.resources_created.add(params.newResource);",
Collections.singletonMap("newResource", newResource.resourceMap())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.REPROVISION_WORKFLOW;
import static org.opensearch.flowframework.common.CommonValue.UPDATE_WORKFLOW_FIELDS;
import static org.opensearch.flowframework.common.CommonValue.USE_CASE;
import static org.opensearch.flowframework.common.CommonValue.VALIDATION;
Expand Down Expand Up @@ -74,7 +75,7 @@
return List.of(
// Create new workflow
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s", WORKFLOW_URI)),
// Update use case template
// Update use case template/ reprovision existing workflow
new Route(RestRequest.Method.PUT, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID))
);
}
Expand All @@ -84,8 +85,10 @@
String workflowId = request.param(WORKFLOW_ID);
String[] validation = request.paramAsStringArray(VALIDATION, new String[] { "all" });
boolean provision = request.paramAsBoolean(PROVISION_WORKFLOW, false);
boolean reprovision = request.paramAsBoolean(REPROVISION_WORKFLOW, false);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
boolean updateFields = request.paramAsBoolean(UPDATE_WORKFLOW_FIELDS, false);
String useCase = request.param(USE_CASE);

// If provisioning, consume all other params and pass to provision transport action
Map<String, String> params = provision
? request.params()
Expand All @@ -108,28 +111,32 @@
);
}
if (!provision && !params.isEmpty()) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"Only the parameters " + request.consumedParams() + " are permitted unless the provision parameter is set to true.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
return processError(ffe, params, request);
}
if (provision && updateFields) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use both the " + PROVISION_WORKFLOW + " and " + UPDATE_WORKFLOW_FIELDS + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
return processError(ffe, params, request);
}
if (reprovision && workflowId == null) {
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
FlowFrameworkException ffe = new FlowFrameworkException(
"You can not use the " + REPROVISION_WORKFLOW + " parameter to create a new template.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);
}
if (reprovision && useCase != null) {
FlowFrameworkException ffe = new FlowFrameworkException(

Check warning on line 135 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L135

Added line #L135 was not covered by tests
"You cannot use the " + REPROVISION_WORKFLOW + " and " + USE_CASE + " parameters in the same request.",
RestStatus.BAD_REQUEST
);
return processError(ffe, params, request);

Check warning on line 139 in src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java#L139

Added line #L139 was not covered by tests
}
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
try {
Template template;
Expand Down Expand Up @@ -213,7 +220,8 @@
provision || updateFields,
params,
useCase,
useCaseDefaultsMap
useCaseDefaultsMap,
reprovision
);

return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
Expand Down Expand Up @@ -249,4 +257,13 @@
);
}
}

private RestChannelConsumer processError(FlowFrameworkException ffe, Map<String, String> params, RestRequest request) {
// Consume params and content so custom exception is processed
params.keySet().stream().forEach(request::param);
request.content();
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
Loading
Loading