Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Datacatalog cache deletion #4655

Open
wants to merge 69 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
d5fe607
Migrated CatalogClient from propeller to stdlib
Dec 15, 2022
422eb46
Added function for updating selected fields of NodeExecutionModel
Dec 15, 2022
47e9a00
Refactored listing of node and task executions to shared util
Dec 15, 2022
25c187c
Implemented CacheService
Dec 15, 2022
dc59334
Updated to latest unreleased versions of flytepropeller and flytestdlib
Dec 15, 2022
4c041c1
Added function for deleting artifacts to catalog client interface
Nov 8, 2022
56c7b31
Added method to release catalog reservation by artifact tag
Dec 15, 2022
557a991
Update to latest unreleased versions of flyteidl and flyteplugins
Jan 4, 2023
821467e
Updated to latest unreleased versions of flyteidl, flyteplugins, flyt…
Jan 4, 2023
4dd0afd
Added datacatalog endpoint for deleting artifacts
Oct 7, 2022
7df2814
WIP: first draft for cache eviction of past executions
Nov 8, 2022
6480f02
Updated to latest released version of flytestdlib
Nov 8, 2022
1dc2115
Refactored cache eviction to own service
Nov 10, 2022
419570e
Refactored new cache service to share EvictCacheRequest for both endp…
Nov 11, 2022
ea6df4e
Merged cache eviction endpoints into single RPC call
Nov 11, 2022
da32ef0
Removed no longer relevant generated mocks
Nov 11, 2022
80e8e72
Split CacheService.EvictCache into two separate endpoints
Dec 15, 2022
d7f5203
Add bulk endpoints for acquiring/releasing reservations and deleting …
Jan 19, 2023
d7eda41
Implemented deletion of artifact including artifact data
Dec 15, 2022
8890348
Deleting artifacts cleans up tags and partitions as well
Oct 10, 2022
a2315f8
Updated to latest unreleased version of flyteidl and flytestdlib
Dec 15, 2022
d2565a1
Updated to latest unreleased version of flytestdlib
Dec 15, 2022
0167db1
Updated to latest unreleased version fo flyteidl and flytestdlib
Jan 4, 2023
8f2f414
Implement acquiring/releasing of reserverations as bulk operation
Jan 19, 2023
c2d5454
Transfer commits
pvditt Dec 29, 2023
5cd55cf
resolve conflicts
pvditt Dec 30, 2023
d79a634
Merge remote-tracking branch 'blackshark-ai/flyteplugins/cache-evicti…
pvditt Jan 3, 2024
adf6209
resolve conflicts
pvditt Jan 3, 2024
23d2dc1
update import paths
pvditt Jan 4, 2024
cda7c34
implement new catalog client methods
pvditt Jan 4, 2024
5695f93
move catalog client to stdlib
pvditt Jan 4, 2024
590c379
tidy
pvditt Jan 4, 2024
143e160
Merge remote-tracking branch 'blackshark-ai/flytestdlib/delete-artifa…
pvditt Jan 5, 2024
fac2e8e
Merge remote-tracking branch 'blackshark-ai/flyteadmin/cache-eviction…
pvditt Jan 5, 2024
d48d9e8
lint
pvditt Jan 6, 2024
751447a
update proto equals check
pvditt Jan 7, 2024
147765d
remove workflow executuion cache eviction endpoint
pvditt Jan 7, 2024
7c93bb6
update idl service docs
pvditt Jan 7, 2024
662355c
Merge branch 'master' into feature/datacatalog-cache-deletion
pvditt Jan 8, 2024
114a45e
only support cache eviction for single task
pvditt Jan 8, 2024
2157d0d
delete dataset along with artifact
pvditt Jan 9, 2024
5fa9c8f
always attempt releasing reservation after it has been acquired
pvditt Jan 9, 2024
80823c5
move utility functions back to execution managers
pvditt Jan 9, 2024
968fba6
set db context on UpdateSelected
pvditt Jan 9, 2024
4d422a6
Revert "update idl service docs"
pvditt Jan 9, 2024
77b1b2c
manually remove EvictExecutionCache mentions from service.rst
pvditt Jan 9, 2024
2525232
update node execution closure after succeesful artifact deletion
pvditt Jan 9, 2024
5bfd6e9
don't remove catalog key from taskNodeMetadata to better support retries
pvditt Jan 9, 2024
d01a0e7
Merge branch 'master' into feature/datacatalog-cache-deletion
pvditt Jan 10, 2024
d7ac714
set node execution CacheStatus and taskNodeMetadata CacheStatus to ca…
pvditt Jan 10, 2024
409e221
Merge branch 'master' into feature/datacatalog-cache-deletion
pvditt Jan 26, 2024
6aeaae1
remove bulk endpoints
pvditt Jan 26, 2024
706dc4e
remove bulk delete counters
pvditt Jan 26, 2024
6ec0e88
revert mod tidy chagnes
pvditt Jan 31, 2024
3ce3742
replace flyteidl source in flytestdlib
pvditt Jan 31, 2024
209e2aa
add otelgrpc dependency
pvditt Jan 31, 2024
007be11
fix flytestdlib dependencies
pvditt Jan 31, 2024
13a2f68
revert release reservation changes to support release multiple reserv…
pvditt Jan 31, 2024
2f9c0b4
tidy
pvditt Jan 31, 2024
91d30e4
revert tidy
pvditt Jan 31, 2024
30eb267
tidy :facepalm:
pvditt Jan 31, 2024
a0d0aae
Merge branch 'master' into feature/datacatalog-cache-deletion
pvditt Jan 31, 2024
7e8a19e
tidy
pvditt Jan 31, 2024
cdbb950
merge master
pvditt Feb 22, 2024
6e80529
cleanup
pvditt Feb 22, 2024
d858906
cleanup
pvditt Feb 22, 2024
ae652a0
update boiler plate linter
pvditt Feb 23, 2024
83263a8
Merge branch 'master' into feature/datacatalog-cache-deletion
pvditt Feb 27, 2024
9e322a8
merge master
pvditt Mar 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datacatalog/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewCollectedErrors(code codes.Code, errors []error) error {
errorCollection[idx] = err.Error()
}

return NewDataCatalogError(code, strings.Join((errorCollection), ", "))
return NewDataCatalogError(code, strings.Join(errorCollection, ", "))
}

func IsAlreadyExistsError(err error) bool {
Expand Down
76 changes: 73 additions & 3 deletions datacatalog/pkg/manager/impl/artifact_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
updateDataFailureCounter labeled.Counter
deleteDataSuccessCounter labeled.Counter
deleteDataFailureCounter labeled.Counter
deleteResponseTime labeled.StopWatch
deleteSuccessCounter labeled.Counter
deleteFailureCounter labeled.Counter
}

type artifactManager struct {
Expand All @@ -53,7 +56,8 @@
systemMetrics artifactMetrics
}

// Create an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an offloaded location.
// CreateArtifact creates an Artifact along with the associated ArtifactData. The ArtifactData will be stored in an
// offloaded location.
func (m *artifactManager) CreateArtifact(ctx context.Context, request *datacatalog.CreateArtifactRequest) (*datacatalog.CreateArtifactResponse, error) {
timer := m.systemMetrics.createResponseTime.Start(ctx)
defer timer.Stop()
Expand Down Expand Up @@ -129,7 +133,7 @@
return &datacatalog.CreateArtifactResponse{}, nil
}

// Get the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName.
// GetArtifact retrieves the Artifact and its associated ArtifactData. The request can query by ArtifactID or TagName.
func (m *artifactManager) GetArtifact(ctx context.Context, request *datacatalog.GetArtifactRequest) (*datacatalog.GetArtifactResponse, error) {
timer := m.systemMetrics.getResponseTime.Start(ctx)
defer timer.Stop()
Expand Down Expand Up @@ -240,6 +244,8 @@
return artifactDataList, nil
}

// ListArtifacts returns a paginated list of artifacts matching the provided filter expression, including their
// associated artifact data.
func (m *artifactManager) ListArtifacts(ctx context.Context, request *datacatalog.ListArtifactsRequest) (*datacatalog.ListArtifactsResponse, error) {
err := validators.ValidateListArtifactRequest(request)
if err != nil {
Expand Down Expand Up @@ -359,7 +365,7 @@

dataLocation, err := m.artifactStore.PutData(ctx, artifact, artifactData)
if err != nil {
logger.Errorf(ctx, "Failed to store artifact data during update, err: %v", err)
logger.Errorf(ctx, "Failed to store artifact data [%v] during update, err: %v", artifactData.Name, err)

Check warning on line 368 in datacatalog/pkg/manager/impl/artifact_manager.go

View check run for this annotation

Codecov / codecov/patch

datacatalog/pkg/manager/impl/artifact_manager.go#L368

Added line #L368 was not covered by tests
m.systemMetrics.updateDataFailureCounter.Inc(ctx)
m.systemMetrics.updateFailureCounter.Inc(ctx)
return nil, err
Expand Down Expand Up @@ -416,6 +422,67 @@
}, nil
}

func (m *artifactManager) deleteArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, queryHandle artifactQueryHandle) error {
ctx = contextutils.WithProjectDomain(ctx, datasetID.Project, datasetID.Domain)

// artifact must already exist, verify first
artifactModel, err := m.findArtifact(ctx, datasetID, queryHandle)
if err != nil {
logger.Errorf(ctx, "Failed to get artifact while trying to delete [%v], err: %v", queryHandle, err)
return err
}

// delete all artifact data from the blob storage
hamersaw marked this conversation as resolved.
Show resolved Hide resolved
for _, artifactData := range artifactModel.ArtifactData {
if err := m.artifactStore.DeleteData(ctx, artifactData); err != nil {
logger.Errorf(ctx, "Failed to delete artifact data [%v] while deleting artifact [%v], err: %v", artifactData.Name, artifactModel.ArtifactID, err)
m.systemMetrics.deleteDataFailureCounter.Inc(ctx)
return err
}

Check warning on line 441 in datacatalog/pkg/manager/impl/artifact_manager.go

View check run for this annotation

Codecov / codecov/patch

datacatalog/pkg/manager/impl/artifact_manager.go#L438-L441

Added lines #L438 - L441 were not covered by tests

m.systemMetrics.deleteDataSuccessCounter.Inc(ctx)
}

// delete artifact from DB, also removed associated artifact data entries
err = m.repo.ArtifactRepo().Delete(ctx, artifactModel)
if err != nil {
if errors.IsDoesNotExistError(err) {
logger.Warnf(ctx, "Artifact [%v] does not exist, err %v", artifactModel.ArtifactID, err)
m.systemMetrics.doesNotExistCounter.Inc(ctx)
} else {
logger.Errorf(ctx, "Failed to delete artifact [%v], err: %v", artifactModel, err)
}
return err
}

logger.Debugf(ctx, "Successfully deleted artifact [%v]", artifactModel.ArtifactID)
return nil
}

// DeleteArtifact deletes the given artifact, removing all stored artifact data from the underlying blob storage.
func (m *artifactManager) DeleteArtifact(ctx context.Context, request *datacatalog.DeleteArtifactRequest) (*datacatalog.DeleteArtifactResponse, error) {
ctx = contextutils.WithProjectDomain(ctx, request.Dataset.Project, request.Dataset.Domain)

timer := m.systemMetrics.deleteResponseTime.Start(ctx)
defer timer.Stop()

err := validators.ValidateDeleteArtifactRequest(request)
if err != nil {
logger.Warningf(ctx, "Invalid delete artifacts request %v, err: %v", request, err)
m.systemMetrics.validationErrorCounter.Inc(ctx)
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
}

if err := m.deleteArtifact(ctx, request.GetDataset(), request); err != nil {
m.systemMetrics.deleteFailureCounter.Inc(ctx)
return nil, err
}

m.systemMetrics.deleteSuccessCounter.Inc(ctx)
return &datacatalog.DeleteArtifactResponse{}, nil
}

func NewArtifactManager(repo repositories.RepositoryInterface, store *storage.DataStore, storagePrefix storage.DataReference, artifactScope promutils.Scope) interfaces.ArtifactManager {
artifactMetrics := artifactMetrics{
scope: artifactScope,
Expand All @@ -440,6 +507,9 @@
updateDataFailureCounter: labeled.NewCounter("update_data_failure_count", "The number of times update artifact data failed", artifactScope, labeled.EmitUnlabeledMetric),
deleteDataSuccessCounter: labeled.NewCounter("delete_data_success_count", "The number of times delete artifact data succeeded", artifactScope, labeled.EmitUnlabeledMetric),
deleteDataFailureCounter: labeled.NewCounter("delete_data_failure_count", "The number of times delete artifact data failed", artifactScope, labeled.EmitUnlabeledMetric),
deleteResponseTime: labeled.NewStopWatch("delete_duration", "The duration of the delete artifact calls.", time.Millisecond, artifactScope, labeled.EmitUnlabeledMetric),
deleteSuccessCounter: labeled.NewCounter("delete_success_count", "The number of times delete artifact succeeded", artifactScope, labeled.EmitUnlabeledMetric),
deleteFailureCounter: labeled.NewCounter("delete_failure_count", "The number of times delete artifact failed", artifactScope, labeled.EmitUnlabeledMetric),
}

return &artifactManager{
Expand Down
Loading
Loading