Skip to content

Commit

Permalink
changed variable nameing from repo/repository to registries
Browse files Browse the repository at this point in the history
  • Loading branch information
Catalin-Stratulat-Ericsson committed Oct 22, 2024
1 parent 1edf83b commit c7b9999
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
76 changes: 39 additions & 37 deletions func/internal/podevaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ const (
fieldManagerName = "krm-function-runner"
functionContainerName = "function"
defaultManagerNamespace = "porch-system"
defaultRepository = "gcr.io/kpt-fn/"
defaultRegistry = "gcr.io/kpt-fn/"
// perhaps should try and get the name of the dockerconfig secret given by user and match this secret name to that to avoid hard coded value?
customRepoImgPullSecret = "auth-secret"
customRegistryImgPullSecret = "auth-secret"

channelBufferSize = 128
)
Expand All @@ -73,7 +73,7 @@ type podEvaluator struct {

var _ Evaluator = &podEvaluator{}

func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Duration, podTTLConfig string, functionPodTemplateName string, customRepoAuth string) (Evaluator, error) {
func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Duration, podTTLConfig string, functionPodTemplateName string, registryAuthSecretPath string) (Evaluator, error) {
restCfg, err := config.GetConfig()
if err != nil {
return nil, fmt.Errorf("failed to get rest config: %w", err)
Expand Down Expand Up @@ -102,13 +102,13 @@ func NewPodEvaluator(namespace, wrapperServerImage string, interval, ttl time.Du
pe := &podEvaluator{
requestCh: reqCh,
podCacheManager: &podCacheManager{
gcScanInternal: interval,
podTTL: ttl,
customRepoAuth: customRepoAuth,
requestCh: reqCh,
podReadyCh: readyCh,
cache: map[string]*podAndGRPCClient{},
waitlists: map[string][]chan<- *clientConnAndError{},
gcScanInternal: interval,
podTTL: ttl,
registryAuthSecretPath: registryAuthSecretPath,
requestCh: reqCh,
podReadyCh: readyCh,
cache: map[string]*podAndGRPCClient{},
waitlists: map[string][]chan<- *clientConnAndError{},

podManager: &podManager{
kubeClient: cl,
Expand Down Expand Up @@ -173,7 +173,7 @@ type podCacheManager struct {
gcScanInternal time.Duration
podTTL time.Duration

customRepoAuth string
registryAuthSecretPath string

// requestCh is a receive-only channel to receive
requestCh <-chan *clientConnRequest
Expand Down Expand Up @@ -243,7 +243,7 @@ func (pcm *podCacheManager) warmupCache(podTTLConfig string) error {

// We invoke the function with useGenerateName=false so that the pod name is fixed,
// since we want to ensure only one pod is created for each function.
pcm.podManager.getFuncEvalPodClient(ctx, fnImage, ttl, false, pcm.customRepoAuth)
pcm.podManager.getFuncEvalPodClient(ctx, fnImage, ttl, false, pcm.registryAuthSecretPath)
klog.Infof("preloaded pod cache for function %v", fnImage)
})

Expand Down Expand Up @@ -311,7 +311,7 @@ func (pcm *podCacheManager) podCacheManager() {
pcm.waitlists[req.image] = append(list, req.grpcClientCh)
// We invoke the function with useGenerateName=true to avoid potential name collision, since if pod foo is
// being deleted and we can't use the same name.
go pcm.podManager.getFuncEvalPodClient(context.Background(), req.image, pcm.podTTL, true, pcm.customRepoAuth)
go pcm.podManager.getFuncEvalPodClient(context.Background(), req.image, pcm.podTTL, true, pcm.registryAuthSecretPath)
case resp := <-pcm.podReadyCh:
if resp.err != nil {
klog.Warningf("received error from the pod manager: %v", resp.err)
Expand Down Expand Up @@ -443,9 +443,9 @@ type digestAndEntrypoint struct {
// time-to-live period for the pod. If useGenerateName is false, it will try to
// create a pod with a fixed name. Otherwise, it will create a pod and let the
// apiserver to generate the name from a template.
func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, ttl time.Duration, useGenerateName bool, customRepoAuth string) {
func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, ttl time.Duration, useGenerateName bool, registryAuthSecretPath string) {
c, err := func() (*podAndGRPCClient, error) {
podKey, err := pm.retrieveOrCreatePod(ctx, image, ttl, useGenerateName, customRepoAuth)
podKey, err := pm.retrieveOrCreatePod(ctx, image, ttl, useGenerateName, registryAuthSecretPath)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func (pm *podManager) getFuncEvalPodClient(ctx context.Context, image string, tt
}
}

func (pm *podManager) InspectOrCreateSecret(ctx context.Context, customRepoAuth string) error {
func (pm *podManager) InspectOrCreateSecret(ctx context.Context, registryAuthSecretPath string) error {
secret := &corev1.Secret{}
// using pod manager client since this secret is only related to these pods and nothing else
err := pm.kubeClient.Get(context.Background(), client.ObjectKey{
Expand All @@ -485,8 +485,8 @@ func (pm *podManager) InspectOrCreateSecret(ctx context.Context, customRepoAuth
// Error other than "not found" occurred
return err
}
klog.Infof("Secret for private repo pods does not exist and is required.\nGenerating Secret Now")
dockerConfigBytes, err := os.ReadFile(customRepoAuth)
klog.Infof("Secret for private registry pods does not exist and is required.\nGenerating Secret Now")
dockerConfigBytes, err := os.ReadFile(registryAuthSecretPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -519,9 +519,11 @@ type DockerConfig struct {
}

// imageDigestAndEntrypoint gets the entrypoint of a container image by looking at its metadata.
func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string, customRepoAuth string) (*digestAndEntrypoint, error) {
func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string, registryAuthSecretPath string) (*digestAndEntrypoint, error) {
start := time.Now()
defer klog.Infof("getting image metadata for %v took %v", image, time.Since(start))
defer func() {
klog.Infof("getting image metadata for %v took %v", image, time.Since(start))
}()

ref, err := name.ParseReference(image)
if err != nil {
Expand All @@ -530,12 +532,12 @@ func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string
}

var auth authn.Authenticator
if customRepoAuth != "" && !strings.HasPrefix(image, defaultRepository) {
if err := pm.handleCustomAuth(ctx, customRepoAuth); err != nil {
if registryAuthSecretPath != "" && !strings.HasPrefix(image, defaultRegistry) {
if err := pm.handleCustomAuth(ctx, registryAuthSecretPath); err != nil {
return nil, err
}

auth, err = pm.getCustomAuth(ref, customRepoAuth)
auth, err = pm.getCustomAuth(ref, registryAuthSecretPath)
if err != nil {
return nil, err
}
Expand All @@ -550,26 +552,26 @@ func (pm *podManager) imageDigestAndEntrypoint(ctx context.Context, image string
return pm.getImageMetadata(ctx, ref, auth, image)
}

// handleCustomAuth ensures if images from custom repo's are requested their appropriate credentials are passed onto a secret for fn pods to use when pulling if it doesnt already exist
func (pm *podManager) handleCustomAuth(ctx context.Context, customRepoAuth string) error {
if err := pm.InspectOrCreateSecret(ctx, customRepoAuth); err != nil {
// handleCustomAuth ensures if images from custom registry's are requested their appropriate credentials are passed onto a secret for fn pods to use when pulling if it doesnt already exist
func (pm *podManager) handleCustomAuth(ctx context.Context, registryAuthSecretPath string) error {
if err := pm.InspectOrCreateSecret(ctx, registryAuthSecretPath); err != nil {
return err
}
return nil
}

// if a custom image is requested use the secret provided to authenticate
func (pm *podManager) appendImagePullSecret(image string, customRepoAuth string, podTemplate *corev1.Pod) {
if customRepoAuth != "" && !strings.HasPrefix(image, defaultRepository) {
func (pm *podManager) appendImagePullSecret(image string, registryAuthSecretPath string, podTemplate *corev1.Pod) {
if registryAuthSecretPath != "" && !strings.HasPrefix(image, defaultRegistry) {
podTemplate.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
{Name: "auth-secret"},
}
}
}

// getCustomAuth reads and parses the custom repo auth file from the mounted secret.
func (pm *podManager) getCustomAuth(ref name.Reference, customRepoAuth string) (authn.Authenticator, error) {
dockerConfigBytes, err := os.ReadFile(customRepoAuth)
// getCustomAuth reads and parses the custom registry auth file from the mounted secret.
func (pm *podManager) getCustomAuth(ref name.Reference, registryAuthSecretPath string) (authn.Authenticator, error) {
dockerConfigBytes, err := os.ReadFile(registryAuthSecretPath)
if err != nil {
klog.Errorf("error reading authentication file %v", err)
return nil, err
Expand Down Expand Up @@ -613,14 +615,14 @@ func (pm *podManager) getImageMetadata(ctx context.Context, ref name.Reference,
}

// retrieveOrCreatePod retrieves or creates a pod for an image.
func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl time.Duration, useGenerateName bool, customRepoAuth string) (client.ObjectKey, error) {
func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl time.Duration, useGenerateName bool, registryAuthSecretPath string) (client.ObjectKey, error) {
var de *digestAndEntrypoint
var replacePod bool
var currentPod *corev1.Pod
var err error
val, found := pm.imageMetadataCache.Load(image)
if !found {
de, err = pm.imageDigestAndEntrypoint(ctx, image, customRepoAuth)
de, err = pm.imageDigestAndEntrypoint(ctx, image, registryAuthSecretPath)
if err != nil {
return client.ObjectKey{}, fmt.Errorf("unable to get the entrypoint for %v: %w", image, err)
}
Expand All @@ -639,7 +641,7 @@ func (pm *podManager) retrieveOrCreatePod(ctx context.Context, image string, ttl
// TODO: It's possible to set up a Watch in the fn runner namespace, and always try to maintain a up-to-date local cache.
podList := &corev1.PodList{}
podTemplate, templateVersion, err := pm.getBasePodTemplate(ctx)
pm.appendImagePullSecret(image, customRepoAuth, podTemplate)
pm.appendImagePullSecret(image, registryAuthSecretPath, podTemplate)
if err != nil {
klog.Errorf("failed to generate a base pod template: %v", err)
return client.ObjectKey{}, fmt.Errorf("failed to generate a base pod template: %w", err)
Expand Down Expand Up @@ -873,9 +875,9 @@ func podID(image, hash string) (string, error) {
return "", fmt.Errorf("unable to parse image reference %v: %w", image, err)
}

// repoName will be something like gcr.io/kpt-fn/set-namespace
repoName := ref.Context().Name()
parts := strings.Split(repoName, "/")
// registryName will be something like gcr.io/kpt-fn/set-namespace
registryName := ref.Context().Name()
parts := strings.Split(registryName, "/")
name := strings.ReplaceAll(parts[len(parts)-1], "_", "-")
return fmt.Sprintf("%v-%v", name, hash[:8]), nil
}
Expand Down
4 changes: 2 additions & 2 deletions func/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
port = flag.Int("port", 9445, "The server port")
functions = flag.String("functions", "./functions", "Path to cached functions.")
config = flag.String("config", "./config.yaml", "Path to the config file.")
customRepoAuth = flag.String("custom-repo-secret-path", "", "Path to means of authentication for using images from custom repositories e.g. docker config file")
registryAuthSecretPath = flag.String("registry-auth-secret-path", "", "Path to means of authentication for using images from custom registries e.g. docker config file")
podCacheConfig = flag.String("pod-cache-config", "/pod-cache-config/pod-cache-config.yaml", "Path to the pod cache config file. The file is map of function name to TTL.")
podNamespace = flag.String("pod-namespace", "porch-fn-system", "Namespace to run KRM functions pods.")
podTTL = flag.Duration("pod-ttl", 30*time.Minute, "TTL for pods before GC.")
Expand Down Expand Up @@ -90,7 +90,7 @@ func run() error {
if wrapperServerImage == "" {
return fmt.Errorf("environment variable %v must be set to use pod function evaluator runtime", wrapperServerImageEnv)
}
podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig, *functionPodTemplateName, *customRepoAuth)
podEval, err := internal.NewPodEvaluator(*podNamespace, wrapperServerImage, *scanInterval, *podTTL, *podCacheConfig, *functionPodTemplateName, *registryAuthSecretPath)
if err != nil {
return fmt.Errorf("failed to initialize pod evaluator: %w", err)
}
Expand Down

0 comments on commit c7b9999

Please sign in to comment.