feat(backend) V1 Pipelines add maximum_cache_staleness and default_cache_staleness #8270

Merged: 12 commits, Oct 4, 2022
37 changes: 33 additions & 4 deletions backend/src/cache/server/mutation.go
@@ -119,14 +119,43 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt

annotations[ExecutionKey] = executionHashKey
labels[CacheIDLabelKey] = ""
var maxCacheStalenessInSeconds int64 = -1
maxCacheStaleness, exists := annotations[MaxCacheStalenessKey]

var cacheStalenessInSeconds int64 = -1
var userCacheStalenessInSeconds int64 = -1
var defaultCacheStalenessInSeconds int64 = -1
var maximumCacheStalenessInSeconds int64 = -1

userCacheStaleness, exists := annotations[MaxCacheStalenessKey]
if exists {
maxCacheStalenessInSeconds = getMaxCacheStaleness(maxCacheStaleness)
log.Printf("userCacheStaleness: %s", userCacheStaleness)
userCacheStalenessInSeconds = stalenessToSeconds(userCacheStaleness)
log.Printf("userCacheStalenessInSeconds: %d", userCacheStalenessInSeconds)
}
defaultCacheStaleness, exists := os.LookupEnv("DEFAULT_CACHE_STALENESS")
Member:

If userCacheStaleness exists, we shouldn't even care about defaultCacheStaleness, right?

Member Author:

Yes, exactly that is happening. I just read the two environment variables and the Workflow annotation.
Then I calculate the staleness to use from all three values. So if a user/annotation value is set and is less than MAXIMUM_CACHE_STALENESS, the user/annotation value is used instead of DEFAULT_CACHE_STALENESS.

	var cacheStalenessInSeconds int64 = -1
	var userCacheStalenessInSeconds int64 = -1
	var defaultCacheStalenessInSeconds int64 = -1
	var maximumCacheStalenessInSeconds int64 = -1

	userCacheStaleness, exists := annotations[MaxCacheStalenessKey]
	if exists {
		userCacheStalenessInSeconds = stalenessToSeconds(userCacheStaleness)
	}
	defaultCacheStaleness, exists := os.LookupEnv("DEFAULT_CACHE_STALENESS")
	if exists {
		defaultCacheStalenessInSeconds = stalenessToSeconds(defaultCacheStaleness)
	}
	maximumCacheStaleness, exists := os.LookupEnv("MAXIMUM_CACHE_STALENESS")
	if exists {
		maximumCacheStalenessInSeconds = stalenessToSeconds(maximumCacheStaleness)
	}

	if userCacheStalenessInSeconds < 0 {
		cacheStalenessInSeconds = defaultCacheStalenessInSeconds
	} else {
		cacheStalenessInSeconds = userCacheStalenessInSeconds
	}
	if cacheStalenessInSeconds > maximumCacheStalenessInSeconds {
		cacheStalenessInSeconds = maximumCacheStalenessInSeconds
	}
	log.Printf("cacheStalenessInSeconds: %d", cacheStalenessInSeconds)

Member:

Yes. I was thinking that something like this would be cleaner and free of wasted operations. I don't have a strong opinion, though.

var cacheStalenessInSeconds int64 = -1

cacheStaleness, exists := annotations[MaxCacheStalenessKey]
if !exists {
    cacheStaleness, exists = os.LookupEnv("DEFAULT_CACHE_STALENESS")
} 
if exists {
    cacheStalenessInSeconds = stalenessToSeconds(cacheStaleness)
}
...
if cacheStalenessInSeconds > maximumCacheStalenessInSeconds {
    cacheStalenessInSeconds = maximumCacheStalenessInSeconds
}

if exists {
log.Printf("defaultCacheStaleness: %s", defaultCacheStaleness)
defaultCacheStalenessInSeconds = stalenessToSeconds(defaultCacheStaleness)
log.Printf("defaultCacheStalenessInSeconds: %d", defaultCacheStalenessInSeconds)
}
maximumCacheStaleness, exists := os.LookupEnv("MAXIMUM_CACHE_STALENESS")
if exists {
log.Printf("maximumCacheStaleness: %s", maximumCacheStaleness)
maximumCacheStalenessInSeconds = stalenessToSeconds(maximumCacheStaleness)
log.Printf("maximumCacheStalenessInSeconds: %d", maximumCacheStalenessInSeconds)
}

if userCacheStalenessInSeconds < 0 {
cacheStalenessInSeconds = defaultCacheStalenessInSeconds
} else {
cacheStalenessInSeconds = userCacheStalenessInSeconds
}
if cacheStalenessInSeconds > maximumCacheStalenessInSeconds {
cacheStalenessInSeconds = maximumCacheStalenessInSeconds
}
log.Printf("cacheStalenessInSeconds: %d", cacheStalenessInSeconds)

var cachedExecution *model.ExecutionCache
cachedExecution, err = clientMgr.CacheStore().GetExecutionCache(executionHashKey, maxCacheStalenessInSeconds)
cachedExecution, err = clientMgr.CacheStore().GetExecutionCache(executionHashKey, cacheStalenessInSeconds, maximumCacheStalenessInSeconds)
if err != nil {
log.Println(err.Error())
}
25 changes: 19 additions & 6 deletions backend/src/cache/server/watcher.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"log"
"os"
"reflect"
"strconv"
"time"
@@ -67,19 +68,31 @@ func WatchPods(ctx context.Context, namespaceToWatch string, clientManager Clien
executionOutputMap[MetadataExecutionIDKey] = pod.ObjectMeta.Labels[MetadataExecutionIDKey]
executionOutputJSON, _ := json.Marshal(executionOutputMap)

executionMaxCacheStaleness, exists := pod.ObjectMeta.Annotations[MaxCacheStalenessKey]
var maxCacheStalenessInSeconds int64 = -1
executionstaleness, exists := pod.ObjectMeta.Annotations[MaxCacheStalenessKey]
var cacheStalenessInSeconds int64 = -1
if exists {
maxCacheStalenessInSeconds = getMaxCacheStaleness(executionMaxCacheStaleness)
cacheStalenessInSeconds = stalenessToSeconds(executionstaleness)
}

var maximumCacheStalenessInSeconds int64 = -1
maximumCacheStaleness, exists := os.LookupEnv("MAXIMUM_CACHE_STALENESS")
if exists {
log.Printf("maximumCacheStaleness: %s", maximumCacheStaleness)
maximumCacheStalenessInSeconds = stalenessToSeconds(maximumCacheStaleness)
log.Printf("maximumCacheStalenessInSeconds: %d", maximumCacheStalenessInSeconds)
}
if cacheStalenessInSeconds > maximumCacheStalenessInSeconds || cacheStalenessInSeconds < 0 {
cacheStalenessInSeconds = maximumCacheStalenessInSeconds
}
log.Printf("Creating cachedb entry with cacheStalenessInSeconds: %d", cacheStalenessInSeconds)

executionTemplate, _ := getArgoTemplate(pod)

executionToPersist := model.ExecutionCache{
ExecutionCacheKey: executionKey,
ExecutionTemplate: executionTemplate,
ExecutionOutput: string(executionOutputJSON),
MaxCacheStaleness: maxCacheStalenessInSeconds,
MaxCacheStaleness: cacheStalenessInSeconds,
}

cacheEntryCreated, err := clientManager.CacheStore().CreateExecutionCache(&executionToPersist)
@@ -127,9 +140,9 @@ func patchCacheID(ctx context.Context, k8sCore client.KubernetesCoreInterface, p
}

// Convert RFC3339 Duration(Eg. "P1DT30H4S") to int64 seconds.
func getMaxCacheStaleness(maxCacheStaleness string) int64 {
func stalenessToSeconds(staleness string) int64 {
var seconds int64 = -1
if d, err := duration.Parse(maxCacheStaleness); err == nil {
if d, err := duration.Parse(staleness); err == nil {
seconds = int64(d / time.Second)
}
return seconds
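For reference, a hedged sketch of the -1 sentinel contract that stalenessToSeconds relies on. The parser below is a deliberately simplified, hypothetical stand-in (days/hours/minutes/seconds only) rather than the duration library the cache server actually imports; it only illustrates the expected input/output shape.

	package main

	import (
		"fmt"
		"regexp"
		"strconv"
	)

	// simplifiedStalenessToSeconds is a stand-in for stalenessToSeconds: it returns
	// -1 ("no limit" / unparsable) for anything it cannot read, which is the
	// sentinel the mutation webhook and watcher code above depend on.
	func simplifiedStalenessToSeconds(staleness string) int64 {
		re := regexp.MustCompile(`^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$`)
		m := re.FindStringSubmatch(staleness)
		if m == nil || staleness == "P" {
			return -1
		}
		multipliers := []int64{86400, 3600, 60, 1} // D, H, M, S
		var seconds int64
		for i, v := range m[1:] {
			if v == "" {
				continue
			}
			n, _ := strconv.ParseInt(v, 10, 64)
			seconds += n * multipliers[i]
		}
		return seconds
	}

	func main() {
		fmt.Println(simplifiedStalenessToSeconds("P1DT30H4S")) // 194404: the example from the code comment
		fmt.Println(simplifiedStalenessToSeconds("P30D"))      // 2592000: a plausible MAXIMUM_CACHE_STALENESS
		fmt.Println(simplifiedStalenessToSeconds("junk"))      // -1: unparsable values disable the limit
	}
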
37 changes: 29 additions & 8 deletions backend/src/cache/storage/execution_cache_store.go
@@ -25,7 +25,7 @@ import (
)

type ExecutionCacheStoreInterface interface {
GetExecutionCache(executionCacheKey string, maxCacheStaleness int64) (*model.ExecutionCache, error)
GetExecutionCache(executionCacheKey string, cacheStaleness int64, maximumCacheStaleness int64) (*model.ExecutionCache, error)
CreateExecutionCache(*model.ExecutionCache) (*model.ExecutionCache, error)
DeleteExecutionCache(executionCacheKey string) error
}
@@ -35,16 +35,21 @@ type ExecutionCacheStore struct {
time util.TimeInterface
}

func (s *ExecutionCacheStore) GetExecutionCache(executionCacheKey string, maxCacheStaleness int64) (*model.ExecutionCache, error) {
if maxCacheStaleness == 0 {
return nil, fmt.Errorf("MaxCacheStaleness=0, Cache is disabled.")
func (s *ExecutionCacheStore) GetExecutionCache(executionCacheKey string, cacheStaleness int64, maximumCacheStaleness int64) (*model.ExecutionCache, error) {
rowsAffected, err := s.cleanDatabase(maximumCacheStaleness)
log.Printf("Number of deleted rows: %d", rowsAffected)
if err != nil {
return nil, fmt.Errorf("Failed to cleanup old cache entries: %s", err)
}
if cacheStaleness == 0 {
return nil, fmt.Errorf("CacheStaleness=0, Cache is disabled.")
}
r, err := s.db.Table("execution_caches").Where("ExecutionCacheKey = ?", executionCacheKey).Rows()
if err != nil {
return nil, fmt.Errorf("Failed to get execution cache: %q", executionCacheKey)
}
defer r.Close()
executionCaches, err := s.scanRows(r, maxCacheStaleness)
executionCaches, err := s.scanRows(r, cacheStaleness)
if err != nil {
return nil, fmt.Errorf("Failed to get execution cache: %q", executionCacheKey)
}
@@ -58,7 +63,21 @@ func (s *ExecutionCacheStore) GetExecutionCache(executionCacheKey string, maxCac
return latestCache, nil
}

func (s *ExecutionCacheStore) scanRows(rows *sql.Rows, podMaxCacheStaleness int64) ([]*model.ExecutionCache, error) {
func (s *ExecutionCacheStore) cleanDatabase(maximumCacheStaleness int64) (int64, error) {
// Expire any entry for any pipeline that is
// older than os.LookupEnv("MAXIMUM_CACHE_STALENESS")
if maximumCacheStaleness < 0 {
return 0, nil
}
log.Printf("Cleaning cache entries older than maximumCacheStaleness=%d", maximumCacheStaleness)
db := s.db.Exec(
"DELETE FROM execution_caches WHERE " +
strconv.FormatInt(int64(s.time.Now().UTC().Unix()), 10) + " - StartedAtInSec" +
" > " + strconv.FormatInt(int64(maximumCacheStaleness), 10) + ";")
return db.RowsAffected, db.Error
}

func (s *ExecutionCacheStore) scanRows(rows *sql.Rows, podCacheStaleness int64) ([]*model.ExecutionCache, error) {
var executionCaches []*model.ExecutionCache
for rows.Next() {
var executionCacheKey, executionTemplate, executionOutput string
@@ -76,7 +95,10 @@ func (s *ExecutionCacheStore) scanRows(rows *sql.Rows, podMaxCacheStaleness int6
}
log.Println("Get id: " + strconv.FormatInt(id, 10))
log.Println("Get template: " + executionTemplate)
if maxCacheStaleness == -1 || s.time.Now().UTC().Unix()-startedAtInSec <= podMaxCacheStaleness {
// maxCacheStaleness comes from the database entry.
// podCacheStaleness is computed from the pod's annotation and the environment variables.
if (maxCacheStaleness < 0 || s.time.Now().UTC().Unix()-startedAtInSec <= maxCacheStaleness) &&
(podCacheStaleness < 0 || s.time.Now().UTC().Unix()-startedAtInSec <= podCacheStaleness) {
executionCaches = append(executionCaches, &model.ExecutionCache{
ID: id,
ExecutionCacheKey: executionCacheKey,
@@ -87,7 +109,6 @@ func (s *ExecutionCacheStore) scanRows(rows *sql.Rows, podMaxCacheStaleness int6
EndedAtInSec: endedAtInSec,
})
}

}
return executionCaches, nil
}
55 changes: 48 additions & 7 deletions backend/src/cache/storage/execution_cache_store_test.go
@@ -15,6 +15,7 @@
package storage

import (
"log"
"testing"

"github.com/kubeflow/pipelines/backend/src/cache/model"
@@ -94,7 +95,7 @@ func TestGetExecutionCache(t *testing.T) {
}

var executionCache *model.ExecutionCache
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1)
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1, -1)
require.Nil(t, err)
require.Equal(t, &executionCacheExpected, executionCache)
}
@@ -106,7 +107,7 @@ func TestGetExecutionCacheWithEmptyCacheEntry(t *testing.T) {

executionCacheStore.CreateExecutionCache(createExecutionCache("testKey", "testOutput"))
var executionCache *model.ExecutionCache
executionCache, err := executionCacheStore.GetExecutionCache("wrongKey", -1)
executionCache, err := executionCacheStore.GetExecutionCache("wrongKey", -1, -1)
require.Nil(t, executionCache)
require.Contains(t, err.Error(), `Execution cache not found with cache key: "wrongKey"`)
}
@@ -129,12 +130,12 @@ func TestGetExecutionCacheWithLatestCacheEntry(t *testing.T) {
EndedAtInSec: 2,
}
var executionCache *model.ExecutionCache
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1)
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1, -1)
require.Nil(t, err)
require.Equal(t, &executionCacheExpected, executionCache)
}

func TestGetExecutionCacheWithExpiredMaxCacheStaleness(t *testing.T) {
func TestGetExecutionCacheWithExpiredDatabaseCacheStaleness(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
executionCacheStore := NewExecutionCacheStore(db, util.NewFakeTimeForEpoch())
@@ -147,7 +148,47 @@ func TestGetExecutionCacheWithExpiredMaxCacheStaleness(t *testing.T) {
executionCacheStore.CreateExecutionCache(executionCacheToPersist)

var executionCache *model.ExecutionCache
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1)
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1, -1)
require.Contains(t, err.Error(), "Execution cache not found")
require.Nil(t, executionCache)
}

func TestGetExecutionCacheWithExpiredAnnotationCacheStaleness(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
executionCacheStore := NewExecutionCacheStore(db, util.NewFakeTimeForEpoch())
executionCacheToPersist := &model.ExecutionCache{
ExecutionCacheKey: "testKey",
ExecutionTemplate: "testTemplate",
ExecutionOutput: "testOutput",
MaxCacheStaleness: -1,
}
executionCacheStore.CreateExecutionCache(executionCacheToPersist)

var executionCache *model.ExecutionCache
executionCache, err := executionCacheStore.GetExecutionCache("testKey", 0, -1)
log.Println(executionCache)
log.Println("error: " + err.Error())
require.Contains(t, err.Error(), "CacheStaleness=0, Cache is disabled.")
require.Nil(t, executionCache)
}

func TestGetExecutionCacheWithExpiredMaximumCacheStaleness(t *testing.T) {
db := NewFakeDbOrFatal()
defer db.Close()
executionCacheStore := NewExecutionCacheStore(db, util.NewFakeTimeForEpoch())
executionCacheToPersist := &model.ExecutionCache{
ExecutionCacheKey: "testKey",
ExecutionTemplate: "testTemplate",
ExecutionOutput: "testOutput",
MaxCacheStaleness: -1,
}
executionCacheStore.CreateExecutionCache(executionCacheToPersist)

var executionCache *model.ExecutionCache
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1, 0)
log.Println(executionCache)
log.Println("error: " + err.Error())
require.Contains(t, err.Error(), "Execution cache not found")
require.Nil(t, executionCache)
}
@@ -157,13 +198,13 @@ func TestDeleteExecutionCache(t *testing.T) {
defer db.Close()
executionCacheStore := NewExecutionCacheStore(db, util.NewFakeTimeForEpoch())
executionCacheStore.CreateExecutionCache(createExecutionCache("testKey", "testOutput"))
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1)
executionCache, err := executionCacheStore.GetExecutionCache("testKey", -1, -1)
assert.Nil(t, err)
assert.NotNil(t, executionCache)

err = executionCacheStore.DeleteExecutionCache("1")
assert.Nil(t, err)
_, err = executionCacheStore.GetExecutionCache("testKey", -1)
_, err = executionCacheStore.GetExecutionCache("testKey", -1, -1)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "not found")
}
10 changes: 10 additions & 0 deletions manifests/kustomize/base/cache/cache-deployment.yaml
@@ -18,6 +18,16 @@ spec:
- name: server
image: gcr.io/ml-pipeline/cache-server:dummy
env:
- name: DEFAULT_CACHE_STALENESS
valueFrom:
configMapKeyRef:
name: pipeline-install-config
key: DEFAULT_CACHE_STALENESS
- name: MAXIMUM_CACHE_STALENESS
valueFrom:
configMapKeyRef:
name: pipeline-install-config
key: MAXIMUM_CACHE_STALENESS
- name: CACHE_IMAGE
valueFrom:
configMapKeyRef:
@@ -68,6 +68,21 @@ data:
## any node and avoid defaulting to specific nodes. Allowed values are:
## 'false' and 'true'.
cacheNodeRestrictions: "false"
## MAXIMUM_CACHE_STALENESS configures caching according to
## https://www.kubeflow.org/docs/components/pipelines/overview/caching/ and
## https://www.kubeflow.org/docs/components/pipelines/overview/caching-v2/.
## Per-pipeline staleness values set by users that exceed MAXIMUM_CACHE_STALENESS
## are reduced to MAXIMUM_CACHE_STALENESS.
## The administrator of the storage backend can rely on this limit to delete
## old cache artifacts.
MAXIMUM_CACHE_STALENESS: ""
## MAXIMUM_CACHE_STALENESS: "P30D"
## DEFAULT_CACHE_STALENESS configures caching according to
## https://www.kubeflow.org/docs/components/pipelines/overview/caching/ and
## https://www.kubeflow.org/docs/components/pipelines/overview/caching-v2/.
## This value is used if the user did not set a value in the pipeline.
DEFAULT_CACHE_STALENESS: ""
## DEFAULT_CACHE_STALENESS: "P7D"
## ConMaxLifeTime will set the connection max lifetime for MySQL
## this is very important to setup when using external databases.
## See this issue for more details: https://github.com/kubeflow/pipelines/issues/5329