Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for flux plugin: clean up old charts from chart cache after repo update #4115 #5644

Merged
merged 17 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func (c *ChartCache) SyncCharts(charts []models.Chart, downloadFn DownloadChartF
log.Warningf("Skipping chart [%s] due to empty version array", chart.ID)
continue
} else if len(chart.ChartVersions[0].URLs) == 0 {
log.Warningf("Chart: [%s], version: [%s] has no URLs", chart.ID, chart.ChartVersions[0].Version)
log.Warningf("Skipping chart [%s], version: [%s] has no URLs", chart.ID, chart.ChartVersions[0].Version)
continue
} else if chart.Repo == nil {
// shouldn't happen
log.Warningf("Skipping chart [%s] as it is not associated with any repo", chart.ID)
continue
}

Expand Down Expand Up @@ -259,10 +263,9 @@ func (c *ChartCache) processNextWorkItem(workerName string) bool {
return true
}

func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
log.Infof("+DeleteChartsForRepo(%s)", repo)
defer log.Infof("-DeleteChartsForRepo(%s)", repo)

// will clear out the cache of charts for a given repo except the charts specified by
// keepThese argument, which may be nil.
func (c *ChartCache) deleteChartsHelper(repo *types.NamespacedName, keepThese sets.String) error {
// need to get a list of all charts/versions for this repo that are either:
// a. already in the cache OR
// b. being processed
Expand All @@ -287,6 +290,7 @@ func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
if err != nil {
return err
}
log.Infof("Redis [SCAN %d %s]: %d keys", cursor, match, len(keys))
for _, k := range keys {
redisKeysToDelete.Insert(k)
}
Expand All @@ -308,7 +312,7 @@ func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
}
}

for k := range redisKeysToDelete {
for k := range redisKeysToDelete.Difference(keepThese) {
if namespace, chartID, chartVersion, err := c.fromKey(k); err != nil {
log.Errorf("%+v", err)
} else {
Expand All @@ -329,6 +333,52 @@ func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
return nil
}

// DeleteChartsForRepo removes every cached chart version associated with the
// given repo from the chart cache. It is the "keep nothing" variant of
// deleteChartsHelper.
func (c *ChartCache) DeleteChartsForRepo(repo *types.NamespacedName) error {
	log.Infof("+DeleteChartsForRepo(%s)", repo)
	defer log.Infof("-DeleteChartsForRepo(%s)", repo)

	// An empty keep-set means the helper purges all keys for this repo.
	keepNone := sets.String{}
	return c.deleteChartsHelper(repo, keepNone)
}

// this function is called when re-importing charts after an update to the repo,
// so keepThese is actually populated from the new data, meaning that if the new
// data no longer includes a certain version, it'll get purged here
func (c *ChartCache) PurgeObsoleteChartVersions(keepThese []models.Chart) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just reading the code below, I think keepThese is the slice of charts for which you're wanting to keep all versions? (as in, below, each chart is iterated with the redis key for each version added to the set of keys to keep for that repo, which is then passed as keep to deleteChartsHelper).

I must be missing something here, because it looks like it only ever calls deleteChartsHelper with repos of charts that were in the keepThese slice? I expected to find a function that purges all other repositories, other than the ones you wanted to keep (maybe it does and I'm just mis-reading).

EDIT: Oh - reading further down, I think I understand: this function is called when re-importing charts after an update to the repo, for example, so keepThese is actually populated from the new data, meaning that if the new data no longer includes a certain version, it'll get purged here. OK, I think I was confused initially because it wasn't clear to me that keepThese was the new data that hadn't been synced yet. (?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you got it

log.Infof("+PurgeObsoleteChartVersions()")
defer log.Infof("-PurgeObsoleteChartVersions")

repos := map[types.NamespacedName]sets.String{}
for _, ch := range keepThese {
if ch.Repo == nil {
// shouldn't happen
log.Warningf("Skipping chart [%s] as it is not associated with any repo", ch.ID)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that be an error if this function is called with a chart with a nil repo? Or at least, logging the issue? Wondering why we'd not want to report that somehow?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't ever happen, but you're right. Let me at least log it

}
n := types.NamespacedName{
Name: ch.Repo.Name,
Namespace: ch.Repo.Namespace,
}
a, ok := repos[n]
if a == nil || !ok {
a = sets.String{}
}
for _, cv := range ch.ChartVersions {
if key, err := c.KeyFor(ch.Repo.Namespace, ch.ID, cv.Version); err != nil {
return err
} else {
repos[n] = a.Insert(key)
}
}
}

for repo, keep := range repos {
if err := c.deleteChartsHelper(&repo, keep); err != nil {
return err
}
}
return nil
}

func (c *ChartCache) OnResync() error {
log.Infof("+OnResync(), queue: [%s], size: [%d]", c.queue.Name(), c.queue.Len())
c.resyncCond.L.Lock()
Expand Down Expand Up @@ -620,7 +670,7 @@ func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, downloadFn D
return nil, err
}

log.Infof("Successfully fetched details for chart: [%s], version: [%s], url: [%s], details: [%d] bytes",
log.V(4).Infof("Successfully fetched details for chart: [%s], version: [%s], url: [%s], details: [%d] bytes",
chartID, chartVersion, chartUrl, len(chartTgz))

cacheEntryValue := chartCacheEntryValue{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *Server) availableChartDetail(ctx context.Context, packageRef *corev1.Av
}

if byteArray == nil {
return nil, status.Errorf(codes.Internal, "failed to load details for chart [%s]", chartModel.ID)
return nil, status.Errorf(codes.Internal, "failed to load details for chart [%s], version [%s]", chartModel.ID, chartVersion)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ import (
"time"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
corev1 "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/packages/v1alpha1"
plugins "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/core/plugins/v1alpha1"
fluxplugin "github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/gen/plugins/fluxv2/packages/v1alpha1"
"github.com/vmware-tanzu/kubeapps/cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/common"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -854,22 +851,10 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
t.Fatal(err)
}

opt1 := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageSummariesResponse{},
corev1.AvailablePackageSummary{},
corev1.AvailablePackageReference{},
corev1.Context{},
plugins.Plugin{},
corev1.PackageAppVersion{})
opt2 := cmpopts.SortSlices(lessAvailablePackageFunc)
if !tc.unauthenticated {
if got, want := resp, expected_oci_stefanprodan_podinfo_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
}
compareAvailablePackageSummaries(t, resp, expected_oci_stefanprodan_podinfo_available_summaries(repoName.Name))
} else {
if got, want := resp, no_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
}
compareAvailablePackageSummaries(t, resp, no_available_summaries(repoName.Name))
return // nothing more to check
}

Expand All @@ -889,12 +874,7 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
if err != nil {
t.Fatal(err)
}
opts := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageVersionsResponse{},
corev1.PackageAppVersion{})
if got, want := resp2, expected_versions_stefanprodan_podinfo; !cmp.Equal(want, got, opts) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opts))
}
compareAvailablePackageVersions(t, resp2, expected_versions_stefanprodan_podinfo)

hour, minute, second = time.Now().Clock()
t.Logf("[%d:%d:%d] Calling GetAvailablePackageDetail(latest version) blocking for up to [%s]...",
Expand All @@ -914,7 +894,7 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
t.Fatal(err)
}

compareActualVsExpectedAvailablePackageDetail(
compareAvailablePackageDetail(
t,
resp3.AvailablePackageDetail,
expected_detail_oci_stefanprodan_podinfo(repoName.Name, tc.registryUrl).AvailablePackageDetail)
Expand All @@ -936,7 +916,7 @@ func testKindClusterAvailablePackageEndpointsForOCIHelper(
t.Fatal(err)
}

compareActualVsExpectedAvailablePackageDetail(
compareAvailablePackageDetail(
t,
resp4.AvailablePackageDetail,
expected_detail_oci_stefanprodan_podinfo_2(repoName.Name, tc.registryUrl).AvailablePackageDetail)
Expand Down Expand Up @@ -1026,17 +1006,134 @@ func TestKindClusterAvailablePackageEndpointsOCIRepo2Charts(t *testing.T) {
t.Fatal(err)
}

opt1 := cmpopts.IgnoreUnexported(
corev1.GetAvailablePackageSummariesResponse{},
corev1.AvailablePackageSummary{},
corev1.AvailablePackageReference{},
corev1.Context{},
plugins.Plugin{},
corev1.PackageAppVersion{})
opt2 := cmpopts.SortSlices(lessAvailablePackageFunc)
if got, want := resp, expected_oci_repo_with_2_charts_available_summaries(repoName.Name); !cmp.Equal(got, want, opt1, opt2) {
t.Errorf("mismatch (-want +got):\n%s", cmp.Diff(want, got, opt1, opt2))
compareAvailablePackageSummaries(t, resp, expected_oci_repo_with_2_charts_available_summaries(repoName.Name))
})
}
}

// The goal of this integration test is to ensure that when the contents of remote HTTP helm repo is changed,
// that fact is recorded locally and processed properly (repo/chart cache is updated with latest, etc.)
func TestKindClusterAddRemovePackageVersionsInHttpRepo(t *testing.T) {
	// Requires a live kind cluster with the flux plugin deployed; checkEnv
	// returns an error (failing the test) when the environment is missing.
	fluxPluginPackagesClient, _, err := checkEnv(t)
	if err != nil {
		t.Fatal(err)
	}

	// Admin service account for authorized gRPC calls; the random suffix
	// avoids collisions between repeated or concurrent test runs.
	adminAcctName := types.NamespacedName{
		Name:      "test-add-remove-versions-repo-admin-" + randSeq(4),
		Namespace: "default",
	}
	grpcContext, err := newGrpcAdminContext(t, adminAcctName)
	if err != nil {
		t.Fatal(err)
	}

	repoName := types.NamespacedName{
		Name:      "podinfo",
		Namespace: "test-" + randSeq(4),
	}
	if err := kubeCreateNamespaceAndCleanup(t, repoName.Namespace); err != nil {
		t.Fatal(err)
	}

	// 10s sync interval so the index changes made below are re-fetched quickly.
	if err = kubeAddHelmRepositoryAndCleanup(t, repoName, "", podinfo_repo_url, "", 10*time.Second); err != nil {
		t.Fatal(err)
	}

	pkgRef := availableRef(fmt.Sprintf("%s/%s", repoName.Name, "podinfo"), repoName.Namespace)

	// need to wait until repo is indexed by flux plugin
	const maxWait = 25
	var pkgDetail *corev1.GetAvailablePackageDetailResponse
	for i := 0; i <= maxWait; i++ {
		// NOTE(review): grpcContext is shadowed inside the loop, and the
		// deferred cancel()s accumulate until the whole test returns
		// (defer-in-loop); harmless here but worth confirming intent.
		grpcContext, cancel := context.WithTimeout(grpcContext, defaultContextTimeout)
		defer cancel()

		pkgDetail, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
			grpcContext,
			&corev1.GetAvailablePackageDetailRequest{AvailablePackageRef: pkgRef})
		if err == nil {
			break
		} else if i == maxWait {
			// Include the HelmRepository CR in the failure message when it can
			// be fetched, to help diagnose why indexing never completed.
			if repo, err2 := kubeGetHelmRepository(t, repoName); err2 == nil && repo != nil {
				t.Fatalf("Timed out waiting for available package [%s], last response: %v, last error: [%v],\nhelm repository:%s",
					pkgRef, pkgDetail, err, common.PrettyPrint(repo))
			} else {
				t.Fatalf("Timed out waiting for available package [%s], last response: %v, last error: [%v]",
					pkgRef, pkgDetail, err)
			}
		} else {
			t.Logf("Waiting 1s for repository [%s] to be indexed, attempt [%d/%d]...", repoName, i+1, maxWait)
			time.Sleep(1 * time.Second)
		}
	}
	// Baseline: the detail served matches the original (pre-update) index.
	compareAvailablePackageDetail(
		t,
		pkgDetail.AvailablePackageDetail,
		expected_detail_podinfo(repoName.Name, repoName.Namespace).AvailablePackageDetail)

	// The test-data pod serves the repo over HTTP; files copied into its
	// nginx docroot change what the repo publishes.
	podName, err := getFluxPluginTestdataPodName()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("podName = [%s]", podName)

	// Step 1: add a new chart version (6.0.3) and an updated index to the repo.
	if err = kubeCopyFileToPod(
		t,
		testTgz("podinfo-6.0.3.tgz"),
		*podName,
		"/usr/share/nginx/html/podinfo/podinfo-6.0.3.tgz"); err != nil {
		t.Fatal(err)
	}
	if err = kubeCopyFileToPod(
		t,
		testYaml("podinfo-index-updated.yaml"),
		*podName,
		"/usr/share/nginx/html/podinfo/index.yaml"); err != nil {
		t.Fatal(err)
	}

	// Give flux and the plugin time to notice and re-sync the repo.
	SleepWithCountdown(t, 20)

	pkgDetail, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
		grpcContext,
		&corev1.GetAvailablePackageDetailRequest{AvailablePackageRef: pkgRef})
	if err != nil {
		t.Fatal(err)
	}
	// The detail should now reflect the updated index.
	compareAvailablePackageDetail(
		t,
		pkgDetail.AvailablePackageDetail,
		expected_detail_podinfo_after_update_1(repoName.Name, repoName.Namespace).AvailablePackageDetail)

	// Step 2: revert the index to the original (removing version 6.0.3 again).
	// Failure to revert is only logged, as the namespace is cleaned up anyway.
	if err = kubeCopyFileToPod(
		t,
		testYaml("podinfo-index.yaml"),
		*podName,
		"/usr/share/nginx/html/podinfo/index.yaml"); err != nil {
		t.Logf("Error reverting to previous podinfo index: %v", err)
	}

	SleepWithCountdown(t, 20)

	pkgDetail, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
		grpcContext,
		&corev1.GetAvailablePackageDetailRequest{AvailablePackageRef: pkgRef})
	if err != nil {
		t.Fatal(err)
	}
	// Back to the original detail after the revert.
	compareAvailablePackageDetail(
		t,
		pkgDetail.AvailablePackageDetail,
		expected_detail_podinfo(repoName.Name, repoName.Namespace).AvailablePackageDetail)

	// The removed version must no longer be servable: asking for 6.0.3 should
	// fail (the chart cache was purged of the obsolete version).
	_, err = fluxPluginPackagesClient.GetAvailablePackageDetail(
		grpcContext,
		&corev1.GetAvailablePackageDetailRequest{
			AvailablePackageRef: pkgRef,
			PkgVersion:          "6.0.3",
		})
	if status.Code(err) != codes.Internal {
		t.Fatalf("Expected Internal, got: %v", err)
	}
}
Loading