diff --git a/cmd/sync/client/client.go b/cmd/sync/client/client.go new file mode 100644 index 00000000..4bd06a26 --- /dev/null +++ b/cmd/sync/client/client.go @@ -0,0 +1,96 @@ +package main + +import ( + "fmt" + "os" + + "github.com/adobe/cluster-registry/pkg/config" + "github.com/adobe/cluster-registry/pkg/sqs" + client "github.com/adobe/cluster-registry/pkg/sync/client" + awssqs "github.com/aws/aws-sdk-go/service/sqs" + "github.com/davecgh/go-spew/spew" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +var ( + logLevel, logFormat string + appConfig *config.AppConfig + namespace string + //clusterName string + //cfgFile string +) + +func InitCLI() *cobra.Command { + + var rootCmd = &cobra.Command{ + Use: "cluster-registry-client-sync", + Short: "Cluster Registry Sync Client is a service that keep in sync the cluster CRD", + Long: "\nCluster Registry Sync Client is a service that creates or updates the cluster CRD based on the messages received from the Cluster Registry Sync manager", + PersistentPreRun: loadAppConfig, + Run: run, + } + + initFlags(rootCmd) + + return rootCmd +} + +func initFlags(rootCmd *cobra.Command) { + + rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", logrus.DebugLevel.String(), "The verbosity level of the logs, can be [panic|fatal|error|warn|info|debug|trace]") + rootCmd.PersistentFlags().StringVar(&logFormat, "log-format", "text", "The output format of the logs, can be [text|json]") + //rootCmd.PersistentFlags().StringVar(&cfgFile, "config-file", "", "The path to the configuration file") + rootCmd.PersistentFlags().StringVar(&namespace, "namespace", "cluster-registry", "The namespace where cluster-registry-sync-client will run.") +} + +func loadAppConfig(cmd *cobra.Command, args []string) { + + client.InitLogger(logLevel, logFormat) + + log.Info("Starting the Cluster Registry Sync Client") + + log.Info("Loading the configuration") + appConfig, err := config.LoadSyncClientConfig() + if err != nil { + log.Error("Cannot load the cluster-registry-sync-client configuration:", err.Error()) + os.Exit(1) + } + log.Info("Config loaded successfully") + log.Info("Cluster (custom resource) to be checked:", appConfig.ClusterName) +} + +func run(cmd *cobra.Command, args []string) { + + log.Info("Cluster Registry Sync Client is running") + + // Consume the messages from the queue using a sync consumer + sqsInstance := sqs.NewSQS(appConfig) + log.Info("Starting the SQS sync consumer") + + handler := func(m *awssqs.Message) error { + spew.Dump(m) + // TODO + return nil + } + syncConsumer := sqs.NewSyncConsumer(sqsInstance, appConfig, handler) + go syncConsumer.Consume() + + // Block the thread + select {} +} + +func main() { + + rootCmd := InitCLI() + + // Execute the CLI application + if err := rootCmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } + + //TODO + +} diff --git a/pkg/sqs/synconsumer.go b/pkg/sqs/synconsumer.go new file mode 100644 index 00000000..9e955be3 --- /dev/null +++ b/pkg/sqs/synconsumer.go @@ -0,0 +1,141 @@ +/* +Copyright 2024 Adobe. All rights reserved. +This file is licensed to you under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. You may obtain a copy +of the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under +the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS +OF ANY KIND, either express or implied. See the License for the specific language +governing permissions and limitations under the License. +*/ + +package sqs + +import ( + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/aws/aws-sdk-go/service/sqs/sqsiface" + "github.com/labstack/gommon/log" + + "github.com/adobe/cluster-registry/pkg/config" + monitoring "github.com/adobe/cluster-registry/pkg/monitoring/apiserver" +) + +// consumer struct +type synconsumer struct { + sqs sqsiface.SQSAPI + queueURL string + workerPool int + maxMessages int64 + pollWaitSeconds int64 + retrySeconds int + messageHandler func(*sqs.Message) error +} + +// NewSyncConsumer - creates a new SQS message queue consumer +// used by the sync consumer service +// TODO: add metrics later +func NewSyncConsumer(sqsSvc sqsiface.SQSAPI, appConfig *config.AppConfig, h func(*sqs.Message) error) Consumer { + + urlResult, err := sqsSvc.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: &appConfig.SqsQueueName, + }) + if err != nil { + log.Fatal(err.Error()) + } + + return &synconsumer{ + sqs: sqsSvc, + queueURL: *urlResult.QueueUrl, + workerPool: 10, + maxMessages: 1, + pollWaitSeconds: 1, + retrySeconds: 5, + messageHandler: h, + } +} + +// Status verifies the status/connectivity of the sqs service +func (c *synconsumer) Status(appConfig *config.AppConfig, m monitoring.MetricsI) error { + _, err := c.sqs.GetQueueUrl(&sqs.GetQueueUrlInput{ + QueueName: &appConfig.SqsQueueName, + }) + + if err != nil { + log.Error(err.Error()) + } + + return err +} + +// Consume - long pooling +func (c *synconsumer) Consume() { + var wg sync.WaitGroup + + for w := 1; w <= c.workerPool; w++ { + wg.Add(1) + go func(w int) { + defer wg.Done() + c.worker(w) + }(w) + } + wg.Wait() +} + +func (c *synconsumer) worker(id int) { + for { + output, err := c.sqs.ReceiveMessage((&sqs.ReceiveMessageInput{ + QueueUrl: &c.queueURL, + AttributeNames: aws.StringSlice([]string{ + "ClusterName", "SentTimestamp", + }), + MaxNumberOfMessages: aws.Int64(c.maxMessages), + WaitTimeSeconds: aws.Int64(c.pollWaitSeconds), + })) + + if err != nil { + log.Error(err.Error()) + log.Info("Retrying in", c.retrySeconds, " seconds") + time.Sleep(time.Duration(c.retrySeconds) * time.Second) + continue + } + + for _, m := range output.Messages { + log.Debug("Messsage ID: ", *m.MessageId) + log.Debug("Message Body: ", *m.Body) + + err := c.processMessage(m) + + if err != nil { + log.Error(err.Error()) + continue + } + err = c.delete(m) + if err != nil { + log.Error(err.Error()) + } + } + } +} + +// processMessage - process the recieved message +func (c *synconsumer) processMessage(m *sqs.Message) error { + + err := c.messageHandler(m) + return err +} + +func (c *synconsumer) delete(m *sqs.Message) error { + + _, err := c.sqs.DeleteMessage( + &sqs.DeleteMessageInput{QueueUrl: &c.queueURL, ReceiptHandle: m.ReceiptHandle}) + + if err != nil { + log.Error(err.Error()) + } + return err +} diff --git a/pkg/sync/client/k8s.go b/pkg/sync/client/k8s.go new file mode 100644 index 00000000..097ffce7 --- /dev/null +++ b/pkg/sync/client/k8s.go @@ -0,0 +1,47 @@ +/* +Copyright 2024 Adobe. All rights reserved. +This file is licensed to you under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. You may obtain a copy +of the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under +the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS +OF ANY KIND, either express or implied. See the License for the specific language +governing permissions and limitations under the License. +*/ + +package client + +import ( + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client/config" +) + +func getClientSet() (*kubernetes.Clientset, error) { + cfg, err := config.GetConfig() + if err != nil { + return nil, err + } + + clientSet, err := kubernetes.NewForConfig(cfg) + if err != nil { + return nil, err + } + + return clientSet, nil +} + +func getDynamicClientSet() (*dynamic.DynamicClient, error) { + cfg, err := config.GetConfig() + if err != nil { + return nil, err + } + + dnc, err := dynamic.NewForConfig(cfg) + if err != nil { + return nil, err + } + + return dnc, nil +} diff --git a/pkg/sync/client/utils.go b/pkg/sync/client/utils.go new file mode 100644 index 00000000..d923b7e3 --- /dev/null +++ b/pkg/sync/client/utils.go @@ -0,0 +1,107 @@ +/* +Copyright 2024 Adobe. All rights reserved. +This file is licensed to you under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. You may obtain a copy +of the License at http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under +the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTATIONS +OF ANY KIND, either express or implied. See the License for the specific language +governing permissions and limitations under the License. +*/ + +package client + +import ( + "encoding/json" + "io" + "log" + "os" + "strings" + + registryv1 "github.com/adobe/cluster-registry/pkg/api/registry/v1" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "sigs.k8s.io/yaml" +) + +func ReadFile(patchFilePath string) ([]byte, error) { + patchFile, err := os.Open(patchFilePath) + if err != nil { + log.Fatalf("Error opening patch YAML file: %v", err) + } + defer patchFile.Close() + + patchData, err := io.ReadAll(patchFile) + if err != nil { + log.Fatalf("Error reading patch YAML file: %v", err) + } + return patchData, nil +} + +func UnmarshalYaml(data []byte, cluster *[]registryv1.Cluster) error { + err := yaml.Unmarshal(data, cluster) + if err != nil { + log.Panicf("Error while trying to unmarshal yaml data: %v", err.Error()) + } + + return err +} + +func UnmarshalJSON(data []byte, cluster *[]registryv1.Cluster) error { + err := json.Unmarshal(data, cluster) + if err != nil { + log.Panicf("Error while trying to unmarshal json data: %v", err.Error()) + } + + return err +} + +func MarshalJson(patch map[string]interface{}) ([]byte, error) { + jsonData, err := json.Marshal(patch) + if err != nil { + log.Panicf("Error while trying to marshal json data: %v", err.Error()) + } + + return jsonData, err +} + +// toUnstructured converts a Cluster struct to an unstructured.Unstructured object +func toUnstructured(obj interface{}) (*unstructured.Unstructured, error) { + data, err := json.Marshal(obj) + if err != nil { + return nil, err + } + u := &unstructured.Unstructured{} + if err := u.UnmarshalJSON(data); err != nil { + return nil, err + } + return u, nil +} + +// TODO; check if there is an utils func - see if no need +// unstructuredToJSON converts an unstructured.Unstructured object to a JSON string +func unstructuredToJSON(obj *unstructured.Unstructured) ([]byte, error) { + return obj.MarshalJSON() +} + +func InitLogger(logLevel string, logFormat string) { + + level, err := logrus.ParseLevel(logLevel) + if err != nil { + level = logrus.DebugLevel + } + logrus.SetLevel(level) + + logFormat = strings.ToLower(logFormat) + if logFormat == "text" { + logrus.SetFormatter(&logrus.TextFormatter{ + FullTimestamp: true, + ForceColors: true, + }) + } else { + logrus.SetFormatter(&logrus.JSONFormatter{ + TimestampFormat: "2006-01-02 15:04:05", + }) + } +}