Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(v2): standardize MLMD data model. Fixes #5669 #6054

Merged
merged 17 commits into from
Jul 20, 2021
5 changes: 3 additions & 2 deletions frontend/src/mlmd/LineageActionBar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ import ReplayIcon from '@material-ui/icons/Replay';
import { classes, stylesheet } from 'typestyle';
import { color, commonCss, fonts, padding } from './Css';
import { CSSProperties } from 'typestyle/lib/types';
import { getArtifactName, getResourcePropertyViaFallBack } from './Utils';
import { getResourcePropertyViaFallBack } from './Utils';
import { Artifact } from 'src/third_party/mlmd';
import { ArtifactProperties, ArtifactCustomProperties } from './Api';
import { ArtifactHelpers } from './MlmdUtils';

const baseLinkButton: CSSProperties = {
backgroundColor: 'transparent',
Expand Down Expand Up @@ -159,7 +160,7 @@ export class LineageActionBar extends React.Component<
disabled={isActive}
onClick={onBreadcrumbClicked}
>
{getArtifactName(artifact)}
{ArtifactHelpers.getName(artifact)}
</button>,
);
if (!isActive) {
Expand Down
5 changes: 3 additions & 2 deletions frontend/src/mlmd/LineageView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ import {
} from 'src/third_party/mlmd';
import { RefObject } from 'react';
import { getArtifactTypes, getExecutionTypes } from './LineageApi';
import { getTypeName, getArtifactName } from './Utils';
import { getTypeName } from './Utils';
import { Api } from './Api';
import { LineageResource } from './LineageTypes';
import CircularProgress from '@material-ui/core/CircularProgress';
import { ArtifactHelpers } from './MlmdUtils';

const isInputEvent = (event: Event) =>
[Event.Type.INPUT.valueOf(), Event.Type.DECLARED_INPUT.valueOf()].includes(event.getType());
Expand Down Expand Up @@ -392,7 +393,7 @@ export class LineageView extends React.Component<LineageViewProps, LineageViewSt
},
error => {
console.error(
`Failed to load related data for artifact: ${getArtifactName(target)}. Details:`,
`Failed to load related data for artifact: ${ArtifactHelpers.getName(target)}. Details:`,
error,
);
this.setState({
Expand Down
26 changes: 15 additions & 11 deletions frontend/src/mlmd/MlmdUtils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { Workflow, WorkflowSpec, WorkflowStatus } from 'third_party/argo-ui/argo
testBestPractices();

const WORKFLOW_NAME = 'run-st448';
const RUN_ID = 'abcdefghijk';
const WORKFLOW_EMPTY: Workflow = {
metadata: {
name: WORKFLOW_NAME,
Expand All @@ -40,8 +41,8 @@ const WORKFLOW_EMPTY: Workflow = {
};

const V2_CONTEXT = new Context();
V2_CONTEXT.setName(WORKFLOW_NAME);
V2_CONTEXT.setType('kfp.PipelineRun');
V2_CONTEXT.setName(RUN_ID);
V2_CONTEXT.setType('system.PipelineRun');

const TFX_CONTEXT = new Context();
TFX_CONTEXT.setName('run.run-st448');
Expand All @@ -55,34 +56,37 @@ describe('MlmdUtils', () => {
describe('getRunContext', () => {
it('gets KFP v2 context', async () => {
mockGetContextByTypeAndName([V2_CONTEXT]);
const context = await getRunContext({
...WORKFLOW_EMPTY,
metadata: {
...WORKFLOW_EMPTY.metadata,
annotations: { 'pipelines.kubeflow.org/v2_pipeline': 'true' },
const context = await getRunContext(
{
...WORKFLOW_EMPTY,
metadata: {
...WORKFLOW_EMPTY.metadata,
annotations: { 'pipelines.kubeflow.org/v2_pipeline': 'true' },
},
},
});
RUN_ID,
);
expect(context).toEqual(V2_CONTEXT);
});

it('gets TFX context', async () => {
mockGetContextByTypeAndName([TFX_CONTEXT, V1_CONTEXT]);
const context = await getRunContext(WORKFLOW_EMPTY);
const context = await getRunContext(WORKFLOW_EMPTY, RUN_ID);
expect(context).toEqual(TFX_CONTEXT);
});

it('gets KFP v1 context', async () => {
const verify = expectWarnings();
mockGetContextByTypeAndName([V1_CONTEXT]);
const context = await getRunContext(WORKFLOW_EMPTY);
const context = await getRunContext(WORKFLOW_EMPTY, RUN_ID);
expect(context).toEqual(V1_CONTEXT);
verify();
});

it('throws error when not found', async () => {
const verify = expectWarnings();
mockGetContextByTypeAndName([]);
await expect(getRunContext(WORKFLOW_EMPTY)).rejects.toThrow();
await expect(getRunContext(WORKFLOW_EMPTY, RUN_ID)).rejects.toThrow();
verify();
});
});
Expand Down
42 changes: 32 additions & 10 deletions frontend/src/mlmd/MlmdUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import { logger } from 'src/lib/Utils';
import { isV2Pipeline } from 'src/lib/v2/WorkflowUtils';
import {
Api,
ArtifactCustomProperties,
ArtifactProperties,
ExecutionCustomProperties,
ExecutionProperties,
getResourceProperty,
Expand Down Expand Up @@ -88,14 +90,15 @@ async function getKfpRunContext(argoWorkflowName: string): Promise<Context> {
return await getContext({ name: argoWorkflowName, type: 'KfpRun' });
}

async function getKfpV2RunContext(argoWorkflowName: string): Promise<Context> {
return await getContext({ name: argoWorkflowName, type: 'kfp.PipelineRun' });
async function getKfpV2RunContext(runID: string): Promise<Context> {
return await getContext({ name: runID, type: 'system.PipelineRun' });
}

export async function getRunContext(workflow: Workflow): Promise<Context> {
export async function getRunContext(workflow: Workflow, runID: string): Promise<Context> {
console.log(workflow, runID);
const workflowName = workflow?.metadata?.name || '';
if (isV2Pipeline(workflow)) {
return await getKfpV2RunContext(workflowName);
return await getKfpV2RunContext(runID);
}
try {
return await getTfxRunContext(workflowName);
Expand Down Expand Up @@ -127,7 +130,12 @@ export async function getExecutionsFromContext(context: Context): Promise<Execut
}

export enum KfpExecutionProperties {
// kfp_pod_name is kept for backward compatibility.
// KFP v1 and TFX logs kfp_pod_name property, but KFP v2 logs pod_name.
KFP_POD_NAME = 'kfp_pod_name',
POD_NAME = 'pod_name',
DISPLAY_NAME = 'display_name',
TASK_NAME = 'task_name',
}

const EXECUTION_PROPERTY_REPOS = [ExecutionProperties, ExecutionCustomProperties];
Expand All @@ -141,28 +149,42 @@ export const ExecutionHelpers = {
undefined
);
},
getName(execution: Execution): string | number | undefined {
return (
// TODO(Bobgy): move task_name to a const when ExecutionCustomProperties are moved back to this repo.
getStringProperty(execution, 'task_name', true) ||
getName(execution: Execution): string {
return `${getStringProperty(execution, KfpExecutionProperties.DISPLAY_NAME, true) ||
getStringProperty(execution, KfpExecutionProperties.TASK_NAME, true) ||
getStringProperty(execution, ExecutionProperties.NAME) ||
getStringProperty(execution, ExecutionProperties.COMPONENT_ID) ||
getStringProperty(execution, ExecutionCustomProperties.TASK_ID, true) ||
undefined
);
'(No name)'}`;
},
getState(execution: Execution): string | number | undefined {
return getStringProperty(execution, ExecutionProperties.STATE) || undefined;
},
getKfpPod(execution: Execution): string | number | undefined {
return (
getStringProperty(execution, KfpExecutionProperties.POD_NAME, true) ||
getStringProperty(execution, KfpExecutionProperties.KFP_POD_NAME) ||
getStringProperty(execution, KfpExecutionProperties.KFP_POD_NAME, true) ||
undefined
);
},
};

export enum KfpArtifactProperties {
DISPLAY_NAME = 'display_name',
}

export const ArtifactHelpers = {
getName(a: Artifact): string {
const name =
getResourceProperty(a, KfpArtifactProperties.DISPLAY_NAME, true) ||
getResourceProperty(a, ArtifactProperties.NAME) ||
getResourceProperty(a, ArtifactCustomProperties.NAME, true) ||
'(No name)';
return `${name}`;
},
};

function getStringProperty(
resource: Artifact | Execution,
propertyName: string,
Expand Down
23 changes: 3 additions & 20 deletions frontend/src/mlmd/Utils.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import { ArtifactTypeMap } from './LineageApi';
import { Artifact, Execution, Value } from 'src/third_party/mlmd';
import { LineageTypedResource } from './LineageTypes';
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
import { ArtifactHelpers, ExecutionHelpers } from './MlmdUtils';

const UNNAMED_RESOURCE_DISPLAY_NAME = '(unnamed)';
const ARTIFACT_FIELD_REPOS = [ArtifactProperties, ArtifactCustomProperties];
const EXECUTION_FIELD_REPOS = [ExecutionProperties, ExecutionCustomProperties];

Expand Down Expand Up @@ -67,27 +67,10 @@ export function getResourcePropertyViaFallBack(
return prop as string;
}

export function getArtifactName(artifact: Artifact): string {
return (
getResourcePropertyViaFallBack(artifact, ARTIFACT_FIELD_REPOS, ['NAME']) ||
UNNAMED_RESOURCE_DISPLAY_NAME
);
}

function getExecutionName(execution: Execution): string {
return (
getResourcePropertyViaFallBack(execution, EXECUTION_FIELD_REPOS, [
'COMPONENT_ID',
'TASK_ID',
'NAME',
]) || UNNAMED_RESOURCE_DISPLAY_NAME
);
}

export function getResourceName(typedResource: LineageTypedResource): string {
return typedResource.type === 'artifact'
? getArtifactName(typedResource.resource)
: getExecutionName(typedResource.resource);
? ArtifactHelpers.getName(typedResource.resource)
: ExecutionHelpers.getName(typedResource.resource);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ exports[`LineageActionBar Adds the artifact to the history state and DOM when pu
key="breadcrumb-1"
onClick={[Function]}
>
(unnamed)
(No name)
</button>
</div>
<div>
Expand Down
9 changes: 3 additions & 6 deletions frontend/src/pages/ArtifactDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import {
Api,
ArtifactCustomProperties,
ArtifactProperties,
getResourceProperty,
LineageResource,
Expand All @@ -39,6 +38,7 @@ import { ToolbarProps } from '../components/Toolbar';
import { commonCss, padding } from '../Css';
import { logger, serviceErrorToString, titleCase } from '../lib/Utils';
import { Page, PageProps } from './Page';
import { ArtifactHelpers } from 'src/mlmd/MlmdUtils';

export enum ArtifactDetailsTab {
OVERVIEW = 0,
Expand Down Expand Up @@ -157,7 +157,7 @@ class ArtifactDetails extends Page<{}, ArtifactDetailsState> {
return {
actions: {},
breadcrumbs: [{ displayName: 'Artifacts', href: RoutePage.ARTIFACTS }],
pageTitle: `Artifact #${this.id} details`,
pageTitle: `Artifact #${this.id}`,
};
}

Expand Down Expand Up @@ -185,10 +185,7 @@ class ArtifactDetails extends Page<{}, ArtifactDetailsState> {
const typeResponse = await this.api.metadataStoreService.getArtifactTypesByID(typeRequest);
const artifactType = typeResponse.getArtifactTypesList()[0] || undefined;

const artifactName =
getResourceProperty(artifact, ArtifactProperties.NAME) ||
getResourceProperty(artifact, ArtifactCustomProperties.NAME, true);
let title = artifactName ? artifactName.toString() : '';
let title = ArtifactHelpers.getName(artifact);
const version = getResourceProperty(artifact, ArtifactProperties.VERSION);
if (version) {
title += ` (version: ${version})`;
Expand Down
17 changes: 4 additions & 13 deletions frontend/src/pages/ExecutionDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,8 @@
import { CircularProgress } from '@material-ui/core';
import React, { Component } from 'react';
import { Link } from 'react-router-dom';
import { getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils';
import {
Api,
ExecutionCustomProperties,
ExecutionProperties,
getArtifactTypes,
getResourceProperty,
} from 'src/mlmd/library';
import { ExecutionHelpers, getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils';
import { Api, getArtifactTypes } from 'src/mlmd/library';
import {
ArtifactType,
Event,
Expand Down Expand Up @@ -78,7 +72,7 @@ export default class ExecutionDetails extends Page<{}, ExecutionDetailsState> {
return {
actions: {},
breadcrumbs: [{ displayName: 'Executions', href: RoutePage.EXECUTIONS }],
pageTitle: `${this.id} details`,
pageTitle: `Execution #${this.id}`,
};
}

Expand Down Expand Up @@ -209,10 +203,7 @@ export class ExecutionDetailsContent extends Component<
}

const execution = executionResponse.getExecutionsList()[0];
const executionName =
getResourceProperty(execution, ExecutionProperties.COMPONENT_ID) ||
getResourceProperty(execution, ExecutionCustomProperties.TASK_ID, true);
this.props.onTitleUpdate(executionName ? executionName.toString() : '');
this.props.onTitleUpdate(ExecutionHelpers.getName(execution));

const typeRequest = new GetExecutionTypesByIDRequest();
typeRequest.setTypeIdsList([execution.getTypeId()]);
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/pages/RunDetails.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ class RunDetails extends Page<RunDetailsInternalProps, RunDetailsState> {
let mlmdExecutions: Execution[] | undefined;
// Get data about this workflow from MLMD
try {
mlmdRunContext = await getRunContext(workflow);
mlmdRunContext = await getRunContext(workflow, runId);
mlmdExecutions = await getExecutionsFromContext(mlmdRunContext);
} catch (err) {
// Data in MLMD may not exist depending on this pipeline is a TFX pipeline.
Expand Down
11 changes: 5 additions & 6 deletions samples/core/exit_handler/exit_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,29 @@
from .exit_handler import pipeline_exit_handler
from ...test.util import run_pipeline_func, TestCase, KfpMlmdClient

def verify(
argo_workflow_name: str, mlmd_connection_config, run: kfp_server_api.ApiRun,
**kwargs
):

def verify(mlmd_connection_config, run: kfp_server_api.ApiRun, **kwargs):
t = unittest.TestCase()
t.maxDiff = None # we always want to see full diff

t.assertEqual(run.status, 'Succeeded')

# Verify MLMD state
client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config)
tasks = client.get_tasks(argo_workflow_name=argo_workflow_name)
tasks = client.get_tasks(run_id=run.id)
task_names = [*tasks.keys()]
t.assertEqual(task_names, ['echo-msg', 'print-file', 'download-from-gcs'])

for task in task_names:
pprint(f'======= {task} =======')
pprint(tasks.get(task).get_dict())

t.assertEqual(
tasks.get('echo-msg').inputs.parameters.get('msg'),
'exit!',
)


# %%

if __name__ == '__main__':
Expand Down
Loading