Skip to content

Commit

Permalink
utilize session info in launcher & importer
Browse files Browse the repository at this point in the history
retrieves the session info (if provided via kfp-launcher) and utilizes it for opening the provider's associated bucket

Signed-off-by: Humair Khan <HumairAK@users.noreply.github.com>
  • Loading branch information
HumairAK committed Apr 3, 2024
1 parent c9fb27f commit df6c24c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 9 deletions.
2 changes: 1 addition & 1 deletion backend/src/v2/component/importer_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) {
}()
// TODO(Bobgy): there's no need to pass any parameters, because pipeline
// and pipeline run context have been created by root DAG driver.
pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "")
pipeline, err := l.metadataClient.GetPipeline(ctx, l.importerLauncherOptions.PipelineName, l.importerLauncherOptions.RunID, "", "", "", "")
if err != nil {
return err
}
Expand Down
27 changes: 20 additions & 7 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,12 @@ func (l *LauncherV2) Execute(ctx context.Context) (err error) {
return err
}
fingerPrint := execution.FingerPrint()
bucketConfig, err := objectstore.ParseBucketConfig(execution.GetPipeline().GetPipelineRoot())
bucketSessionInfo, err := objectstore.GetSessionInfoFromString(execution.GetPipeline().GetPipelineBucketSession())
if err != nil {
return err
}
pipelineRoot := execution.GetPipeline().GetPipelineRoot()
bucketConfig, err := objectstore.ParseBucketConfig(pipelineRoot, bucketSessionInfo)
if err != nil {
return err
}
Expand Down Expand Up @@ -534,14 +539,22 @@ func fetchNonDefaultBuckets(
}
// TODO: Support multiple artifacts someday, probably through the v2 engine.
artifact := artifactList.Artifacts[0]
// The artifact does not belong under the object store path for this run. Cases:
// 1. Artifact is cached from a different run, so it may still be in the default bucket, but under a different run id subpath
// 2. Artifact is imported from the same bucket, but from a different path (re-use the same session)
// 3. Artifact is imported from a different bucket, or obj store (default to using user env in this case)
if !strings.HasPrefix(artifact.Uri, defaultBucketConfig.PrefixedBucket()) {
nonDefaultBucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri)
if err != nil {
return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), err)
nonDefaultBucketConfig, parseErr := objectstore.ParseBucketConfigForArtifactURI(artifact.Uri)
if parseErr != nil {
return nonDefaultBuckets, fmt.Errorf("failed to parse bucketConfig for output artifact %q with uri %q: %w", name, artifact.GetUri(), parseErr)
}
nonDefaultBucket, err := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig)
if err != nil {
return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), err)
// check if it's same bucket but under a different path, re-use the default bucket session in this case.
if (nonDefaultBucketConfig.Scheme == defaultBucketConfig.Scheme) && (nonDefaultBucketConfig.BucketName == defaultBucketConfig.BucketName) {
nonDefaultBucketConfig.Session = defaultBucketConfig.Session
}
nonDefaultBucket, bucketErr := objectstore.OpenBucket(ctx, k8sClient, namespace, nonDefaultBucketConfig)
if bucketErr != nil {
return nonDefaultBuckets, fmt.Errorf("failed to open bucket for output artifact %q with uri %q: %w", name, artifact.GetUri(), bucketErr)
}
nonDefaultBuckets[nonDefaultBucketConfig.PrefixedBucket()] = nonDefaultBucket
}
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/component/launcher_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func Test_executeV2_Parameters(t *testing.T) {
fakeMetadataClient := metadata.NewFakeClient()
bucket, err := blob.OpenBucket(context.Background(), "gs://test-bucket")
assert.Nil(t, err)
bucketConfig, err := objectstore.ParseBucketConfig("gs://test-bucket/pipeline-root/")
bucketConfig, err := objectstore.ParseBucketConfig("gs://test-bucket/pipeline-root/", nil)
assert.Nil(t, err)
_, _, err = executeV2(context.Background(), test.executorInput, addNumbersComponent, "sh", test.executorArgs, bucket, bucketConfig, fakeMetadataClient, "namespace", fakeKubernetesClientset)

Expand Down

0 comments on commit df6c24c

Please sign in to comment.