Skip to content

Commit

Permalink
Fetch workflow state; align data models with backend (#88) (#89)
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <ohltyler@amazon.com>
(cherry picked from commit 96e2f1a)

Co-authored-by: Tyler Ohlsen <ohltyler@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] and ohltyler authored Feb 29, 2024
1 parent e97b5da commit dadbc78
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 28 deletions.
1 change: 1 addition & 0 deletions common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const PLUGIN_ID = 'flow-framework';
export const FLOW_FRAMEWORK_API_ROUTE_PREFIX = '/_plugins/_flow_framework';
export const FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX = `${FLOW_FRAMEWORK_API_ROUTE_PREFIX}/workflow`;
export const FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE = `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/_search`;
export const FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE = `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/state/_search`;

/**
* NODE APIs
Expand Down
10 changes: 6 additions & 4 deletions common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ export type UseCaseTemplate = {
export type Workflow = {
id: string;
name: string;
useCase: string;
description?: string;
// ReactFlow state may not exist if a workflow is created via API/backend-only.
workspaceFlowState?: WorkspaceFlowState;
template: UseCaseTemplate;
lastUpdated: number;
lastLaunched: number;
state: WORKFLOW_STATE;
};

Expand All @@ -95,12 +97,12 @@ export type WorkflowLaunch = {
lastUpdated: number;
};

// TODO: finalize list of possible workflow states from backend
// Based off of https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/model/State.java
export enum WORKFLOW_STATE {
SUCCEEDED = 'Succeeded',
FAILED = 'Failed',
IN_PROGRESS = 'In progress',
NOT_STARTED = 'Not started',
PROVISIONING = 'Provisioning',
FAILED = 'Failed',
COMPLETED = 'Completed',
}

export type WorkflowDict = {
Expand Down
2 changes: 1 addition & 1 deletion public/pages/workflow_detail/launches/launch_list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export function LaunchList(props: LaunchListProps) {
const workflowLaunches = [
{
id: 'Launch_1',
state: WORKFLOW_STATE.IN_PROGRESS,
state: WORKFLOW_STATE.PROVISIONING,
lastUpdated: 12345678,
},
{
Expand Down
19 changes: 12 additions & 7 deletions public/pages/workflows/workflow_list/columns.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ export const columns = [
),
},
{
field: 'id',
name: 'ID',
field: 'state',
name: 'Status',
sortable: true,
},
{
field: 'description',
name: 'Description',
sortable: false,
field: 'useCase',
name: 'Type',
sortable: true,
},
{
field: 'state',
name: 'Status',
field: 'lastUpdated',
name: 'Last updated',
sortable: true,
},
{
field: 'lastLaunched',
name: 'Last launched',
sortable: true,
},
];
8 changes: 4 additions & 4 deletions public/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,22 @@ export function getStateOptions(): EuiFilterSelectItem[] {
return [
// @ts-ignore
{
name: WORKFLOW_STATE.SUCCEEDED,
name: WORKFLOW_STATE.NOT_STARTED,
checked: 'on',
} as EuiFilterSelectItem,
// @ts-ignore
{
name: WORKFLOW_STATE.NOT_STARTED,
name: WORKFLOW_STATE.PROVISIONING,
checked: 'on',
} as EuiFilterSelectItem,
// @ts-ignore
{
name: WORKFLOW_STATE.IN_PROGRESS,
name: WORKFLOW_STATE.FAILED,
checked: 'on',
} as EuiFilterSelectItem,
// @ts-ignore
{
name: WORKFLOW_STATE.FAILED,
name: WORKFLOW_STATE.COMPLETED,
checked: 'on',
} as EuiFilterSelectItem,
];
Expand Down
10 changes: 10 additions & 0 deletions server/cluster/flow_framework_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import {
FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE,
FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE,
FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX,
} from '../../common';

Expand Down Expand Up @@ -55,6 +56,15 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {
method: 'GET',
});

flowFramework.searchWorkflowState = ca({
url: {
fmt: FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE,
},
needBody: true,
// Exposed client rejects making GET requests with a body. So, we use POST
method: 'POST',
});

flowFramework.createWorkflow = ca({
url: {
fmt: FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX,
Expand Down
23 changes: 15 additions & 8 deletions server/routes/flow_framework_routes_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import {
GET_WORKFLOW_NODE_API_PATH,
GET_WORKFLOW_STATE_NODE_API_PATH,
SEARCH_WORKFLOWS_NODE_API_PATH,
WorkflowDict,
} from '../../common';
import { generateCustomError, toWorkflowObj } from './helpers';
import { generateCustomError, getWorkflowsFromResponses } from './helpers';

/**
* Server-side routes to process flow-framework-related node API calls and execute the
Expand Down Expand Up @@ -112,22 +111,30 @@ export class FlowFrameworkRoutesService {
}
};

// TODO: can remove or simplify if we can fetch all data from a single API call. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/171
// Current implementation is making two calls and combining results via helper fn
searchWorkflows = async (
context: RequestHandlerContext,
req: OpenSearchDashboardsRequest,
res: OpenSearchDashboardsResponseFactory
): Promise<IOpenSearchDashboardsResponse<any>> => {
const body = req.body;
try {
const response = await this.client
const workflowsResponse = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.searchWorkflows', { body });
const workflowHits = response.hits.hits as any[];
const workflowDict = {} as WorkflowDict;
workflowHits.forEach((workflowHit: any) => {
workflowDict[workflowHit._id] = toWorkflowObj(workflowHit);
});
const workflowHits = workflowsResponse.hits.hits as any[];

const workflowStatesResponse = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.searchWorkflowState', { body });
const workflowStateHits = workflowStatesResponse.hits.hits as any[];

const workflowDict = getWorkflowsFromResponses(
workflowHits,
workflowStateHits
);
return res.ok({ body: { workflows: workflowDict } });
} catch (err: any) {
return generateCustomError(res, err);
Expand Down
37 changes: 33 additions & 4 deletions server/routes/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { WORKFLOW_STATE, Workflow } from '../../common';
import { WORKFLOW_STATE, Workflow, WorkflowDict } from '../../common';

// OSD does not provide an interface for this response, but this is following the suggested
// implementations. To prevent typescript complaining, leaving as loosely-typed 'any'
Expand All @@ -19,18 +19,47 @@ export function generateCustomError(res: any, err: any) {
});
}

export function toWorkflowObj(workflowHit: any): Workflow {
function toWorkflowObj(workflowHit: any): Workflow {
// TODO: update schema parsing after hit schema has been updated.
// https://github.com/opensearch-project/flow-framework/issues/546
const hitSource = workflowHit.fields.filter[0];
// const hitSource = workflowHit._source;
return {
id: workflowHit._id,
name: hitSource.name,
useCase: hitSource.use_case,
description: hitSource.description || '',
// TODO: update below values after frontend Workflow interface is finalized
template: {},
// TODO: this needs to be persisted by backend. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/548
lastUpdated: 1234,
state: WORKFLOW_STATE.SUCCEEDED,
} as Workflow;
}

// TODO: can remove or simplify if we can fetch all data from a single API call. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/171
// Current implementation combines 2 search responses to create a single set of workflows with
// static information + state information
export function getWorkflowsFromResponses(
workflowHits: any[],
workflowStateHits: any[]
): WorkflowDict {
const workflowDict = {} as WorkflowDict;
workflowHits.forEach((workflowHit: any) => {
workflowDict[workflowHit._id] = toWorkflowObj(workflowHit);
const workflowStateHit = workflowStateHits.find(
(workflowStateHit) => workflowStateHit._id === workflowHit._id

Check failure on line 51 in server/routes/helpers.ts

View workflow job for this annotation

GitHub Actions / Run lint script

'workflowStateHit' is already declared in the upper scope
);
const workflowState = workflowStateHit._source
.state as typeof WORKFLOW_STATE;
workflowDict[workflowHit._id] = {
...workflowDict[workflowHit._id],
// @ts-ignore
state: WORKFLOW_STATE[workflowState],
// TODO: this needs to be persisted by backend. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/548
lastLaunched: 1234,
};
});
return workflowDict;
}

0 comments on commit dadbc78

Please sign in to comment.