Skip to content

Commit

Permalink
Merge branch 'main' into sync-chart-changes-10.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
antgamdia authored Aug 22, 2022
2 parents 4538d71 + 42b4aae commit a644ff4
Show file tree
Hide file tree
Showing 24 changed files with 597 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type ChartCache struct {
// significant in that it flushes the whole redis cache and re-populates the state from k8s.
// When that happens we don't really want any concurrent access to the cache until the resync()
// operation is complete. In other words, we want to:
// - be able to have multiple concurrent readers (goroutines doing GetForOne())
// - be able to have multiple concurrent readers (goroutines doing Get())
// - only a single writer (goroutine doing a resync()) is allowed, and while its doing its job
// no readers are allowed
resyncCond *sync.Cond
Expand Down Expand Up @@ -424,11 +424,11 @@ func (c *ChartCache) syncHandler(workerName, key string) error {
}

// this is effectively a cache GET operation
func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
func (c *ChartCache) Fetch(key string) ([]byte, error) {
c.resyncCond.L.(*sync.RWMutex).RLock()
defer c.resyncCond.L.(*sync.RWMutex).RUnlock()

log.Infof("+FetchForOne(%s)", key)
log.Infof("+Fetch(%s)", key)

// read back from cache: should be either:
// - what we previously wrote OR
Expand All @@ -440,7 +440,7 @@ func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
log.Infof("Redis [GET %s]: Nil", key)
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("fetchForOne() failed to get value for key [%s] from cache due to: %v", key, err)
return nil, fmt.Errorf("fetch() failed to get value for key [%s] from cache due to: %v", key, err)
}
log.Infof("Redis [GET %s]: %d bytes read", key, len(byteArray))

Expand All @@ -453,7 +453,7 @@ func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
}

/*
GetForOne() is like FetchForOne() but if there is a cache miss, it will then get chart data based on
Get() is like Fetch() but if there is a cache miss, it will then get chart data based on
the corresponding repo object, process it and then add it to the cache and return the
result.
This func should:
Expand All @@ -466,13 +466,13 @@ func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
• otherwise return the bytes stored in the
chart cache for the given entry
*/
func (c *ChartCache) GetForOne(key string, chart *models.Chart, downloadFn DownloadChartFn) ([]byte, error) {
func (c *ChartCache) Get(key string, chart *models.Chart, downloadFn DownloadChartFn) ([]byte, error) {
// TODO (gfichtenholt) it'd be nice to get rid of all arguments except for the key, similar to that of
// NamespacedResourceWatcherCache.GetForOne()
log.Infof("+GetForOne(%s)", key)
// NamespacedResourceWatcherCache.Get()
log.Infof("+Get(%s)", key)
var value []byte
var err error
if value, err = c.FetchForOne(key); err != nil {
if value, err = c.Fetch(key); err != nil {
return nil, err
} else if value == nil {
// cache miss
Expand Down Expand Up @@ -508,7 +508,7 @@ func (c *ChartCache) GetForOne(key string, chart *models.Chart, downloadFn Downl
c.queue.Add(key)
// now need to wait until this item has been processed by runWorker().
c.queue.WaitUntilForgotten(key)
return c.FetchForOne(key)
return c.Fetch(key)
}
}
return value, nil
Expand Down Expand Up @@ -613,7 +613,7 @@ func chartCacheKeyFor(namespace, chartID, chartVersion string) (string, error) {
chartVersion), nil
}

// FYI: The work queue is able to retry transient HTTP errors
// FYI: The work queue is able to retry transient HTTP errors that occur while invoking downloadFn
func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, downloadFn DownloadChartFn) ([]byte, error) {
chartTgz, err := downloadFn(chartID, chartUrl, chartVersion)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type NamespacedResourceWatcherCache struct {
// significant in that it flushes the whole redis cache and re-populates the state from k8s.
// When that happens we don't really want any concurrent access to the cache until the resync()
// operation is complete. In other words, we want to:
// - be able to have multiple concurrent readers (goroutines doing GetForOne()/GetForMultiple())
// - be able to have multiple concurrent readers (goroutines doing Get()/GetMultiple())
// - only a single writer (goroutine doing a resync()) is allowed, and while its doing its job
// no readers are allowed
resyncCond *sync.Cond
Expand Down Expand Up @@ -531,7 +531,7 @@ func (c *NamespacedResourceWatcherCache) syncHandler(key string) error {
defer log.Infof("-syncHandler(%s)", key)

// Convert the namespace/name string into a distinct namespace and name
name, err := c.fromKey(key)
name, err := c.NamespacedNameFromKey(key)
if err != nil {
return err
}
Expand Down Expand Up @@ -645,8 +645,8 @@ func (c *NamespacedResourceWatcherCache) onDelete(key string) error {
}

// this is effectively a cache GET operation
func (c *NamespacedResourceWatcherCache) fetchForOne(key string) (interface{}, error) {
log.InfoS("+fetchForOne", "key", key)
func (c *NamespacedResourceWatcherCache) fetch(key string) (interface{}, error) {
log.InfoS("+fetch", "key", key)
// read back from cache: should be either:
// - what we previously wrote OR
// - redis.Nil if the key does not exist or has been evicted due to memory pressure/TTL expiry
Expand All @@ -657,7 +657,7 @@ func (c *NamespacedResourceWatcherCache) fetchForOne(key string) (interface{}, e
log.V(4).Infof("Redis [GET %s]: Nil", key)
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("fetchForOne() failed to get value for key [%s] from cache due to: %v", key, err)
return nil, fmt.Errorf("fetch() failed to get value for key [%s] from cache due to: %v", key, err)
}
log.V(4).Infof("Redis [GET %s]: %d bytes read", key, len(byteArray))

Expand All @@ -683,10 +683,10 @@ func (c *NamespacedResourceWatcherCache) fetchForOne(key string) (interface{}, e
// be relied upon to be the "source of truth". So I removed it for now as I found it
// of no use

// parallelize the process of value retrieval because fetchForOne() calls
// parallelize the process of value retrieval because fetch() calls
// c.config.onGet() which will de-code the data from bytes into expected struct, which
// may be computationally expensive and thus benefit from multiple threads of execution
func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys sets.String) (map[string]interface{}, error) {
func (c *NamespacedResourceWatcherCache) fetchMultiple(keys sets.String) (map[string]interface{}, error) {
response := make(map[string]interface{})

type fetchValueJob struct {
Expand All @@ -710,7 +710,7 @@ func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys sets.String) (map
// The following loop will only terminate when the request channel is
// closed (and there are no more items)
for job := range requestChan {
result, err := c.fetchForOne(job.key)
result, err := c.fetch(job.key)
responseChan <- fetchValueJobResult{job, result, err}
}
wg.Done()
Expand Down Expand Up @@ -743,25 +743,25 @@ func (c *NamespacedResourceWatcherCache) fetchForMultiple(keys sets.String) (map
return response, errorutil.NewAggregate(errs)
}

// the difference between 'fetchForMultiple' and 'GetForMultiple' is that 'fetch' will only
// the difference between 'fetchMultiple' and 'GetMultiple' is that 'fetch' will only
// get the value from the cache for a given or return nil if one is missing, whereas
// 'GetForMultiple' will first call 'fetch' but then for any cache misses it will force
// 'GetMultiple' will first call 'fetch' but then for any cache misses it will force
// a re-computation of the value, if available, based on the input argument itemList and load
// that result into the cache. So, 'GetForMultiple' provides a guarantee that if a key exists,
// that result into the cache. So, 'GetMultiple' provides a guarantee that if a key exists,
// it's value will be returned,
// whereas 'fetchForMultiple' does not guarantee that.
// whereas 'fetchMultiple' does not guarantee that.
// The keys are expected to be in the format of the cache (the caller does that)
func (c *NamespacedResourceWatcherCache) GetForMultiple(keys sets.String) (map[string]interface{}, error) {
func (c *NamespacedResourceWatcherCache) GetMultiple(keys sets.String) (map[string]interface{}, error) {
c.resyncCond.L.(*sync.RWMutex).RLock()
defer c.resyncCond.L.(*sync.RWMutex).RUnlock()

log.Infof("+GetForMultiple(%s)", keys)
log.Infof("+GetMultiple(%s)", keys)
// at any given moment, the redis cache may only have a subset of the entire set of existing keys.
// Some key may have been evicted due to memory pressure and LRU eviction policy.
// ref: https://redis.io/topics/lru-cache
// so, first, let's fetch the entries that are still cached at this moment
// before redis maybe forced to evict those in order to make room for new ones
chartsUntyped, err := c.fetchForMultiple(keys)
chartsUntyped, err := c.fetchMultiple(keys)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -836,7 +836,7 @@ func (c *NamespacedResourceWatcherCache) computeValuesForKeys(keys sets.String)
// The following loop will only terminate when the request channel is
// closed (and there are no more items)
for key := range requestChan {
// see GetForOne() for explanation of what is happening below
// see Get() for explanation of what is happening below
c.forceKey(key)
}
wg.Done()
Expand Down Expand Up @@ -876,8 +876,8 @@ func (c *NamespacedResourceWatcherCache) computeAndFetchValuesForKeys(keys sets.
// The following loop will only terminate when the request channel is
// closed (and there are no more items)
for job := range requestChan {
// see GetForOne() for explanation of what is happening below
value, err := c.forceAndFetchKey(job.key)
// see Get() for explanation of what is happening below
value, err := c.ForceAndFetch(job.key)
responseChan <- computeValueJobResult{job, value, err}
}
wg.Done()
Expand Down Expand Up @@ -937,29 +937,29 @@ func (c *NamespacedResourceWatcherCache) KeyForNamespacedName(name types.Namespa

// the opposite of keyFor()
// the goal is to keep the details of what exactly the key looks like localized to one piece of code
func (c *NamespacedResourceWatcherCache) fromKey(key string) (*types.NamespacedName, error) {
func (c *NamespacedResourceWatcherCache) NamespacedNameFromKey(key string) (*types.NamespacedName, error) {
parts := strings.Split(key, KeySegmentsSeparator)
if len(parts) != 3 || parts[0] != c.config.Gvr.Resource || len(parts[1]) == 0 || len(parts[2]) == 0 {
return nil, status.Errorf(codes.Internal, "invalid key [%s]", key)
}
return &types.NamespacedName{Namespace: parts[1], Name: parts[2]}, nil
}

// GetForOne() is like fetchForOne() but if there is a cache miss, it will also check the
// Get() is like fetch() but if there is a cache miss, it will also check the
// k8s for the corresponding object, process it and then add it to the cache and return the
// result.
func (c *NamespacedResourceWatcherCache) GetForOne(key string) (interface{}, error) {
func (c *NamespacedResourceWatcherCache) Get(key string) (interface{}, error) {
c.resyncCond.L.(*sync.RWMutex).RLock()
defer c.resyncCond.L.(*sync.RWMutex).RUnlock()

log.Infof("+GetForOne(%s)", key)
log.Infof("+Get(%s)", key)
var value interface{}
var err error
if value, err = c.fetchForOne(key); err != nil {
if value, err = c.fetch(key); err != nil {
return nil, err
} else if value == nil {
// cache miss
return c.forceAndFetchKey(key)
return c.ForceAndFetch(key)
}
return value, nil
}
Expand All @@ -975,14 +975,14 @@ func (c *NamespacedResourceWatcherCache) forceKey(key string) {
c.queue.WaitUntilForgotten(key)
}

func (c *NamespacedResourceWatcherCache) forceAndFetchKey(key string) (interface{}, error) {
func (c *NamespacedResourceWatcherCache) ForceAndFetch(key string) (interface{}, error) {
c.forceKey(key)
// yes, there is a small time window here between after we are done with WaitUntilForgotten()
// and the following fetch, where another concurrent goroutine may force the newly added
// cache entry out, but that is an edge case and I am willing to overlook it for now
// To fix it, would somehow require WaitUntilForgotten() returning a value from a cache, so
// the whole thing would be atomic. Don't know how to do this yet
return c.fetchForOne(key)
return c.fetch(key)
}

// this func is used by unit tests only
Expand Down
43 changes: 31 additions & 12 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
if chartVersion != "" {
if key, err := s.chartCache.KeyFor(repoName.Namespace, chartID, chartVersion); err != nil {
return nil, err
} else if byteArray, err = s.chartCache.FetchForOne(key); err != nil {
} else if byteArray, err = s.chartCache.Fetch(key); err != nil {
return nil, err
}
}

if byteArray == nil {
// no specific chart version was provided or a cache miss, need to do a bit of work
chartModel, err := s.getChart(ctx, repoName, chartName)
chartModel, err := s.getChartModel(ctx, repoName, chartName)
if err != nil {
return nil, err
} else if chartModel == nil {
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
fn = downloadHttpChartFn(opts)
}
}
if byteArray, err = s.chartCache.GetForOne(key, chartModel, fn); err != nil {
if byteArray, err = s.chartCache.Get(key, chartModel, fn); err != nil {
return nil, err
}

Expand Down Expand Up @@ -136,25 +136,44 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
return pkgDetail, nil
}

func (s *Server) getChart(ctx context.Context, repo types.NamespacedName, chartName string) (*models.Chart, error) {
func (s *Server) getChartModel(ctx context.Context, repoName types.NamespacedName, chartName string) (*models.Chart, error) {
if s.repoCache == nil {
return nil, status.Errorf(codes.FailedPrecondition, "server cache has not been properly initialized")
} else if ok, err := s.hasAccessToNamespace(ctx, common.GetChartsGvr(), repo.Namespace); err != nil {
} else if ok, err := s.hasAccessToNamespace(ctx, common.GetChartsGvr(), repoName.Namespace); err != nil {
return nil, err
} else if !ok {
return nil, status.Errorf(codes.PermissionDenied, "user has no [get] access for HelmCharts in namespace [%s]", repo.Namespace)
return nil, status.Errorf(codes.PermissionDenied, "user has no [get] access for HelmCharts in namespace [%s]", repoName.Namespace)
}

key := s.repoCache.KeyForNamespacedName(repo)
if entry, err := s.repoCache.GetForOne(key); err != nil {
key := s.repoCache.KeyForNamespacedName(repoName)
value, err := s.repoCache.Get(key)
if err != nil {
return nil, err
} else if entry != nil {
if typedEntry, ok := entry.(repoCacheEntryValue); !ok {
} else if value != nil {
if typedValue, ok := value.(repoCacheEntryValue); !ok {
return nil, status.Errorf(
codes.Internal,
"unexpected value fetched from cache: type: [%s], value: [%v]", reflect.TypeOf(entry), entry)
"unexpected value fetched from cache: type: [%s], value: [%v]",
reflect.TypeOf(value), value)
} else {
for _, chart := range typedEntry.Charts {
if typedValue.Type == "oci" {
// ref https://github.com/vmware-tanzu/kubeapps/issues/5007#issuecomment-1217293240
// helm OCI chart repos are not automatically updated when the
// state on remote changes. So we will force new checksum
// computation and update local cache if needed
value, err := s.repoCache.ForceAndFetch(key)
if err != nil {
return nil, err
}
typedValue, ok = value.(repoCacheEntryValue)
if !ok {
return nil, status.Errorf(
codes.Internal,
"unexpected value fetched from cache: type: [%s], value: [%v]",
reflect.TypeOf(value), value)
}
}
for _, chart := range typedValue.Charts {
if chart.Name == chartName {
return &chart, nil // found it
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,32 @@ func TestKindClusterAvailablePackageEndpointsForOCI(t *testing.T) {
},
}

/*
gcp_user := "oauth2accesstoken"
// token is very short lived
gcp_pwd, err := gcloudPrintAccessToken(t)
if err != nil {
t.Fatal(err)
}
testCases := []struct {
testName string
registryUrl string
secret *apiv1.Secret
}{
{
testName: "Testing [" + gcp_stefanprodan_podinfo_oci_registry_url + "] with basic auth secret",
registryUrl: gcp_stefanprodan_podinfo_oci_registry_url,
secret: newBasicAuthSecret(types.NamespacedName{
Name: "oci-repo-secret-" + randSeq(4),
Namespace: "default"},
gcp_user,
gcp_pwd,
),
},
}
*/

adminName := types.NamespacedName{
Name: "test-admin-" + randSeq(4),
Namespace: "default",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,25 @@ var (
}
}

add_repo_req_28 = func(server, user, password string) *corev1.AddPackageRepositoryRequest {
return &corev1.AddPackageRepositoryRequest{
Name: "my-podinfo-10",
Context: &corev1.Context{Namespace: "default"},
Type: "oci",
Url: gcp_stefanprodan_podinfo_oci_registry_url,
Auth: &corev1.PackageRepositoryAuth{
Type: corev1.PackageRepositoryAuth_PACKAGE_REPOSITORY_AUTH_TYPE_DOCKER_CONFIG_JSON,
PackageRepoAuthOneOf: &corev1.PackageRepositoryAuth_DockerCreds{
DockerCreds: &corev1.DockerCredentials{
Server: server,
Username: user,
Password: password,
},
},
},
}
}

add_repo_expected_resp = &corev1.AddPackageRepositoryResponse{
PackageRepoRef: repoRef("bar", "foo"),
}
Expand Down Expand Up @@ -2167,6 +2186,13 @@ var (
},
}

expected_versions_gfichtenholt_podinfo_3 = &corev1.GetAvailablePackageVersionsResponse{
PackageAppVersions: []*corev1.PackageAppVersion{
{PkgVersion: "6.1.6"},
{PkgVersion: "6.1.5"},
},
}

create_package_simple_req = &corev1.CreateInstalledPackageRequest{
AvailablePackageRef: availableRef("podinfo/podinfo", "namespace-1"),
Name: "my-podinfo",
Expand Down
Loading

0 comments on commit a644ff4

Please sign in to comment.