From 0db0f3c64d609365991ee39a4501f6ab4f13cfa5 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Thu, 15 Jul 2021 14:13:21 +0000 Subject: [PATCH 01/17] chore(v2): standardize MLMD data model --- .../kfp/compiler/_default_transformers.py | 13 +++- sdk/python/kfp/compiler/v2_compat.py | 12 ++- v2/cmd/launch/main.go | 22 ++++-- v2/component/launcher.go | 76 +++++++++++-------- v2/metadata/client.go | 41 +++++----- 5 files changed, 103 insertions(+), 61 deletions(-) diff --git a/sdk/python/kfp/compiler/_default_transformers.py b/sdk/python/kfp/compiler/_default_transformers.py index c20163e1ca6..01e9930add5 100644 --- a/sdk/python/kfp/compiler/_default_transformers.py +++ b/sdk/python/kfp/compiler/_default_transformers.py @@ -41,6 +41,11 @@ def add_kfp_pod_env(op: BaseOp) -> BaseOp: value_from=k8s_client.V1EnvVarSource( field_ref=k8s_client.V1ObjectFieldSelector( field_path='metadata.name'))) + ).add_env_variable( + k8s_client.V1EnvVar(name='KFP_POD_UID', + value_from=k8s_client.V1EnvVarSource( + field_ref=k8s_client.V1ObjectFieldSelector( + field_path='metadata.uid'))) ).add_env_variable( k8s_client.V1EnvVar(name='KFP_NAMESPACE', value_from=k8s_client.V1EnvVarSource( @@ -48,7 +53,13 @@ def add_kfp_pod_env(op: BaseOp) -> BaseOp: field_path='metadata.namespace'))) ).add_env_variable( k8s_client.V1EnvVar( - name='WORKFLOW_ID', + name='KFP_RUN_ID', + value_from=k8s_client. + V1EnvVarSource(field_ref=k8s_client.V1ObjectFieldSelector( + field_path="metadata.labels['pipeline/runid']"))) + ).add_env_variable( + k8s_client.V1EnvVar( + name='WORKFLOW_NAME', value_from=k8s_client. V1EnvVarSource(field_ref=k8s_client.V1ObjectFieldSelector( field_path="metadata.labels['workflows.argoproj.io/workflow']"))) diff --git a/sdk/python/kfp/compiler/v2_compat.py b/sdk/python/kfp/compiler/v2_compat.py index 1e1601fd8c1..878557869ed 100644 --- a/sdk/python/kfp/compiler/v2_compat.py +++ b/sdk/python/kfp/compiler/v2_compat.py @@ -100,10 +100,16 @@ def update_op(op: dsl.ContainerOp, op.name, "--pipeline_name", pipeline_name, - "--pipeline_run_id", - "$(WORKFLOW_ID)", - "--pipeline_task_id", + "--run_id", + "$(KFP_RUN_ID)", + "--run_resource", + "workflows.argoproj.io/$(WORKFLOW_NAME)", + "--namespace", + "$(KFP_NAMESPACE)", + "--pod_name", "$(KFP_POD_NAME)", + "--pod_uid", + "$(KFP_POD_UID)", "--pipeline_root", pipeline_root, "--enable_caching", diff --git a/v2/cmd/launch/main.go b/v2/cmd/launch/main.go index fcd4ff59074..11049128be3 100644 --- a/v2/cmd/launch/main.go +++ b/v2/cmd/launch/main.go @@ -26,15 +26,18 @@ var ( mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.") mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.") runtimeInfoJSON = flag.String("runtime_info_json", "", "The JSON-encoded RuntimeInfo dictionary.") - containerImage = flag.String("container_image", "", "The current container image name.") + image = flag.String("image", "", "The current container image name.") taskName = flag.String("task_name", "", "The current task name.") pipelineName = flag.String("pipeline_name", "", "The current pipeline name.") - pipelineRunID = flag.String("pipeline_run_id", "", "The current pipeline run ID.") - pipelineTaskID = flag.String("pipeline_task_id", "", "The current pipeline task ID.") + runID = flag.String("run_id", "", "The current pipeline run ID.") + runResource = flag.String("run_resource", "", "The current pipeline's corresponding Kubernetes resource. e.g. 
workflows.argoproj.io/workflow-name") + namespace = flag.String("namespace", "", "The Kubernetes namespace this Pod belongs to.") + podName = flag.String("pod_name", "", "Kubernetes Pod name.") + podUID = flag.String("pod_uid", "", "Kubernetes Pod UID.") pipelineRoot = flag.String("pipeline_root", "", "The root output directory in which to store output artifacts.") // Use flag.String instead of flag.Bool here to avoid breaking the logic of parser(parseArgs(flag.Args(), rt) in launcher component - // With flag.Bool, the value of enable_caching will be included in flag.Args() which will break the parser logic(https://pkg.go.dev/flag#hdr-Command_line_flag_syntax) - enableCaching = flag.String( "enable_caching", "false", "Enable caching or not") + // With flag.Bool, "--enable_caching true" is not valid syntax (https://pkg.go.dev/flag#hdr-Command_line_flag_syntax) + enableCaching = flag.String("enable_caching", "false", "Enable caching or not") ) func main() { @@ -48,11 +51,14 @@ func main() { opts := &component.LauncherOptions{ PipelineName: *pipelineName, - PipelineRunID: *pipelineRunID, - PipelineTaskID: *pipelineTaskID, + RunID: *runID, + RunResource: *runResource, + Namespace: *namespace, + PodName: *podName, + PodUID: *podUID, PipelineRoot: *pipelineRoot, TaskName: *taskName, - ContainerImage: *containerImage, + Image: *image, MLMDServerAddress: *mlmdServerAddress, MLMDServerPort: *mlmdServerPort, EnableCaching: enableCachingBool, diff --git a/v2/component/launcher.go b/v2/component/launcher.go index b8af7d34d4f..acd6d5f630d 100644 --- a/v2/component/launcher.go +++ b/v2/component/launcher.go @@ -64,7 +64,6 @@ type Launcher struct { cacheClient *cacheutils.Client bucketConfig *bucketConfig k8sClient *kubernetes.Clientset - namespace string cmdArgs []string } @@ -72,10 +71,13 @@ type Launcher struct { type LauncherOptions struct { PipelineName string PipelineRoot string - PipelineRunID string - PipelineTaskID string + RunID string + RunResource string + Namespace string + PodName string + PodUID string TaskName string - ContainerImage string + Image string MLMDServerAddress string MLMDServerPort string EnableCaching bool @@ -155,11 +157,20 @@ func (o *LauncherOptions) validate() error { if empty(o.PipelineName) { return err("PipelineName") } - if empty(o.PipelineRunID) { - return err("PipelineRunID") + if empty(o.RunID) { + return err("RunID") } - if empty(o.PipelineTaskID) { - return err("PipelineTaskID") + if empty(o.RunResource) { + return err("RunResource") + } + if empty(o.Namespace) { + return err("Namespace") + } + if empty(o.PodName) { + return err("PodName") + } + if empty(o.PodUID) { + return err("PodUID") } if empty(o.PipelineRoot) { return err("PipelineRoot") @@ -192,14 +203,13 @@ func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error if err != nil { return nil, fmt.Errorf("Failed to initialize kubernetes client set: %w", err) } - namespace := os.Getenv("KFP_NAMESPACE") - if namespace == "" { - return nil, fmt.Errorf("Env variable 'KFP_NAMESPACE' is empty") + if err := options.validate(); err != nil { + return nil, err } if len(options.PipelineRoot) == 0 { options.PipelineRoot = defaultPipelineRoot - config, err := getLauncherConfig(k8sClient, namespace) + config, err := getLauncherConfig(k8sClient, options.Namespace) if err != nil { return nil, err } @@ -213,9 +223,6 @@ func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error } glog.Infof("PipelineRoot defaults to %q.", options.PipelineRoot) } - if err := options.validate(); 
err != nil { - return nil, err - } bc, err := parseBucketConfig(options.PipelineRoot) if err != nil { @@ -252,7 +259,6 @@ func NewLauncher(runtimeInfo string, options *LauncherOptions) (*Launcher, error cacheClient: cacheClient, bucketConfig: bc, k8sClient: k8sClient, - namespace: namespace, cmdArgs: cmdArgs, }, nil } @@ -276,16 +282,21 @@ func (l *Launcher) executeWithoutCacheEnabled(ctx context.Context, executorInput cmd := l.cmdArgs[0] args := make([]string, len(l.cmdArgs)-1) _ = copy(args, l.cmdArgs[1:]) - pipeline, err := l.metadataClient.GetPipeline(ctx, l.options.PipelineName, l.options.PipelineRunID) + pipeline, err := l.metadataClient.GetPipeline(ctx, l.options.PipelineName, l.options.RunID, l.options.Namespace, l.options.RunResource) if err != nil { - return fmt.Errorf("unable to get pipeline with PipelineName %q PipelineRunID %q: %w", l.options.PipelineName, l.options.PipelineRunID, err) + return fmt.Errorf("unable to get pipeline with PipelineName %q PipelineRunID %q: %w", l.options.PipelineName, l.options.RunID, err) } ecfg, err := metadata.GenerateExecutionConfig(executorInput) if err != nil { return fmt.Errorf("failed to generate execution config: %w", err) } - execution, err := l.metadataClient.CreateExecution(ctx, pipeline, l.options.TaskName, l.options.PipelineTaskID, l.options.ContainerImage, ecfg) + ecfg.Image = l.options.Image + ecfg.Namespace = l.options.Namespace + ecfg.PodName = l.options.PodName + ecfg.PodUID = l.options.PodUID + ecfg.TaskName = l.options.TaskName + execution, err := l.metadataClient.CreateExecution(ctx, pipeline, ecfg) if err != nil { return fmt.Errorf("unable to create execution: %w", err) } @@ -301,7 +312,7 @@ func (l *Launcher) executeWithCacheEnabled(ctx context.Context, executorInput *p for outputParamName, outputParam := range l.runtimeInfo.OutputParameters { outputParametersTypeMap[outputParamName] = outputParam.Type } - cacheKey, err := cacheutils.GenerateCacheKey(executorInput.GetInputs(), executorInput.GetOutputs(), outputParametersTypeMap, l.cmdArgs, l.options.ContainerImage) + cacheKey, err := cacheutils.GenerateCacheKey(executorInput.GetInputs(), executorInput.GetOutputs(), outputParametersTypeMap, l.cmdArgs, l.options.Image) if err != nil { return fmt.Errorf("failure while generating CacheKey: %w", err) } @@ -311,16 +322,22 @@ func (l *Launcher) executeWithCacheEnabled(ctx context.Context, executorInput *p return fmt.Errorf("failure while getting executionCache: %w", err) } - pipeline, err := l.metadataClient.GetPipeline(ctx, l.options.PipelineName, l.options.PipelineRunID) + pipeline, err := l.metadataClient.GetPipeline(ctx, l.options.PipelineName, l.options.RunID, l.options.Namespace, l.options.RunResource) if err != nil { - return fmt.Errorf("unable to get pipeline with PipelineName %q PipelineRunID %q: %w", l.options.PipelineName, l.options.PipelineRunID, err) + return fmt.Errorf("unable to get pipeline with PipelineName %q PipelineRunID %q: %w", l.options.PipelineName, l.options.RunID, err) } ecfg, err := metadata.GenerateExecutionConfig(executorInput) if err != nil { return fmt.Errorf("failed to generate execution config: %w", err) } - execution, err := l.metadataClient.CreateExecution(ctx, pipeline, l.options.TaskName, l.options.PipelineTaskID, l.options.ContainerImage, ecfg) + ecfg.Image = l.options.Image + ecfg.Namespace = l.options.Namespace + ecfg.PodName = l.options.PodName + ecfg.PodUID = l.options.PodUID + ecfg.TaskName = l.options.TaskName + // TODO(capri-xiyue): what should cached execution's metadata look like? 
+ execution, err := l.metadataClient.CreateExecution(ctx, pipeline, ecfg) if err != nil { return fmt.Errorf("unable to create execution: %w", err) } @@ -464,20 +481,15 @@ func (l *Launcher) executeWithoutCacheHit(ctx context.Context, executorInput *pi if id == nil { return fmt.Errorf("failed to get id from createdExecution") } - pod, err := l.k8sClient.CoreV1().Pods(l.namespace).Get(ctx, l.options.PipelineTaskID, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get pod %v from namespace %v: %w", l.options.PipelineTaskID, l.namespace, err) - } - pipelineRunUUID := pod.GetObjectMeta().GetLabels()["pipeline/runid"] task := &api.Task{ PipelineName: l.options.PipelineName, - RunId: pipelineRunUUID, + RunId: l.options.RunID, MlmdExecutionID: strconv.FormatInt(*id, 10), CreatedAt: ×tamp.Timestamp{Seconds: executedStartedTime}, FinishedAt: ×tamp.Timestamp{Seconds: time.Now().Unix()}, Fingerprint: fingerPrint, } - err = l.cacheClient.CreateExecutionCache(ctx, task) + err := l.cacheClient.CreateExecutionCache(ctx, task) if err != nil { return fmt.Errorf("failed to create cache entry: %w", err) } @@ -668,7 +680,7 @@ func (l *Launcher) execute(ctx context.Context, executorInput *pipelinespec.Exec } func (l *Launcher) generateOutputURI(name string) string { - blobKey := path.Join(l.options.PipelineName, l.options.PipelineRunID, l.options.TaskName, name) + blobKey := path.Join(l.options.PipelineName, l.options.RunID, l.options.TaskName, name) return l.bucketConfig.uriFromKey(blobKey) } @@ -983,7 +995,7 @@ func getExecutorOutput() (*pipelinespec.ExecutorOutput, error) { func (l *Launcher) openBucket() (*blob.Bucket, error) { if l.bucketConfig.scheme == "minio://" { - cred, err := objectstore.GetMinioCredential(l.k8sClient, l.namespace) + cred, err := objectstore.GetMinioCredential(l.k8sClient, l.options.Namespace) if err != nil { return nil, fmt.Errorf("Failed to get minio credential: %w", err) } diff --git a/v2/metadata/client.go b/v2/metadata/client.go index cef091634dd..4c5d84354e5 100644 --- a/v2/metadata/client.go +++ b/v2/metadata/client.go @@ -93,8 +93,9 @@ type Parameters struct { // ExecutionConfig represents the input parameters and artifacts to an Execution. type ExecutionConfig struct { - InputParameters *Parameters - InputArtifactIDs map[string][]int64 + InputParameters *Parameters + InputArtifactIDs map[string][]int64 + TaskName, PodName, PodUID, Namespace, Image string } // InputArtifact is a wrapper around an MLMD artifact used as component inputs. @@ -124,13 +125,16 @@ type Execution struct { // GetPipeline returns the current pipeline represented by the specified // pipeline name and run ID. 
-func (c *Client) GetPipeline(ctx context.Context, pipelineName string, pipelineRunID string) (*Pipeline, error) { - pipelineContext, err := getOrInsertContext(ctx, c.svc, pipelineName, pipelineContextType) +func (c *Client) GetPipeline(ctx context.Context, pipelineName, pipelineRunID, namespace, runResource string) (*Pipeline, error) { + pipelineContext, err := getOrInsertContext(ctx, c.svc, pipelineName, pipelineContextType, nil) if err != nil { return nil, err } - - pipelineRunContext, err := getOrInsertContext(ctx, c.svc, pipelineRunID, pipelineRunContextType) + runMetadata := map[string]*pb.Value{ + "namespace": stringValue(namespace), + "resource_name": stringValue(runResource), + } + pipelineRunContext, err := getOrInsertContext(ctx, c.svc, pipelineRunID, pipelineRunContextType, runMetadata) if err != nil { return nil, err } @@ -252,7 +256,7 @@ func (c *Client) PublishExecution(ctx context.Context, execution *Execution, out } // CreateExecution creates a new MLMD execution under the specified Pipeline. -func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, taskName, taskID, containerImage string, config *ExecutionConfig) (*Execution, error) { +func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config *ExecutionConfig) (*Execution, error) { typeID, err := c.getContainerExecutionTypeID(ctx) if err != nil { return nil, err @@ -261,11 +265,13 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, taskNa e := &pb.Execution{ TypeId: &typeID, CustomProperties: map[string]*pb.Value{ - "task_name": stringValue(taskName), - "pipeline_name": stringValue(*pipeline.pipelineCtx.Name), - "pipeline_run_id": stringValue(*pipeline.pipelineRunCtx.Name), - "kfp_pod_name": stringValue(taskID), - "container_image": stringValue(containerImage), + // We should support overriding display name in the future, for now it defaults to task name. + "display_name": stringValue(config.TaskName), + "task_name": stringValue(config.TaskName), + "pod_name": stringValue(config.PodName), + "pod_uid": stringValue(config.PodUID), + "namespace": stringValue(config.Namespace), + "image": stringValue(config.ContainerImage), }, LastKnownState: pb.Execution_RUNNING.Enum(), } @@ -334,7 +340,7 @@ func (c *Client) GetExecutions(ctx context.Context, ids []int64) ([]*pb.Executio } // GetEventsByArtifactIDs ... 
-func (c *Client) GetEventsByArtifactIDs (ctx context.Context, artifactIds []int64) ([]*pb.Event, error) { +func (c *Client) GetEventsByArtifactIDs(ctx context.Context, artifactIds []int64) ([]*pb.Event, error) { req := &pb.GetEventsByArtifactIDsRequest{ArtifactIds: artifactIds} res, err := c.svc.GetEventsByArtifactIDs(ctx, req) if err != nil { @@ -343,7 +349,7 @@ func (c *Client) GetEventsByArtifactIDs (ctx context.Context, artifactIds []int6 return res.Events, nil } -func (c* Client) GetArtifactName(ctx context.Context, artifactId int64)(string, error) { +func (c *Client) GetArtifactName(ctx context.Context, artifactId int64) (string, error) { mlmdEvents, err := c.GetEventsByArtifactIDs(ctx, []int64{artifactId}) if err != nil { return "", fmt.Errorf("faild when getting events with artifact id %v: %w", artifactId, err) @@ -460,7 +466,7 @@ func (e *Execution) GetID() *int64 { return e.execution.Id } -func getOrInsertContext(ctx context.Context, svc pb.MetadataStoreServiceClient, name string, contextType *pb.ContextType) (*pb.Context, error) { +func getOrInsertContext(ctx context.Context, svc pb.MetadataStoreServiceClient, name string, contextType *pb.ContextType, customProps map[string]*pb.Value) (*pb.Context, error) { // The most common case -- the context is already created by upstream tasks. // So we try to get the context first. getCtxRes, err := svc.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{TypeName: contextType.Name, ContextName: proto.String(name)}) @@ -496,8 +502,9 @@ func getOrInsertContext(ctx context.Context, svc pb.MetadataStoreServiceClient, putReq := &pb.PutContextsRequest{ Contexts: []*pb.Context{ { - Name: proto.String(name), - TypeId: proto.Int64(typeID), + Name: proto.String(name), + TypeId: proto.Int64(typeID), + CustomProperties: customProps, }, }, } From 6c465af4d7feea899d0d32d1e38c3b4fcfa92506 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Thu, 15 Jul 2021 14:16:07 +0000 Subject: [PATCH 02/17] change context type to system namespace --- v2/metadata/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/v2/metadata/client.go b/v2/metadata/client.go index 4c5d84354e5..c6c19fd50c7 100644 --- a/v2/metadata/client.go +++ b/v2/metadata/client.go @@ -36,9 +36,9 @@ import ( ) const ( - pipelineContextTypeName = "kfp.Pipeline" - pipelineRunContextTypeName = "kfp.PipelineRun" - containerExecutionTypeName = "kfp.ContainerExecution" + pipelineContextTypeName = "system.Pipeline" + pipelineRunContextTypeName = "system.PipelineRun" + containerExecutionTypeName = "system.ContainerExecution" mlmdClientSideMaxRetries = 3 ) From 40e560bfd610c6ac776a866d1a753ebf568a1604 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 03:25:27 +0000 Subject: [PATCH 03/17] update sdk snapshots --- .../v2_compatible_two_step_pipeline.yaml | 31 ++++++++++++++----- ...wo_step_pipeline_with_custom_launcher.yaml | 31 ++++++++++++++----- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml index a7fb2c46b4e..e2476c22d2f 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml @@ -4,7 +4,7 @@ metadata: generateName: my-test-pipeline- annotations: pipelines.kubeflow.org/kfp_sdk_version: 1.6.4 - pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-14T10:49:23.340805' + 
pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-16T03:24:43.490043' pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "gs://output-directory/v2-artifacts", "name": "pipeline-output-directory"}, {"default": "pipeline/my-test-pipeline", "name": "pipeline-name"}], "name": "my-test-pipeline"}' @@ -81,17 +81,24 @@ spec: command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name, - '{{inputs.parameters.pipeline-name}}', --pipeline_run_id, $(WORKFLOW_ID), - --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', + '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, + workflows.argoproj.io/$(WORKFLOW_NAME), --namespace, $(KFP_NAMESPACE), --pod_name, + $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), --, some_int=12, uri=uri-to-import, --] env: - name: KFP_POD_NAME valueFrom: fieldRef: {fieldPath: metadata.name} + - name: KFP_POD_UID + valueFrom: + fieldRef: {fieldPath: metadata.uid} - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} - - name: WORKFLOW_ID + - name: KFP_RUN_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} + - name: WORKFLOW_NAME valueFrom: fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING @@ -179,17 +186,25 @@ spec: command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}', - --pipeline_run_id, $(WORKFLOW_ID), --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, - '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), - --, 'num_steps={{inputs.parameters.preprocess-output_parameter_one}}', --] + --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_NAME), + --namespace, $(KFP_NAMESPACE), --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), + --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, + $(ENABLE_CACHING), --, 'num_steps={{inputs.parameters.preprocess-output_parameter_one}}', + --] env: - name: KFP_POD_NAME valueFrom: fieldRef: {fieldPath: metadata.name} + - name: KFP_POD_UID + valueFrom: + fieldRef: {fieldPath: metadata.uid} - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} - - name: WORKFLOW_ID + - name: KFP_RUN_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} + - name: WORKFLOW_NAME valueFrom: fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml index ba0d0b36d33..088b6f7c9ee 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml @@ -4,7 +4,7 @@ metadata: generateName: my-test-pipeline-with-custom-launcher- annotations: 
pipelines.kubeflow.org/kfp_sdk_version: 1.6.4 - pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-14T10:49:23.033713' + pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-16T03:24:43.155152' pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "gs://output-directory/v2-artifacts", "name": "pipeline-output-directory"}, {"default": "pipeline/my-test-pipeline-with-custom-launcher", "name": "pipeline-name"}], "name": "my-test-pipeline-with-custom-launcher"}' @@ -81,17 +81,24 @@ spec: command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name, - '{{inputs.parameters.pipeline-name}}', --pipeline_run_id, $(WORKFLOW_ID), - --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', + '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, + workflows.argoproj.io/$(WORKFLOW_NAME), --namespace, $(KFP_NAMESPACE), --pod_name, + $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), --, some_int=12, uri=uri-to-import, --] env: - name: KFP_POD_NAME valueFrom: fieldRef: {fieldPath: metadata.name} + - name: KFP_POD_UID + valueFrom: + fieldRef: {fieldPath: metadata.uid} - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} - - name: WORKFLOW_ID + - name: KFP_RUN_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} + - name: WORKFLOW_NAME valueFrom: fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING @@ -179,17 +186,25 @@ spec: command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}', - --pipeline_run_id, $(WORKFLOW_ID), --pipeline_task_id, $(KFP_POD_NAME), --pipeline_root, - '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), - --, 'num_steps={{inputs.parameters.preprocess-output_parameter_one}}', --] + --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_NAME), + --namespace, $(KFP_NAMESPACE), --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), + --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, + $(ENABLE_CACHING), --, 'num_steps={{inputs.parameters.preprocess-output_parameter_one}}', + --] env: - name: KFP_POD_NAME valueFrom: fieldRef: {fieldPath: metadata.name} + - name: KFP_POD_UID + valueFrom: + fieldRef: {fieldPath: metadata.uid} - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} - - name: WORKFLOW_ID + - name: KFP_RUN_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} + - name: WORKFLOW_NAME valueFrom: fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING From 3de3c4df217b7fb8d0aaa049d113b5f2d7d24366 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 03:38:09 +0000 Subject: [PATCH 04/17] fix go v2 tests --- v2/metadata/client.go | 2 +- v2/metadata/client_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/v2/metadata/client.go b/v2/metadata/client.go index 
c6c19fd50c7..d305b7086c0 100644 --- a/v2/metadata/client.go +++ b/v2/metadata/client.go @@ -271,7 +271,7 @@ func (c *Client) CreateExecution(ctx context.Context, pipeline *Pipeline, config "pod_name": stringValue(config.PodName), "pod_uid": stringValue(config.PodUID), "namespace": stringValue(config.Namespace), - "image": stringValue(config.ContainerImage), + "image": stringValue(config.Image), }, LastKnownState: pb.Execution_RUNNING.Enum(), } diff --git a/v2/metadata/client_test.go b/v2/metadata/client_test.go index 1c7102dcb09..7a37ec56a85 100644 --- a/v2/metadata/client_test.go +++ b/v2/metadata/client_test.go @@ -84,9 +84,9 @@ func Test_GetPipeline(t *testing.T) { mlmdClient, err := NewTestMlmdClient() fatalIf(err) - _, err = client.GetPipeline(ctx, "get-pipeline-test", runId) + _, err = client.GetPipeline(ctx, "get-pipeline-test", runId, "kubeflow", "workflows.argoproj.io/hello-world-abcd") fatalIf(err) - runCtxType := "kfp.PipelineRun" + runCtxType := "system.PipelineRun" pipeline := "get-pipeline-test" res, err := mlmdClient.GetContextByTypeAndName(ctx, &pb.GetContextByTypeAndNameRequest{ @@ -130,7 +130,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, "kubeflow", "workflows.argoproj.io/hello-world-"+runIdText) if err != nil { t.Error(err) } @@ -142,7 +142,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, "kubeflow", "workflows.argoproj.io/hello-world-"+runIdText) if err != nil { t.Error(err) } From 0a0583e4c2b3d47665a6cbc4a4bba89b145e3bb8 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 03:55:51 +0000 Subject: [PATCH 05/17] update --- sdk/python/kfp/compiler/_default_transformers.py | 8 ++++---- sdk/python/kfp/compiler/v2_compat.py | 2 +- sdk/python/tests/compiler/testdata/add_pod_env.yaml | 8 ++++++++ v2/cmd/launch/main.go | 2 +- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/sdk/python/kfp/compiler/_default_transformers.py b/sdk/python/kfp/compiler/_default_transformers.py index 01e9930add5..62f767eac08 100644 --- a/sdk/python/kfp/compiler/_default_transformers.py +++ b/sdk/python/kfp/compiler/_default_transformers.py @@ -53,16 +53,16 @@ def add_kfp_pod_env(op: BaseOp) -> BaseOp: field_path='metadata.namespace'))) ).add_env_variable( k8s_client.V1EnvVar( - name='KFP_RUN_ID', + name='WORKFLOW_ID', value_from=k8s_client. V1EnvVarSource(field_ref=k8s_client.V1ObjectFieldSelector( - field_path="metadata.labels['pipeline/runid']"))) + field_path="metadata.labels['workflows.argoproj.io/workflow']"))) ).add_env_variable( k8s_client.V1EnvVar( - name='WORKFLOW_NAME', + name='KFP_RUN_ID', value_from=k8s_client. 
V1EnvVarSource(field_ref=k8s_client.V1ObjectFieldSelector( - field_path="metadata.labels['workflows.argoproj.io/workflow']"))) + field_path="metadata.labels['pipeline/runid']"))) ).add_env_variable( k8s_client.V1EnvVar( name='ENABLE_CACHING', diff --git a/sdk/python/kfp/compiler/v2_compat.py b/sdk/python/kfp/compiler/v2_compat.py index 878557869ed..9dc0b48d04d 100644 --- a/sdk/python/kfp/compiler/v2_compat.py +++ b/sdk/python/kfp/compiler/v2_compat.py @@ -103,7 +103,7 @@ def update_op(op: dsl.ContainerOp, "--run_id", "$(KFP_RUN_ID)", "--run_resource", - "workflows.argoproj.io/$(WORKFLOW_NAME)", + "workflows.argoproj.io/$(WORKFLOW_ID)", "--namespace", "$(KFP_NAMESPACE)", "--pod_name", diff --git a/sdk/python/tests/compiler/testdata/add_pod_env.yaml b/sdk/python/tests/compiler/testdata/add_pod_env.yaml index 94b8a853f48..4bce1948a2f 100644 --- a/sdk/python/tests/compiler/testdata/add_pod_env.yaml +++ b/sdk/python/tests/compiler/testdata/add_pod_env.yaml @@ -21,6 +21,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid - name: KFP_NAMESPACE valueFrom: fieldRef: @@ -29,6 +33,10 @@ spec: valueFrom: fieldRef: fieldPath: metadata.labels['workflows.argoproj.io/workflow'] + - name: KFP_RUN_ID + valueFrom: + fieldRef: + fieldPath: metadata.labels['pipeline/runid'] - name: ENABLE_CACHING valueFrom: fieldRef: diff --git a/v2/cmd/launch/main.go b/v2/cmd/launch/main.go index 11049128be3..3f813428a29 100644 --- a/v2/cmd/launch/main.go +++ b/v2/cmd/launch/main.go @@ -26,7 +26,7 @@ var ( mlmdServerAddress = flag.String("mlmd_server_address", "", "The MLMD gRPC server address.") mlmdServerPort = flag.String("mlmd_server_port", "8080", "The MLMD gRPC server port.") runtimeInfoJSON = flag.String("runtime_info_json", "", "The JSON-encoded RuntimeInfo dictionary.") - image = flag.String("image", "", "The current container image name.") + image = flag.String("container_image", "", "The current container image name.") taskName = flag.String("task_name", "", "The current task name.") pipelineName = flag.String("pipeline_name", "", "The current pipeline name.") runID = flag.String("run_id", "", "The current pipeline run ID.") From 92da08459909dc687c21e710f1f2a6a264f7841a Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 04:06:25 +0000 Subject: [PATCH 06/17] update v2 compat snapshots --- .../v2_compatible_two_step_pipeline.yaml | 18 +++++++++--------- ...two_step_pipeline_with_custom_launcher.yaml | 18 +++++++++--------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml index e2476c22d2f..4fdf0ba9439 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline.yaml @@ -4,7 +4,7 @@ metadata: generateName: my-test-pipeline- annotations: pipelines.kubeflow.org/kfp_sdk_version: 1.6.4 - pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-16T03:24:43.490043' + pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-16T04:06:08.476218' pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "gs://output-directory/v2-artifacts", "name": "pipeline-output-directory"}, {"default": "pipeline/my-test-pipeline", "name": "pipeline-name"}], "name": "my-test-pipeline"}' @@ -82,7 +82,7 @@ spec: --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, 
$(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, preprocess, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, - workflows.argoproj.io/$(WORKFLOW_NAME), --namespace, $(KFP_NAMESPACE), --pod_name, + workflows.argoproj.io/$(WORKFLOW_ID), --namespace, $(KFP_NAMESPACE), --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), --, some_int=12, uri=uri-to-import, --] env: @@ -95,12 +95,12 @@ spec: - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} + - name: WORKFLOW_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: KFP_RUN_ID valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} - - name: WORKFLOW_NAME - valueFrom: - fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'} @@ -186,7 +186,7 @@ spec: command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}', - --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_NAME), + --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_ID), --namespace, $(KFP_NAMESPACE), --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), --, 'num_steps={{inputs.parameters.preprocess-output_parameter_one}}', @@ -201,12 +201,12 @@ spec: - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} + - name: WORKFLOW_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: KFP_RUN_ID valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} - - name: WORKFLOW_NAME - valueFrom: - fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'} diff --git a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml index 088b6f7c9ee..309ee7c8566 100644 --- a/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml +++ b/sdk/python/kfp/compiler/testdata/v2_compatible_two_step_pipeline_with_custom_launcher.yaml @@ -4,7 +4,7 @@ metadata: generateName: my-test-pipeline-with-custom-launcher- annotations: pipelines.kubeflow.org/kfp_sdk_version: 1.6.4 - pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-16T03:24:43.155152' + pipelines.kubeflow.org/pipeline_compilation_time: '2021-07-16T04:06:08.051103' pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "gs://output-directory/v2-artifacts", "name": "pipeline-output-directory"}, {"default": "pipeline/my-test-pipeline-with-custom-launcher", "name": "pipeline-name"}], "name": "my-test-pipeline-with-custom-launcher"}' @@ -82,7 +82,7 @@ spec: --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, 
preprocess, --pipeline_name, '{{inputs.parameters.pipeline-name}}', --run_id, $(KFP_RUN_ID), --run_resource, - workflows.argoproj.io/$(WORKFLOW_NAME), --namespace, $(KFP_NAMESPACE), --pod_name, + workflows.argoproj.io/$(WORKFLOW_ID), --namespace, $(KFP_NAMESPACE), --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), --, some_int=12, uri=uri-to-import, --] env: @@ -95,12 +95,12 @@ spec: - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} + - name: WORKFLOW_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: KFP_RUN_ID valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} - - name: WORKFLOW_NAME - valueFrom: - fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'} @@ -186,7 +186,7 @@ spec: command: [/kfp-launcher/launch, --mlmd_server_address, $(METADATA_GRPC_SERVICE_HOST), --mlmd_server_port, $(METADATA_GRPC_SERVICE_PORT), --runtime_info_json, $(KFP_V2_RUNTIME_INFO), --container_image, $(KFP_V2_IMAGE), --task_name, train, --pipeline_name, '{{inputs.parameters.pipeline-name}}', - --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_NAME), + --run_id, $(KFP_RUN_ID), --run_resource, workflows.argoproj.io/$(WORKFLOW_ID), --namespace, $(KFP_NAMESPACE), --pod_name, $(KFP_POD_NAME), --pod_uid, $(KFP_POD_UID), --pipeline_root, '{{inputs.parameters.pipeline-output-directory}}', --enable_caching, $(ENABLE_CACHING), --, 'num_steps={{inputs.parameters.preprocess-output_parameter_one}}', @@ -201,12 +201,12 @@ spec: - name: KFP_NAMESPACE valueFrom: fieldRef: {fieldPath: metadata.namespace} + - name: WORKFLOW_ID + valueFrom: + fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: KFP_RUN_ID valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipeline/runid'']'} - - name: WORKFLOW_NAME - valueFrom: - fieldRef: {fieldPath: 'metadata.labels[''workflows.argoproj.io/workflow'']'} - name: ENABLE_CACHING valueFrom: fieldRef: {fieldPath: 'metadata.labels[''pipelines.kubeflow.org/enable_caching'']'} From 4f7792177d53c017697c8b9a8a99a21210740311 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 05:17:26 +0000 Subject: [PATCH 07/17] fix all samples --- .../core/exit_handler/exit_handler_test.py | 11 ++-- .../visualization/confusion_matrix_test.py | 40 +++++++------- samples/core/visualization/roc_test.py | 40 +++++++------- .../visualization/tensorboard_minio_test.py | 7 +-- samples/test/cache_v2_test.py | 52 ++++++++++++------- ...eight_python_functions_v2_pipeline_test.py | 11 ++-- ...t_python_functions_v2_with_outputs_test.py | 7 +-- samples/test/metrics_visualization_v2_test.py | 13 ++--- samples/test/reused_component_test.py | 4 +- samples/test/two_step_test.py | 31 ++++------- samples/test/util.py | 13 ++--- 11 files changed, 114 insertions(+), 115 deletions(-) diff --git a/samples/core/exit_handler/exit_handler_test.py b/samples/core/exit_handler/exit_handler_test.py index 886b24c6af5..c2218408c0e 100644 --- a/samples/core/exit_handler/exit_handler_test.py +++ b/samples/core/exit_handler/exit_handler_test.py @@ -23,10 +23,8 @@ from .exit_handler import pipeline_exit_handler from ...test.util import run_pipeline_func, TestCase, KfpMlmdClient -def verify( - argo_workflow_name: str, mlmd_connection_config, run: 
kfp_server_api.ApiRun, - **kwargs -): + +def verify(mlmd_connection_config, run: kfp_server_api.ApiRun, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff @@ -34,19 +32,20 @@ def verify( # Verify MLMD state client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) task_names = [*tasks.keys()] t.assertEqual(task_names, ['echo-msg', 'print-file', 'download-from-gcs']) for task in task_names: pprint(f'======= {task} =======') pprint(tasks.get(task).get_dict()) - + t.assertEqual( tasks.get('echo-msg').inputs.parameters.get('msg'), 'exit!', ) + # %% if __name__ == '__main__': diff --git a/samples/core/visualization/confusion_matrix_test.py b/samples/core/visualization/confusion_matrix_test.py index 8f367a9e629..cc319449f02 100644 --- a/samples/core/visualization/confusion_matrix_test.py +++ b/samples/core/visualization/confusion_matrix_test.py @@ -21,32 +21,36 @@ import kfp -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) pprint(tasks) confusion_visualization = tasks['confusion-visualization'] output = [ - a for a in confusion_visualization.outputs.artifacts if a.name == 'mlpipeline_ui_metadata' + a for a in confusion_visualization.outputs.artifacts + if a.name == 'mlpipeline_ui_metadata' ][0] pprint(output) - t.assertEqual(confusion_visualization.get_dict()['outputs']['artifacts'][0]['name'], - 'mlpipeline_ui_metadata') - - -run_pipeline_func([TestCase(pipeline_func=confusion_matrix_pipeline, - verify_func=verify, - mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, - arguments={ - }), - TestCase(pipeline_func=confusion_matrix_pipeline, - mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY - )]) + t.assertEqual( + confusion_visualization.get_dict()['outputs']['artifacts'][0]['name'], + 'mlpipeline_ui_metadata' + ) + + +run_pipeline_func([ + TestCase( + pipeline_func=confusion_matrix_pipeline, + verify_func=verify, + mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, + arguments={} + ), + TestCase( + pipeline_func=confusion_matrix_pipeline, + mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY + ) +]) diff --git a/samples/core/visualization/roc_test.py b/samples/core/visualization/roc_test.py index c4efb0c3359..33f48e98500 100644 --- a/samples/core/visualization/roc_test.py +++ b/samples/core/visualization/roc_test.py @@ -21,32 +21,36 @@ import kfp -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) pprint(tasks) roc_visualization = tasks['roc-visualization'] output = [ - a for a in roc_visualization.outputs.artifacts if a.name == 'mlpipeline_ui_metadata' + a for a in roc_visualization.outputs.artifacts + if a.name == 
'mlpipeline_ui_metadata' ][0] pprint(output) - t.assertEqual(roc_visualization.get_dict()['outputs']['artifacts'][0]['name'], - 'mlpipeline_ui_metadata') - - -run_pipeline_func([TestCase(pipeline_func=roc_curve_pipeline, - verify_func=verify, - mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, - arguments={ - }), - TestCase(pipeline_func=roc_curve_pipeline, - mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY - )]) + t.assertEqual( + roc_visualization.get_dict()['outputs']['artifacts'][0]['name'], + 'mlpipeline_ui_metadata' + ) + + +run_pipeline_func([ + TestCase( + pipeline_func=roc_curve_pipeline, + verify_func=verify, + mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, + arguments={} + ), + TestCase( + pipeline_func=roc_curve_pipeline, + mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY + ) +]) diff --git a/samples/core/visualization/tensorboard_minio_test.py b/samples/core/visualization/tensorboard_minio_test.py index c1eda517bcd..a662eec4d79 100644 --- a/samples/core/visualization/tensorboard_minio_test.py +++ b/samples/core/visualization/tensorboard_minio_test.py @@ -23,16 +23,13 @@ from ...test.util import run_pipeline_func, TestCase, KfpMlmdClient -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) # uncomment to debug # pprint(tasks) vis = tasks['create-tensorboard-visualization'] diff --git a/samples/test/cache_v2_test.py b/samples/test/cache_v2_test.py index 49dfb1eb7aa..063d58b7c8a 100644 --- a/samples/test/cache_v2_test.py +++ b/samples/test/cache_v2_test.py @@ -21,7 +21,6 @@ import string import unittest import functools -import time import kfp import kfp_server_api @@ -31,14 +30,10 @@ from ml_metadata.proto import Execution -def get_tasks(mlmd_connection_config, argo_workflow_name: str): - # Verify MLMD state - client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) - return tasks - - -def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask], task_state, uri: str, some_int:int): +def verify_tasks( + t: unittest.TestCase, tasks: dict[str, KfpTask], task_state, uri: str, + some_int: int +): task_names = [*tasks.keys()] t.assertEqual(task_names, ['train-op', 'preprocess'], 'task names') @@ -61,10 +56,10 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask], task_state, ur 'type': 'system.Dataset' }], 'parameters': { - 'output_parameter_one':some_int + 'output_parameter_one': some_int } }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': task_state, }, preprocess.get_dict()) t.assertEqual({ @@ -87,22 +82,23 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask], task_state, ur }], 'parameters': {} }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': task_state, }, train.get_dict()) def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - uri: str, some_int, state: int, - **kwargs + run: kfp_server_api.ApiRun, mlmd_connection_config, uri: str, some_int, + state: int, **kwargs ): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff 
t.assertEqual(run.status, 'Succeeded') - tasks = get_tasks(mlmd_connection_config, argo_workflow_name) + client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) + tasks = client.get_tasks(run_id=run.id) verify_tasks(t, tasks, state, uri, some_int) + if __name__ == '__main__': letters = string.ascii_lowercase random_uri = 'http://' + ''.join(random.choice(letters) for i in range(5)) @@ -110,8 +106,16 @@ def verify( run_pipeline_func([ TestCase( pipeline_func=two_step_pipeline, - arguments={'uri': f'{random_uri}', 'some_int': f'{random_int}'}, - verify_func=functools.partial(verify, uri=random_uri, some_int=random_int, state = Execution.State.COMPLETE,), + arguments={ + 'uri': f'{random_uri}', + 'some_int': f'{random_int}' + }, + verify_func=functools.partial( + verify, + uri=random_uri, + some_int=random_int, + state=Execution.State.COMPLETE, + ), mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, enable_caching=True ), @@ -119,8 +123,16 @@ def verify( run_pipeline_func([ TestCase( pipeline_func=two_step_pipeline, - arguments={'uri': f'{random_uri}', 'some_int': f'{random_int}'}, - verify_func=functools.partial(verify, uri=random_uri, some_int=random_int, state = Execution.State.CACHED), + arguments={ + 'uri': f'{random_uri}', + 'some_int': f'{random_int}' + }, + verify_func=functools.partial( + verify, + uri=random_uri, + some_int=random_int, + state=Execution.State.CACHED + ), mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE, enable_caching=True ), diff --git a/samples/test/lightweight_python_functions_v2_pipeline_test.py b/samples/test/lightweight_python_functions_v2_pipeline_test.py index fc74c8fdda3..1067e9c50fb 100644 --- a/samples/test/lightweight_python_functions_v2_pipeline_test.py +++ b/samples/test/lightweight_python_functions_v2_pipeline_test.py @@ -22,15 +22,12 @@ from ml_metadata.proto import Execution -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) task_names = [*tasks.keys()] t.assertCountEqual(task_names, ['preprocess', 'train'], 'task names') @@ -66,7 +63,7 @@ def verify( 'output_parameter': 'message' } }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, preprocess.get_dict(), @@ -104,7 +101,7 @@ def verify( }], 'parameters': {} }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, train.get_dict(), diff --git a/samples/test/lightweight_python_functions_v2_with_outputs_test.py b/samples/test/lightweight_python_functions_v2_with_outputs_test.py index f682b453884..1ca05523a5c 100644 --- a/samples/test/lightweight_python_functions_v2_with_outputs_test.py +++ b/samples/test/lightweight_python_functions_v2_with_outputs_test.py @@ -23,15 +23,12 @@ from .util import KfpMlmdClient, run_pipeline_func, TestCase -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = 
KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) pprint(tasks) output_artifact = tasks['output-artifact'] diff --git a/samples/test/metrics_visualization_v2_test.py b/samples/test/metrics_visualization_v2_test.py index ffe80ed532a..cce57014cd2 100644 --- a/samples/test/metrics_visualization_v2_test.py +++ b/samples/test/metrics_visualization_v2_test.py @@ -23,15 +23,12 @@ from ml_metadata.proto import Execution -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) task_names = [*tasks.keys()] t.assertCountEqual( @@ -115,7 +112,7 @@ def verify( }], 'parameters': {} }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, wine_classification.get_dict()) t.assertEqual( @@ -152,7 +149,7 @@ def verify( }], 'parameters': {} }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, iris_sgdclassifier.get_dict() ) @@ -183,7 +180,7 @@ def verify( }], 'parameters': {} }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, digit_classification.get_dict()) diff --git a/samples/test/reused_component_test.py b/samples/test/reused_component_test.py index 9418b58b479..2e4ebfdf322 100644 --- a/samples/test/reused_component_test.py +++ b/samples/test/reused_component_test.py @@ -19,14 +19,14 @@ from .util import run_pipeline_func, TestCase, KfpMlmdClient -def verify(run, argo_workflow_name: str, mlmd_connection_config, **kwargs): +def verify(run, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) + tasks = client.get_tasks(run_id=run.id) t.assertEqual( 17, tasks['add-3'].outputs.parameters['sum'], 'add result should be 17' ) diff --git a/samples/test/two_step_test.py b/samples/test/two_step_test.py index 8dcfbcff06a..3752acd1024 100644 --- a/samples/test/two_step_test.py +++ b/samples/test/two_step_test.py @@ -27,13 +27,6 @@ from ml_metadata.proto import Execution -def get_tasks(mlmd_connection_config, argo_workflow_name: str): - # Verify MLMD state - client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) - tasks = client.get_tasks(argo_workflow_name=argo_workflow_name) - return tasks - - def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask]): task_names = [*tasks.keys()] t.assertEqual(task_names, ['train-op', 'preprocess'], 'task names') @@ -66,7 +59,7 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask]): 'output_parameter_one': 1234 } }, - 'type': 'kfp.ContainerExecution', + 'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, preprocess.get_dict()) t.assertEqual({ @@ -89,7 +82,7 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask]): }], 'parameters': {} }, - 'type': 'kfp.ContainerExecution', + 
'type': 'system.ContainerExecution', 'state': Execution.State.COMPLETE, }, train.get_dict()) @@ -100,37 +93,35 @@ def verify_artifacts(t: unittest.TestCase, tasks: dict, artifact_uri_prefix): t.assertTrue(artifact.uri.startswith(artifact_uri_prefix)) -def verify( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs -): +def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') - tasks = get_tasks(mlmd_connection_config, argo_workflow_name) + client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) + tasks = client.get_tasks(run_id=run.id) verify_tasks(t, tasks) def verify_with_default_pipeline_root( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs + run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs ): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') - tasks = get_tasks(mlmd_connection_config, argo_workflow_name) + client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) + tasks = client.get_tasks(run_id=run.id) verify_tasks(t, tasks) verify_artifacts(t, tasks, 'minio://mlpipeline/v2/artifacts') def verify_with_specific_pipeline_root( - run: kfp_server_api.ApiRun, mlmd_connection_config, argo_workflow_name: str, - **kwargs + run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs ): t = unittest.TestCase() t.maxDiff = None # we always want to see full diff t.assertEqual(run.status, 'Succeeded') - tasks = get_tasks(mlmd_connection_config, argo_workflow_name) + client = KfpMlmdClient(mlmd_connection_config=mlmd_connection_config) + tasks = client.get_tasks(run_id=run.id) verify_tasks(t, tasks) verify_artifacts(t, tasks, 'minio://mlpipeline/override/artifacts') diff --git a/samples/test/util.py b/samples/test/util.py index 6efe9d0ac09..72e51f5c317 100644 --- a/samples/test/util.py +++ b/samples/test/util.py @@ -66,8 +66,9 @@ def run_pipeline_func(test_cases: List[TestCase]): """ def test_wrapper( - run_pipeline: Callable[[Callable, kfp.dsl.PipelineExecutionMode, bool, dict], - kfp_server_api.ApiRunDetail], + run_pipeline: Callable[ + [Callable, kfp.dsl.PipelineExecutionMode, bool, dict], + kfp_server_api.ApiRunDetail], mlmd_connection_config: metadata_store_pb2.MetadataStoreClientConfig, ): for case in test_cases: @@ -353,14 +354,14 @@ def __init__( ) self.mlmd_store = metadata_store.MetadataStore(mlmd_connection_config) - def get_tasks(self, argo_workflow_name: str): + def get_tasks(self, run_id: str): run_context = self.mlmd_store.get_context_by_type_and_name( - type_name='kfp.PipelineRun', - context_name=argo_workflow_name, + type_name='system.PipelineRun', + context_name=run_id, ) if not run_context: raise Exception( - f'Cannot find kfp.PipelineRun context "{argo_workflow_name}"' + f'Cannot find system.PipelineRun context "{run_id}"' ) logging.info( f'run_context: name={run_context.name} id={run_context.id}' From 142e0b810f8df049a5c0bea47141e3b091863542 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 06:36:51 +0000 Subject: [PATCH 08/17] fix must specify pipeline root --- v2/component/launcher.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/v2/component/launcher.go b/v2/component/launcher.go index acd6d5f630d..0d9eaf5efa7 100644 --- a/v2/component/launcher.go +++ b/v2/component/launcher.go @@ -172,9 +172,6 @@ func (o *LauncherOptions) validate() error { if 
empty(o.PodUID) { return err("PodUID") } - if empty(o.PipelineRoot) { - return err("PipelineRoot") - } if empty(o.TaskName) { return err("TaskName") } From 0b7e7d345a22d344eaf022d05819035006ce0806 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 07:37:51 +0000 Subject: [PATCH 09/17] add artifact display name --- v2/component/launcher.go | 2 +- v2/metadata/client.go | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/v2/component/launcher.go b/v2/component/launcher.go index 0d9eaf5efa7..c8e900fa733 100644 --- a/v2/component/launcher.go +++ b/v2/component/launcher.go @@ -605,7 +605,7 @@ func (l *Launcher) execute(ctx context.Context, executorInput *pipelinespec.Exec if err != nil { return fmt.Errorf("failed to determine schema for output %q: %w", name, err) } - mlmdArtifact, err = l.metadataClient.RecordArtifact(ctx, schema, mlmdArtifact, pb.Artifact_LIVE) + mlmdArtifact, err = l.metadataClient.RecordArtifact(ctx, name, schema, mlmdArtifact, pb.Artifact_LIVE) if err != nil { return metadataErr(err) } diff --git a/v2/metadata/client.go b/v2/metadata/client.go index d305b7086c0..927bebd2812 100644 --- a/v2/metadata/client.go +++ b/v2/metadata/client.go @@ -428,7 +428,7 @@ func SchemaToArtifactType(schema string) (*pb.ArtifactType, error) { } // RecordArtifact ... -func (c *Client) RecordArtifact(ctx context.Context, schema string, artifact *pb.Artifact, state pb.Artifact_State) (*pb.Artifact, error) { +func (c *Client) RecordArtifact(ctx context.Context, outputName string, schema string, artifact *pb.Artifact, state pb.Artifact_State) (*pb.Artifact, error) { at, err := SchemaToArtifactType(schema) if err != nil { return nil, err @@ -441,6 +441,13 @@ func (c *Client) RecordArtifact(ctx context.Context, schema string, artifact *pb artifact.TypeId = at.Id artifact.State = &state + if artifact.CustomProperties == nil { + artifact.CustomProperties = make(map[string]*pb.Value) + } + if _, ok := artifact.CustomProperties["display_name"]; !ok { + // display name default value + artifact.CustomProperties["display_name"] = stringValue(outputName) + } res, err := c.svc.PutArtifacts(ctx, &pb.PutArtifactsRequest{ Artifacts: []*pb.Artifact{artifact}, From e6b8eb8502b2870d249396e0d0c2a3bdfcf62cff Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 07:38:09 +0000 Subject: [PATCH 10/17] add UI rendering of new fields --- frontend/src/mlmd/MlmdUtils.ts | 26 +++++++++++++++---------- frontend/src/pages/ArtifactDetails.tsx | 2 +- frontend/src/pages/ExecutionDetails.tsx | 9 +++------ frontend/src/pages/RunDetails.tsx | 2 +- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/frontend/src/mlmd/MlmdUtils.ts b/frontend/src/mlmd/MlmdUtils.ts index 7c202104efc..8ab7edcbeab 100644 --- a/frontend/src/mlmd/MlmdUtils.ts +++ b/frontend/src/mlmd/MlmdUtils.ts @@ -88,14 +88,15 @@ async function getKfpRunContext(argoWorkflowName: string): Promise { return await getContext({ name: argoWorkflowName, type: 'KfpRun' }); } -async function getKfpV2RunContext(argoWorkflowName: string): Promise { - return await getContext({ name: argoWorkflowName, type: 'kfp.PipelineRun' }); +async function getKfpV2RunContext(runID: string): Promise { + return await getContext({ name: runID, type: 'system.PipelineRun' }); } -export async function getRunContext(workflow: Workflow): Promise { +export async function getRunContext(workflow: Workflow, runID: string): Promise { + console.log(workflow, runID); const workflowName = workflow?.metadata?.name || ''; if (isV2Pipeline(workflow)) { - 
return await getKfpV2RunContext(workflowName); + return await getKfpV2RunContext(runID); } try { return await getTfxRunContext(workflowName); @@ -127,7 +128,12 @@ export async function getExecutionsFromContext(context: Context): Promise { return { actions: {}, breadcrumbs: [{ displayName: 'Artifacts', href: RoutePage.ARTIFACTS }], - pageTitle: `Artifact #${this.id} details`, + pageTitle: `Artifact #${this.id}`, }; } diff --git a/frontend/src/pages/ExecutionDetails.tsx b/frontend/src/pages/ExecutionDetails.tsx index f88f57f6d3d..fe8ec0cb645 100644 --- a/frontend/src/pages/ExecutionDetails.tsx +++ b/frontend/src/pages/ExecutionDetails.tsx @@ -17,7 +17,7 @@ import { CircularProgress } from '@material-ui/core'; import React, { Component } from 'react'; import { Link } from 'react-router-dom'; -import { getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils'; +import { ExecutionHelpers, getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils'; import { Api, ExecutionCustomProperties, @@ -78,7 +78,7 @@ export default class ExecutionDetails extends Page<{}, ExecutionDetailsState> { return { actions: {}, breadcrumbs: [{ displayName: 'Executions', href: RoutePage.EXECUTIONS }], - pageTitle: `${this.id} details`, + pageTitle: `Execution #${this.id}`, }; } @@ -209,10 +209,7 @@ export class ExecutionDetailsContent extends Component< } const execution = executionResponse.getExecutionsList()[0]; - const executionName = - getResourceProperty(execution, ExecutionProperties.COMPONENT_ID) || - getResourceProperty(execution, ExecutionCustomProperties.TASK_ID, true); - this.props.onTitleUpdate(executionName ? executionName.toString() : ''); + this.props.onTitleUpdate(ExecutionHelpers.getName(execution)); const typeRequest = new GetExecutionTypesByIDRequest(); typeRequest.setTypeIdsList([execution.getTypeId()]); diff --git a/frontend/src/pages/RunDetails.tsx b/frontend/src/pages/RunDetails.tsx index 3d33d400915..4c203340cba 100644 --- a/frontend/src/pages/RunDetails.tsx +++ b/frontend/src/pages/RunDetails.tsx @@ -801,7 +801,7 @@ class RunDetails extends Page { let mlmdExecutions: Execution[] | undefined; // Get data about this workflow from MLMD try { - mlmdRunContext = await getRunContext(workflow); + mlmdRunContext = await getRunContext(workflow, runId); mlmdExecutions = await getExecutionsFromContext(mlmdRunContext); } catch (err) { // Data in MLMD may not exist depending on this pipeline is a TFX pipeline. 
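Note: taken together, the launcher and UI changes above standardize the v2 MLMD data model: each run is recorded as a 'system.PipelineRun' context named by the KFP run ID (not the Argo workflow name), each step as a 'system.ContainerExecution' execution, and every output artifact gets a 'display_name' custom property defaulted to its output name. Below is a minimal sketch of how a client could query that model with the ml_metadata package, mirroring KfpMlmdClient.get_tasks; the helper name and placeholders are illustrative and not part of these patches.

from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2

def get_run_artifacts(config: metadata_store_pb2.MetadataStoreClientConfig,
                      run_id: str) -> dict:
    # Illustrative sketch, assuming a reachable MLMD gRPC server and a finished v2 run.
    store = metadata_store.MetadataStore(config)
    # v2 runs are registered as a 'system.PipelineRun' context named by the KFP run ID.
    run_context = store.get_context_by_type_and_name(
        type_name='system.PipelineRun', context_name=run_id)
    if not run_context:
        raise Exception(f'Cannot find system.PipelineRun context "{run_id}"')
    # Output artifacts attached to the run carry a 'display_name' custom property,
    # which the launcher defaults to the output name when recording the artifact.
    artifacts = store.get_artifacts_by_context(run_context.id)
    return {
        a.custom_properties['display_name'].string_value: a
        for a in artifacts if 'display_name' in a.custom_properties
    }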
From 132c4b730a6a3b3bd808c40415ebda6430bb2e00 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 08:15:02 +0000 Subject: [PATCH 11/17] fix sample tests --- samples/test/cache_v2_test.py | 12 ++- ...eight_python_functions_v2_pipeline_test.py | 19 +++- samples/test/metrics_visualization_v2_test.py | 99 ++++++++++--------- samples/test/two_step_test.py | 12 ++- samples/test/util.py | 14 ++- 5 files changed, 95 insertions(+), 61 deletions(-) diff --git a/samples/test/cache_v2_test.py b/samples/test/cache_v2_test.py index 063d58b7c8a..907e46b88c9 100644 --- a/samples/test/cache_v2_test.py +++ b/samples/test/cache_v2_test.py @@ -51,7 +51,9 @@ def verify_tasks( }, 'outputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_one', + }, 'name': 'output_dataset_one', 'type': 'system.Dataset' }], @@ -66,7 +68,9 @@ def verify_tasks( 'name': 'train-op', 'inputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_one', + }, 'name': 'dataset', 'type': 'system.Dataset', }], @@ -76,7 +80,9 @@ def verify_tasks( }, 'outputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'model', + }, 'name': 'model', 'type': 'system.Model', }], diff --git a/samples/test/lightweight_python_functions_v2_pipeline_test.py b/samples/test/lightweight_python_functions_v2_pipeline_test.py index 1067e9c50fb..d2ce9e29a2c 100644 --- a/samples/test/lightweight_python_functions_v2_pipeline_test.py +++ b/samples/test/lightweight_python_functions_v2_pipeline_test.py @@ -48,11 +48,15 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'name': 'preprocess', 'outputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_one' + }, 'name': 'output_dataset_one', 'type': 'system.Dataset' }, { - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_two' + }, 'name': 'output_dataset_two', 'type': 'system.Dataset' }], @@ -72,11 +76,15 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): { 'inputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_one' + }, 'name': 'dataset_one', 'type': 'system.Dataset' }, { - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_two' + }, 'name': 'dataset_two', 'type': 'system.Dataset' }], @@ -92,8 +100,9 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'outputs': { 'artifacts': [{ 'metadata': { + 'display_name': 'model', 'accuracy': { - 'doubleValue': 0.9 + 'doubleValue': 0.9, } }, 'name': 'model', diff --git a/samples/test/metrics_visualization_v2_test.py b/samples/test/metrics_visualization_v2_test.py index cce57014cd2..09a4a3b0493 100644 --- a/samples/test/metrics_visualization_v2_test.py +++ b/samples/test/metrics_visualization_v2_test.py @@ -57,54 +57,53 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'outputs': { 'artifacts': [{ 'metadata': { + 'display_name': 'metrics', 'confidenceMetrics': { - 'structValue': { - 'list': [{ - 'confidenceThreshold': 2.0, - 'falsePositiveRate': 0.0, - 'recall': 0.0 - }, { - 'confidenceThreshold': 1.0, - 'falsePositiveRate': 0.0, - 'recall': 0.33962264150943394 - }, { - 'confidenceThreshold': 0.9, - 'falsePositiveRate': 0.0, - 'recall': 0.6037735849056604 - }, { - 'confidenceThreshold': 0.8, - 'falsePositiveRate': 0.0, - 'recall': 0.8490566037735849 - }, { - 'confidenceThreshold': 0.6, - 'falsePositiveRate': 0.0, - 'recall': 0.8867924528301887 - }, { - 
'confidenceThreshold': 0.5, - 'falsePositiveRate': 0.0125, - 'recall': 0.9245283018867925 - }, { - 'confidenceThreshold': 0.4, - 'falsePositiveRate': 0.075, - 'recall': 0.9622641509433962 - }, { - 'confidenceThreshold': 0.3, - 'falsePositiveRate': 0.0875, - 'recall': 1.0 - }, { - 'confidenceThreshold': 0.2, - 'falsePositiveRate': 0.2375, - 'recall': 1.0 - }, { - 'confidenceThreshold': 0.1, - 'falsePositiveRate': 0.475, - 'recall': 1.0 - }, { - 'confidenceThreshold': 0.0, - 'falsePositiveRate': 1.0, - 'recall': 1.0 - }] - } + 'list': [{ + 'confidenceThreshold': 2.0, + 'falsePositiveRate': 0.0, + 'recall': 0.0 + }, { + 'confidenceThreshold': 1.0, + 'falsePositiveRate': 0.0, + 'recall': 0.33962264150943394 + }, { + 'confidenceThreshold': 0.9, + 'falsePositiveRate': 0.0, + 'recall': 0.6037735849056604 + }, { + 'confidenceThreshold': 0.8, + 'falsePositiveRate': 0.0, + 'recall': 0.8490566037735849 + }, { + 'confidenceThreshold': 0.6, + 'falsePositiveRate': 0.0, + 'recall': 0.8867924528301887 + }, { + 'confidenceThreshold': 0.5, + 'falsePositiveRate': 0.0125, + 'recall': 0.9245283018867925 + }, { + 'confidenceThreshold': 0.4, + 'falsePositiveRate': 0.075, + 'recall': 0.9622641509433962 + }, { + 'confidenceThreshold': 0.3, + 'falsePositiveRate': 0.0875, + 'recall': 1.0 + }, { + 'confidenceThreshold': 0.2, + 'falsePositiveRate': 0.2375, + 'recall': 1.0 + }, { + 'confidenceThreshold': 0.1, + 'falsePositiveRate': 0.475, + 'recall': 1.0 + }, { + 'confidenceThreshold': 0.0, + 'falsePositiveRate': 1.0, + 'recall': 1.0 + }] } }, 'name': 'metrics', @@ -127,7 +126,8 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'outputs': { 'artifacts': [{ 'metadata': { - 'confusionMatrix': {'structValue': {'struct': { + 'display_name': 'metrics', + 'confusionMatrix': {'struct': { 'annotationSpecs': [{ 'displayName': 'Setosa' }, { @@ -142,7 +142,7 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): }, { 'row': [mock.ANY, mock.ANY, mock.ANY] }] - }}} + }} }, 'name': 'metrics', 'type': 'system.ClassificationMetrics' @@ -171,6 +171,7 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'outputs': { 'artifacts': [{ 'metadata': { + 'display_name': 'metrics', 'accuracy': { 'doubleValue': 92.0 }, diff --git a/samples/test/two_step_test.py b/samples/test/two_step_test.py index 3752acd1024..fb135d76232 100644 --- a/samples/test/two_step_test.py +++ b/samples/test/two_step_test.py @@ -51,7 +51,9 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask]): }, 'outputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_one', + }, 'name': 'output_dataset_one', 'type': 'system.Dataset' }], @@ -66,7 +68,9 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask]): 'name': 'train-op', 'inputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'output_dataset_one', + }, 'name': 'dataset', 'type': 'system.Dataset', }], @@ -76,7 +80,9 @@ def verify_tasks(t: unittest.TestCase, tasks: dict[str, KfpTask]): }, 'outputs': { 'artifacts': [{ - 'metadata': {}, + 'metadata': { + 'display_name': 'model', + }, 'name': 'model', 'type': 'system.Model', }], diff --git a/samples/test/util.py b/samples/test/util.py index 72e51f5c317..3b477209c76 100644 --- a/samples/test/util.py +++ b/samples/test/util.py @@ -223,6 +223,18 @@ def _create_run(): fire.Fire(main) +def simplify_proto_struct(data: dict) -> dict: + res = {} + for key, value in data.items(): + if value.get('stringValue'): + res[key] = 
value['stringValue'] + elif value.get('structValue'): + res[key] = value['structValue'] + else: + res[key] = value + return res + + @dataclass class KfpArtifact: name: str @@ -242,7 +254,7 @@ def new( artifact_name = mlmd_event.path.steps[0].key # The original field is custom_properties, but MessageToDict converts it # to customProperties. - metadata = MessageToDict(mlmd_artifact).get('customProperties', {}) + metadata = simplify_proto_struct(MessageToDict(mlmd_artifact).get('customProperties', {})) return cls( name=artifact_name, type=mlmd_artifact_type.name, From 1f7545828a1dcf929a63b67d8f4117ecaf635fef Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 08:27:00 +0000 Subject: [PATCH 12/17] let ui read artifact and execution names consistently --- frontend/src/mlmd/LineageActionBar.tsx | 5 +++-- frontend/src/mlmd/LineageView.tsx | 5 +++-- frontend/src/mlmd/MlmdUtils.ts | 18 +++++++++++++++++- frontend/src/mlmd/Utils.tsx | 23 +++-------------------- frontend/src/pages/ArtifactDetails.tsx | 7 ++----- frontend/src/pages/ExecutionDetails.tsx | 8 +------- 6 files changed, 29 insertions(+), 37 deletions(-) diff --git a/frontend/src/mlmd/LineageActionBar.tsx b/frontend/src/mlmd/LineageActionBar.tsx index 1904dd2e8ac..7bf731fd329 100644 --- a/frontend/src/mlmd/LineageActionBar.tsx +++ b/frontend/src/mlmd/LineageActionBar.tsx @@ -22,9 +22,10 @@ import ReplayIcon from '@material-ui/icons/Replay'; import { classes, stylesheet } from 'typestyle'; import { color, commonCss, fonts, padding } from './Css'; import { CSSProperties } from 'typestyle/lib/types'; -import { getArtifactName, getResourcePropertyViaFallBack } from './Utils'; +import { getResourcePropertyViaFallBack } from './Utils'; import { Artifact } from 'src/third_party/mlmd'; import { ArtifactProperties, ArtifactCustomProperties } from './Api'; +import { ArtifactHelpers } from './MlmdUtils'; const baseLinkButton: CSSProperties = { backgroundColor: 'transparent', @@ -159,7 +160,7 @@ export class LineageActionBar extends React.Component< disabled={isActive} onClick={onBreadcrumbClicked} > - {getArtifactName(artifact)} + {ArtifactHelpers.getName(artifact)} , ); if (!isActive) { diff --git a/frontend/src/mlmd/LineageView.tsx b/frontend/src/mlmd/LineageView.tsx index 8379db0835b..f111cde4e0c 100644 --- a/frontend/src/mlmd/LineageView.tsx +++ b/frontend/src/mlmd/LineageView.tsx @@ -51,10 +51,11 @@ import { } from 'src/third_party/mlmd'; import { RefObject } from 'react'; import { getArtifactTypes, getExecutionTypes } from './LineageApi'; -import { getTypeName, getArtifactName } from './Utils'; +import { getTypeName } from './Utils'; import { Api } from './Api'; import { LineageResource } from './LineageTypes'; import CircularProgress from '@material-ui/core/CircularProgress'; +import { ArtifactHelpers } from './MlmdUtils'; const isInputEvent = (event: Event) => [Event.Type.INPUT.valueOf(), Event.Type.DECLARED_INPUT.valueOf()].includes(event.getType()); @@ -392,7 +393,7 @@ export class LineageView extends React.Component { console.error( - `Failed to load related data for artifact: ${getArtifactName(target)}. Details:`, + `Failed to load related data for artifact: ${ArtifactHelpers.getName(target)}. 
Details:`, error, ); this.setState({ diff --git a/frontend/src/mlmd/MlmdUtils.ts b/frontend/src/mlmd/MlmdUtils.ts index 8ab7edcbeab..218d0049f3c 100644 --- a/frontend/src/mlmd/MlmdUtils.ts +++ b/frontend/src/mlmd/MlmdUtils.ts @@ -19,6 +19,8 @@ import { logger } from 'src/lib/Utils'; import { isV2Pipeline } from 'src/lib/v2/WorkflowUtils'; import { Api, + ArtifactCustomProperties, + ArtifactProperties, ExecutionCustomProperties, ExecutionProperties, getResourceProperty, @@ -148,7 +150,6 @@ export const ExecutionHelpers = { ); }, getName(execution: Execution): string { - console.log(execution); return `${getStringProperty(execution, KfpExecutionProperties.DISPLAY_NAME, true) || getStringProperty(execution, KfpExecutionProperties.TASK_NAME, true) || getStringProperty(execution, ExecutionProperties.NAME) || @@ -169,6 +170,21 @@ export const ExecutionHelpers = { }, }; +export enum KfpArtifactProperties { + DISPLAY_NAME = 'display_name', +} + +export const ArtifactHelpers = { + getName(a: Artifact): string { + const name = + getResourceProperty(a, KfpArtifactProperties.DISPLAY_NAME, true) || + getResourceProperty(a, ArtifactProperties.NAME) || + getResourceProperty(a, ArtifactCustomProperties.NAME, true) || + '(No name)'; + return `${name}`; + }, +}; + function getStringProperty( resource: Artifact | Execution, propertyName: string, diff --git a/frontend/src/mlmd/Utils.tsx b/frontend/src/mlmd/Utils.tsx index d22dc1f7de4..43e4f2704c3 100644 --- a/frontend/src/mlmd/Utils.tsx +++ b/frontend/src/mlmd/Utils.tsx @@ -24,8 +24,8 @@ import { ArtifactTypeMap } from './LineageApi'; import { Artifact, Execution, Value } from 'src/third_party/mlmd'; import { LineageTypedResource } from './LineageTypes'; import { Struct } from 'google-protobuf/google/protobuf/struct_pb'; +import { ArtifactHelpers, ExecutionHelpers } from './MlmdUtils'; -const UNNAMED_RESOURCE_DISPLAY_NAME = '(unnamed)'; const ARTIFACT_FIELD_REPOS = [ArtifactProperties, ArtifactCustomProperties]; const EXECUTION_FIELD_REPOS = [ExecutionProperties, ExecutionCustomProperties]; @@ -67,27 +67,10 @@ export function getResourcePropertyViaFallBack( return prop as string; } -export function getArtifactName(artifact: Artifact): string { - return ( - getResourcePropertyViaFallBack(artifact, ARTIFACT_FIELD_REPOS, ['NAME']) || - UNNAMED_RESOURCE_DISPLAY_NAME - ); -} - -function getExecutionName(execution: Execution): string { - return ( - getResourcePropertyViaFallBack(execution, EXECUTION_FIELD_REPOS, [ - 'COMPONENT_ID', - 'TASK_ID', - 'NAME', - ]) || UNNAMED_RESOURCE_DISPLAY_NAME - ); -} - export function getResourceName(typedResource: LineageTypedResource): string { return typedResource.type === 'artifact' - ? getArtifactName(typedResource.resource) - : getExecutionName(typedResource.resource); + ? 
ArtifactHelpers.getName(typedResource.resource) + : ExecutionHelpers.getName(typedResource.resource); } /** diff --git a/frontend/src/pages/ArtifactDetails.tsx b/frontend/src/pages/ArtifactDetails.tsx index 4369af8d0f3..817fcbf03d8 100644 --- a/frontend/src/pages/ArtifactDetails.tsx +++ b/frontend/src/pages/ArtifactDetails.tsx @@ -16,7 +16,6 @@ import { Api, - ArtifactCustomProperties, ArtifactProperties, getResourceProperty, LineageResource, @@ -39,6 +38,7 @@ import { ToolbarProps } from '../components/Toolbar'; import { commonCss, padding } from '../Css'; import { logger, serviceErrorToString, titleCase } from '../lib/Utils'; import { Page, PageProps } from './Page'; +import { ArtifactHelpers } from 'src/mlmd/MlmdUtils'; export enum ArtifactDetailsTab { OVERVIEW = 0, @@ -185,10 +185,7 @@ class ArtifactDetails extends Page<{}, ArtifactDetailsState> { const typeResponse = await this.api.metadataStoreService.getArtifactTypesByID(typeRequest); const artifactType = typeResponse.getArtifactTypesList()[0] || undefined; - const artifactName = - getResourceProperty(artifact, ArtifactProperties.NAME) || - getResourceProperty(artifact, ArtifactCustomProperties.NAME, true); - let title = artifactName ? artifactName.toString() : ''; + let title = ArtifactHelpers.getName(artifact); const version = getResourceProperty(artifact, ArtifactProperties.VERSION); if (version) { title += ` (version: ${version})`; diff --git a/frontend/src/pages/ExecutionDetails.tsx b/frontend/src/pages/ExecutionDetails.tsx index fe8ec0cb645..69dda9d94b4 100644 --- a/frontend/src/pages/ExecutionDetails.tsx +++ b/frontend/src/pages/ExecutionDetails.tsx @@ -18,13 +18,7 @@ import { CircularProgress } from '@material-ui/core'; import React, { Component } from 'react'; import { Link } from 'react-router-dom'; import { ExecutionHelpers, getArtifactName, getLinkedArtifactsByEvents } from 'src/mlmd/MlmdUtils'; -import { - Api, - ExecutionCustomProperties, - ExecutionProperties, - getArtifactTypes, - getResourceProperty, -} from 'src/mlmd/library'; +import { Api, getArtifactTypes } from 'src/mlmd/library'; import { ArtifactType, Event, From 0d8ce422b6817a4ab13daefb573c628f7d8e023a Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 08:32:39 +0000 Subject: [PATCH 13/17] fix samples --- samples/test/metrics_visualization_v2_test.py | 6 ++---- samples/test/util.py | 11 ++++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/samples/test/metrics_visualization_v2_test.py b/samples/test/metrics_visualization_v2_test.py index 09a4a3b0493..55b928c00b5 100644 --- a/samples/test/metrics_visualization_v2_test.py +++ b/samples/test/metrics_visualization_v2_test.py @@ -154,7 +154,7 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): }, iris_sgdclassifier.get_dict() ) rows = iris_sgdclassifier.get_dict()['outputs']['artifacts'][0]['metadata'][ - 'confusionMatrix']['structValue']['struct']['rows'] + 'confusionMatrix']['struct']['rows'] for i, row in enumerate(rows): for j, item in enumerate(row['row']): t.assertIsInstance( @@ -172,9 +172,7 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'artifacts': [{ 'metadata': { 'display_name': 'metrics', - 'accuracy': { - 'doubleValue': 92.0 - }, + 'accuracy': 92.0, }, 'name': 'metrics', 'type': 'system.Metrics' diff --git a/samples/test/util.py b/samples/test/util.py index 3b477209c76..6dd9c002d9a 100644 --- a/samples/test/util.py +++ b/samples/test/util.py @@ -226,9 +226,12 @@ def _create_run(): def simplify_proto_struct(data: 
dict) -> dict: res = {} for key, value in data.items(): - if value.get('stringValue'): + print(value.__dir__) + if value.get('stringValue') is not None: res[key] = value['stringValue'] - elif value.get('structValue'): + if value.get('doubleValue') is not None: + res[key] = value['doubleValue'] + elif value.get('structValue') is not None: res[key] = value['structValue'] else: res[key] = value @@ -254,7 +257,9 @@ def new( artifact_name = mlmd_event.path.steps[0].key # The original field is custom_properties, but MessageToDict converts it # to customProperties. - metadata = simplify_proto_struct(MessageToDict(mlmd_artifact).get('customProperties', {})) + metadata = simplify_proto_struct( + MessageToDict(mlmd_artifact).get('customProperties', {}) + ) return cls( name=artifact_name, type=mlmd_artifact_type.name, From 1b520c50ef12446c2cfcb0a33129c2bdfe35b3a2 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 08:32:47 +0000 Subject: [PATCH 14/17] fix frontend tests --- frontend/src/mlmd/MlmdUtils.test.ts | 26 +++++++++++-------- .../LineageActionBar.test.tsx.snap | 2 +- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/frontend/src/mlmd/MlmdUtils.test.ts b/frontend/src/mlmd/MlmdUtils.test.ts index 139547407cb..ce44f0518cc 100644 --- a/frontend/src/mlmd/MlmdUtils.test.ts +++ b/frontend/src/mlmd/MlmdUtils.test.ts @@ -30,6 +30,7 @@ import { Workflow, WorkflowSpec, WorkflowStatus } from 'third_party/argo-ui/argo testBestPractices(); const WORKFLOW_NAME = 'run-st448'; +const RUN_ID = 'abcdefghijk'; const WORKFLOW_EMPTY: Workflow = { metadata: { name: WORKFLOW_NAME, @@ -40,8 +41,8 @@ const WORKFLOW_EMPTY: Workflow = { }; const V2_CONTEXT = new Context(); -V2_CONTEXT.setName(WORKFLOW_NAME); -V2_CONTEXT.setType('kfp.PipelineRun'); +V2_CONTEXT.setName(RUN_ID); +V2_CONTEXT.setType('system.PipelineRun'); const TFX_CONTEXT = new Context(); TFX_CONTEXT.setName('run.run-st448'); @@ -55,26 +56,29 @@ describe('MlmdUtils', () => { describe('getRunContext', () => { it('gets KFP v2 context', async () => { mockGetContextByTypeAndName([V2_CONTEXT]); - const context = await getRunContext({ - ...WORKFLOW_EMPTY, - metadata: { - ...WORKFLOW_EMPTY.metadata, - annotations: { 'pipelines.kubeflow.org/v2_pipeline': 'true' }, + const context = await getRunContext( + { + ...WORKFLOW_EMPTY, + metadata: { + ...WORKFLOW_EMPTY.metadata, + annotations: { 'pipelines.kubeflow.org/v2_pipeline': 'true' }, + }, }, - }); + RUN_ID, + ); expect(context).toEqual(V2_CONTEXT); }); it('gets TFX context', async () => { mockGetContextByTypeAndName([TFX_CONTEXT, V1_CONTEXT]); - const context = await getRunContext(WORKFLOW_EMPTY); + const context = await getRunContext(WORKFLOW_EMPTY, RUN_ID); expect(context).toEqual(TFX_CONTEXT); }); it('gets KFP v1 context', async () => { const verify = expectWarnings(); mockGetContextByTypeAndName([V1_CONTEXT]); - const context = await getRunContext(WORKFLOW_EMPTY); + const context = await getRunContext(WORKFLOW_EMPTY, RUN_ID); expect(context).toEqual(V1_CONTEXT); verify(); }); @@ -82,7 +86,7 @@ describe('MlmdUtils', () => { it('throws error when not found', async () => { const verify = expectWarnings(); mockGetContextByTypeAndName([]); - await expect(getRunContext(WORKFLOW_EMPTY)).rejects.toThrow(); + await expect(getRunContext(WORKFLOW_EMPTY, RUN_ID)).rejects.toThrow(); verify(); }); }); diff --git a/frontend/src/mlmd/__snapshots__/LineageActionBar.test.tsx.snap b/frontend/src/mlmd/__snapshots__/LineageActionBar.test.tsx.snap index bbb626db355..83ae4e4af82 100644 --- 
a/frontend/src/mlmd/__snapshots__/LineageActionBar.test.tsx.snap +++ b/frontend/src/mlmd/__snapshots__/LineageActionBar.test.tsx.snap @@ -261,7 +261,7 @@ exports[`LineageActionBar Adds the artifact to the history state and DOM when pu key="breadcrumb-1" onClick={[Function]} > - (unnamed) + (No name)
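One detail behind the sample-test fixes: MessageToDict renders each MLMD custom property as a wrapper dict such as {'stringValue': ...}, {'doubleValue': ...} or {'structValue': ...}, and the simplify_proto_struct helper in samples/test/util.py unwraps those wrappers so the expected task dicts can list plain values (for example 'accuracy': 92.0 and a bare 'display_name' string). A rough illustration with made-up values follows; it assumes the finalized helper from the 'fix sample test' patch below, imported as in the other tests via 'from .util import simplify_proto_struct'.

# Rough illustration only; the property values are made up.
raw_custom_properties = {
    'display_name': {'stringValue': 'metrics'},
    'accuracy': {'doubleValue': 92.0},
    'confusionMatrix': {'structValue': {'struct': {'rows': []}}},
}
assert simplify_proto_struct(raw_custom_properties) == {
    'display_name': 'metrics',
    'accuracy': 92.0,
    'confusionMatrix': {'struct': {'rows': []}},
}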
From 3840df3c90af4b1244bfc9ea8236c12cc4a3bc9c Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 09:05:04 +0000 Subject: [PATCH 15/17] fix sample test --- samples/test/util.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/samples/test/util.py b/samples/test/util.py index 6dd9c002d9a..c89426b26a9 100644 --- a/samples/test/util.py +++ b/samples/test/util.py @@ -226,10 +226,9 @@ def _create_run(): def simplify_proto_struct(data: dict) -> dict: res = {} for key, value in data.items(): - print(value.__dir__) if value.get('stringValue') is not None: res[key] = value['stringValue'] - if value.get('doubleValue') is not None: + elif value.get('doubleValue') is not None: res[key] = value['doubleValue'] elif value.get('structValue') is not None: res[key] = value['structValue'] From 88b4259cf083b43edae5d6283659c0939f394d41 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Fri, 16 Jul 2021 10:38:09 +0000 Subject: [PATCH 16/17] fix last sample --- samples/test/lightweight_python_functions_v2_pipeline_test.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/samples/test/lightweight_python_functions_v2_pipeline_test.py b/samples/test/lightweight_python_functions_v2_pipeline_test.py index d2ce9e29a2c..66df23de894 100644 --- a/samples/test/lightweight_python_functions_v2_pipeline_test.py +++ b/samples/test/lightweight_python_functions_v2_pipeline_test.py @@ -101,9 +101,7 @@ def verify(run: kfp_server_api.ApiRun, mlmd_connection_config, **kwargs): 'artifacts': [{ 'metadata': { 'display_name': 'model', - 'accuracy': { - 'doubleValue': 0.9, - } + 'accuracy': 0.9, }, 'name': 'model', 'type': 'system.Model' From 50e5c74f216f8698f072d4745a4774a91bdecaf1 Mon Sep 17 00:00:00 2001 From: Yuan Gong Date: Tue, 20 Jul 2021 04:43:28 +0000 Subject: [PATCH 17/17] address feedback --- v2/metadata/client_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/v2/metadata/client_test.go b/v2/metadata/client_test.go index 7a37ec56a85..c6a3338a4b5 100644 --- a/v2/metadata/client_test.go +++ b/v2/metadata/client_test.go @@ -35,6 +35,8 @@ import ( const ( testMlmdServerAddress = "localhost" testMlmdServerPort = "8080" + namespace = "kubeflow" + runResource = "workflows.argoproj.io/hello-world-abcd" ) func Test_schemaToArtifactType(t *testing.T) { @@ -84,7 +86,7 @@ func Test_GetPipeline(t *testing.T) { mlmdClient, err := NewTestMlmdClient() fatalIf(err) - _, err = client.GetPipeline(ctx, "get-pipeline-test", runId, "kubeflow", "workflows.argoproj.io/hello-world-abcd") + _, err = client.GetPipeline(ctx, "get-pipeline-test", runId, namespace, runResource) fatalIf(err) runCtxType := "system.PipelineRun" pipeline := "get-pipeline-test" @@ -130,7 +132,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, "kubeflow", "workflows.argoproj.io/hello-world-"+runIdText) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText) if err != nil { t.Error(err) } @@ -142,7 +144,7 @@ func Test_GetPipelineConcurrently(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", runIdText), runIdText, "kubeflow", "workflows.argoproj.io/hello-world-"+runIdText) + _, err := client.GetPipeline(ctx, fmt.Sprintf("get-pipeline-concurrently-test-%s", 
runIdText), runIdText, namespace, "workflows.argoproj.io/hello-world-"+runIdText) if err != nil { t.Error(err) }