Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flux oci support 2 #4932

Merged
merged 13 commits into from
Jun 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type chartCacheStoreEntry struct {
id string
version string
url string
clientOptions *common.ClientOptions
clientOptions *common.HttpClientOptions
deleted bool
}

Expand Down Expand Up @@ -118,7 +118,7 @@ func NewChartCache(name string, redisCli *redis.Client, stopCh <-chan struct{})

// this func will enqueue work items into chart work queue and return.
// the charts will be synced worker threads running in the background
func (c *ChartCache) SyncCharts(charts []models.Chart, clientOptions *common.ClientOptions) error {
func (c *ChartCache) SyncCharts(charts []models.Chart, clientOptions *common.HttpClientOptions) error {
log.Infof("+SyncCharts()")
totalToSync := 0
defer func() {
Expand Down Expand Up @@ -454,7 +454,7 @@ func (c *ChartCache) FetchForOne(key string) ([]byte, error) {
• otherwise return the bytes stored in the
chart cache for the given entry
*/
func (c *ChartCache) GetForOne(key string, chart *models.Chart, clientOptions *common.ClientOptions) ([]byte, error) {
func (c *ChartCache) GetForOne(key string, chart *models.Chart, clientOptions *common.HttpClientOptions) ([]byte, error) {
// TODO (gfichtenholt) it'd be nice to get rid of all arguments except for the key, similar to that of
// NamespacedResourceWatcherCache.GetForOne()
log.Infof("+GetForOne(%s)", key)
Expand Down Expand Up @@ -599,7 +599,7 @@ func chartCacheKeyFor(namespace, chartID, chartVersion string) (string, error) {
}

// FYI: The work queue is able to retry transient HTTP errors
func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, clientOptions *common.ClientOptions) ([]byte, error) {
func ChartCacheComputeValue(chartID, chartUrl, chartVersion string, clientOptions *common.HttpClientOptions) ([]byte, error) {
client, headers, err := common.NewHttpClientAndHeaders(clientOptions)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -95,7 +96,7 @@ func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *tes
Namespace: "default",
}
// this is to make sure we allow enough time for repository to be created and come to ready state
if err = kubeAddHelmRepositoryAndCleanup(t, repo, in_cluster_bitnami_url, "", 0); err != nil {
if err = kubeAddHelmRepositoryAndCleanup(t, repo, "", in_cluster_bitnami_url, "", 0); err != nil {
t.Fatalf("%v", err)
}
// wait until this repo have been indexed and cached up to 10 minutes
Expand Down Expand Up @@ -188,7 +189,7 @@ func TestKindClusterGetAvailablePackageSummariesForLargeReposAndTinyRedis(t *tes
Namespace: "default",
}
// this is to make sure we allow enough time for repository to be created and come to ready state
if err = kubeAddHelmRepositoryAndCleanup(t, repo, in_cluster_bitnami_url, "", 0); err != nil {
if err = kubeAddHelmRepositoryAndCleanup(t, repo, "", in_cluster_bitnami_url, "", 0); err != nil {
t.Fatalf("%v", err)
}
// wait until this repo have been indexed and cached up to 10 minutes
Expand Down Expand Up @@ -278,7 +279,7 @@ func TestKindClusterRepoAndChartRBAC(t *testing.T) {
for _, n := range names {
if err := kubeCreateNamespaceAndCleanup(t, n.Namespace); err != nil {
t.Fatal(err)
} else if err = kubeAddHelmRepositoryAndCleanup(t, n, podinfo_repo_url, "", 0); err != nil {
} else if err = kubeAddHelmRepositoryAndCleanup(t, n, "", podinfo_repo_url, "", 0); err != nil {
t.Fatal(err)
}
// wait until this repo reaches 'Ready'
Expand Down Expand Up @@ -512,3 +513,71 @@ func TestKindClusterRepoAndChartRBAC(t *testing.T) {
}
}
}

func TestKindClusterGetAvailablePackageSummariesForOCI(t *testing.T) {
fluxPluginClient, fluxPluginReposClient, err := checkEnv(t)
if err != nil {
t.Fatal(err)
}

ghUser := os.Getenv("GITHUB_USER")
if ghUser == "" {
t.Fatalf("Environment variable GITHUB_USER needs to be set")
}
ghToken := os.Getenv("GITHUB_TOKEN")
if ghToken == "" {
t.Fatalf("Environment variable GITHUB_TOKEN needs to be set")
}

adminName := types.NamespacedName{
Name: "test-admin-" + randSeq(4),
Namespace: "default",
}
grpcContext, err := newGrpcAdminContext(t, adminName)
if err != nil {
t.Fatal(err)
}

repoName := types.NamespacedName{
Name: "my-podinfo-" + randSeq(4),
Namespace: "default",
}

// this is a secret for authentication with GitHub (ghcr.io)
// personal access token ghp_... can be seen on https://github.com/settings/tokens
// and has "admin:repo_hook, delete_repo, repo" scopes
// one should be able to login successfully like this:
// docker login ghcr.io -u $GITHUB_USER -p $GITHUB_TOKEN

ghSecret := newBasicAuthSecret(types.NamespacedName{
Name: "github-secret-1",
Namespace: repoName.Namespace},
ghUser, ghToken)

if err := kubeCreateSecretAndCleanup(t, ghSecret); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithTimeout(grpcContext, defaultContextTimeout)
defer cancel()
setUserManagedSecretsAndCleanup(t, fluxPluginReposClient, ctx, true)

if err := kubeAddHelmRepositoryAndCleanup(
t, repoName, "oci", "oci://ghcr.io/stefanprodan/charts", ghSecret.Name, 0); err != nil {
t.Fatalf("%v", err)
}
// wait until this repo reaches 'Ready'
if err = kubeWaitUntilHelmRepositoryIsReady(t, repoName); err != nil {
t.Fatal(err)
}

grpcContext, cancel = context.WithTimeout(grpcContext, 90*time.Second)
defer cancel()
resp, err := fluxPluginClient.GetAvailablePackageSummaries(
grpcContext,
&corev1.GetAvailablePackageSummariesRequest{})
if err != nil {
t.Fatalf("%v", err)
}

t.Logf("=======> %s", common.PrettyPrint(resp))
}
10 changes: 5 additions & 5 deletions cmd/kubeapps-apis/plugins/fluxv2/packages/v1alpha1/chart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestGetAvailablePackageDetail(t *testing.T) {
requestChartUrl := ""

// these will be used later in a few places
opts := &common.ClientOptions{}
opts := &common.HttpClientOptions{}
if tc.basicAuth {
opts.Username = "foo"
opts.Password = "bar"
Expand Down Expand Up @@ -743,7 +743,7 @@ func TestChartCacheResyncNotIdle(t *testing.T) {
redisMockSetValueForRepo(mock, repoKey, repoBytes, nil)
}

opts := &common.ClientOptions{}
opts := &common.HttpClientOptions{}
chartCacheKeys := []string{}
var chartBytes []byte
for i := 0; i < NUM_CHARTS; i++ {
Expand Down Expand Up @@ -960,15 +960,15 @@ func newChart(name, namespace string, spec *sourcev1.HelmChartSpec, status *sour
return helmChart
}

func (s *Server) redisMockSetValueForChart(mock redismock.ClientMock, key, url string, opts *common.ClientOptions) error {
func (s *Server) redisMockSetValueForChart(mock redismock.ClientMock, key, url string, opts *common.HttpClientOptions) error {
sink := repoEventSink{
clientGetter: s.newBackgroundClientGetter(),
chartCache: s.chartCache,
}
return sink.redisMockSetValueForChart(mock, key, url, opts)
}

func (cs *repoEventSink) redisMockSetValueForChart(mock redismock.ClientMock, key, url string, opts *common.ClientOptions) error {
func (cs *repoEventSink) redisMockSetValueForChart(mock redismock.ClientMock, key, url string, opts *common.HttpClientOptions) error {
_, chartID, version, err := fromRedisKeyForChart(key)
if err != nil {
return err
Expand All @@ -988,7 +988,7 @@ func redisMockSetValueForChart(mock redismock.ClientMock, key string, byteArray
}

// does a series of mock.ExpectGet(...)
func redisMockExpectGetFromChartCache(mock redismock.ClientMock, key, url string, opts *common.ClientOptions) error {
func redisMockExpectGetFromChartCache(mock redismock.ClientMock, key, url string, opts *common.HttpClientOptions) error {
if url != "" {
_, chartID, version, err := fromRedisKeyForChart(key)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
Copyright 2022 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// this is a copy of fluxcd source-controller internal/transport/transport.go
// It implements a pool of TCP connections. Allows for re-use of TCP
// connections when pulling (downloading) helm charts. Per
// official Go documentation ref https://go.dev/src/net/http/transport.go, L#68-69:
// Transports should be reused instead of created as needed.
// Transports are safe for concurrent use by multiple goroutines.
package transport

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"sync"
"time"
)

// TransportPool is a progressive and non-blocking pool
// for http.Transport objects, optimised for Gargabe Collection
// and without a hard limit on number of objects created.
//
// Its main purpose is to enable for transport objects to be
// used across helm chart download requests and helm/pkg/getter
// instances by leveraging the getter.WithTransport(t) construct.
//
// The use of this pool improves the default behaviour of helm getter
// which creates a new connection per request, or per getter instance,
// resulting on unnecessary TCP connections with the target.
//
// http.Transport objects may contain sensitive material and also have
// settings that may impact the security of HTTP operations using
// them (i.e. InsecureSkipVerify). Therefore, ensure that they are
// used in a thread-safe way, and also by reseting TLS specific state
// after each use.
//
// Calling the Release(t) function will reset TLS specific state whilst
// also releasing the transport back to the pool to be reused.
//
// xref: https://github.com/helm/helm/pull/10568
// xref2: https://github.com/fluxcd/source-controller/issues/578
type TransportPool struct {
}

var pool = &sync.Pool{
New: func() interface{} {
return &http.Transport{
DisableCompression: true,
Proxy: http.ProxyFromEnvironment,

// Due to the non blocking nature of this approach,
// at peak usage a higher number of transport objects
// may be created. sync.Pool will ensure they are
// gargage collected when/if needed.
//
// By setting a low value to IdleConnTimeout the connections
// will be closed after that period of inactivity, allowing the
// transport to be garbage collected.
IdleConnTimeout: 60 * time.Second,

// use safe defaults based off http.DefaultTransport
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
},
}

// NewOrIdle tries to return an existing transport that is not currently being used.
// If none is found, creates a new Transport instead.
//
// tlsConfig can optionally set the TLSClientConfig for the transport.
func NewOrIdle(tlsConfig *tls.Config) *http.Transport {
t := pool.Get().(*http.Transport)
t.TLSClientConfig = tlsConfig

return t
}

// Release releases the transport back to the TransportPool after
// sanitising its sensitive fields.
func Release(transport *http.Transport) error {
if transport == nil {
return fmt.Errorf("cannot release nil transport")
}

transport.TLSClientConfig = nil

pool.Put(transport)
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2022 The Flux authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package transport

import (
"crypto/tls"
"testing"
)

func Test_TransportReuse(t *testing.T) {
t1 := NewOrIdle(nil)
t2 := NewOrIdle(nil)

if t1 == t2 {
t.Errorf("same transported returned twice")
}

err := Release(t2)
if err != nil {
t.Errorf("error releasing transport t2: %v", err)
}

t3 := NewOrIdle(&tls.Config{
ServerName: "testing",
})
if t3.TLSClientConfig == nil || t3.TLSClientConfig.ServerName != "testing" {
t.Errorf("TLSClientConfig not properly configured")
}

err = Release(t3)
if err != nil {
t.Errorf("error releasing transport t3: %v", err)
}
if t3.TLSClientConfig != nil {
t.Errorf("TLSClientConfig not cleared after release")
}

err = Release(nil)
if err == nil {
t.Errorf("should not allow release nil transport")
} else if err.Error() != "cannot release nil transport" {
t.Errorf("wanted error message: 'cannot release nil transport' got: %q", err.Error())
}
}
Loading