diff --git a/e2e/k8s.go b/e2e/k8s.go index b1a874ba..3efdecb3 100644 --- a/e2e/k8s.go +++ b/e2e/k8s.go @@ -30,9 +30,6 @@ func getClient() ( return cl, cfg, nil } -func objKey(ns, name string) client.ObjectKey { - return client.ObjectKey{Namespace: ns, Name: name} -} func deleteNS(ns string) error { return sh.RunV("kubectl", "delete", "namespace", ns) } @@ -63,7 +60,7 @@ func getScaledObject( if err != nil { return nil, err } - if err := cl.Get(ctx, objKey(ns, name), scaledObject); err != nil { + if err := cl.Get(ctx, k8s.ObjKey(ns, name), scaledObject); err != nil { return nil, err } return scaledObject, nil diff --git a/e2e/wait_deployment.go b/e2e/wait_deployment.go index 69d7dbda..7d268b9a 100644 --- a/e2e/wait_deployment.go +++ b/e2e/wait_deployment.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/kedacore/http-add-on/pkg/k8s" "golang.org/x/sync/errgroup" appsv1 "k8s.io/api/apps/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -21,7 +22,7 @@ func waitUntilDeployment( depl := &appsv1.Deployment{} if err := cl.Get( ctx, - objKey(ns, name), + k8s.ObjKey(ns, name), depl, ); err != nil { return err diff --git a/go.mod b/go.mod index d70d05c5..5004b4f2 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( go.uber.org/zap v1.17.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/grpc v1.33.2 - google.golang.org/protobuf v1.27.1 + google.golang.org/protobuf v1.26.0 k8s.io/api v0.21.3 k8s.io/apimachinery v0.21.3 k8s.io/client-go v0.21.3 diff --git a/go.sum b/go.sum index 4a31496e..fe2b52c4 100644 --- a/go.sum +++ b/go.sum @@ -673,9 +673,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= -google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/interceptor/config/operator.go b/interceptor/config/operator.go deleted file mode 100644 index 19a38a6b..00000000 --- a/interceptor/config/operator.go +++ /dev/null @@ -1,41 +0,0 @@ -package config - -import ( - "fmt" - "net/url" - "time" - - "github.com/kelseyhightower/envconfig" -) - -// Operator is the configuration for where and how the interceptor -// makes RPC calls to the operator -type Operator struct { - OperatorServiceName string `envconfig:"KEDA_HTTP_OPERATOR_SERVICE_NAME" required:"true"` - OperatorServicePort string `envconfig:"KEDA_HTTP_OPERATOR_SERVICE_PORT" required:"true"` - RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_OPERATOR_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"` -} - -func (o *Operator) RoutingTableUpdateDuration() time.Duration { - return time.Duration(o.RoutingTableUpdateDurationMS) * time.Millisecond -} - -// ServiceURL formats the app service name and port into a URL 
-func (o *Operator) RoutingFetchURL() (*url.URL, error) {
-	urlStr := fmt.Sprintf(
-		"http://%s:%s",
-		o.OperatorServiceName,
-		o.OperatorServicePort,
-	)
-	u, err := url.Parse(urlStr)
-	if err != nil {
-		return nil, err
-	}
-	return u, nil
-}
-
-func MustParseOperator() *Operator {
-	ret := new(Operator)
-	envconfig.MustProcess("", ret)
-	return ret
-}
diff --git a/interceptor/config/serving.go b/interceptor/config/serving.go
index 10923c95..4a596c99 100644
--- a/interceptor/config/serving.go
+++ b/interceptor/config/serving.go
@@ -16,6 +16,15 @@ type Serving struct {
 	// This is the server that the external scaler will issue metrics
 	// requests to
 	AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
+	// RoutingTableUpdateDurationMS is the interval (in milliseconds) at which
+	// the interceptor does a complete refresh of its in-memory routing table
+	// from the routing table ConfigMap.
+	//
+	// The interceptor also opens a watch stream on that ConfigMap and attempts
+	// to update the routing table on every change event. Full refreshes and
+	// watch-stream updates are processed one at a time, so this interval is
+	// best-effort rather than exact.
+	RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_OPERATOR_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"`
 }
 
 // Parse parses standard configs using envconfig and returns a pointer to the
diff --git a/interceptor/main.go b/interceptor/main.go
index 8336c6f3..7bfa5e18 100644
--- a/interceptor/main.go
+++ b/interceptor/main.go
@@ -4,9 +4,7 @@ import (
 	"context"
 	"fmt"
 	"math/rand"
-	"net/http"
 	nethttp "net/http"
-	"net/url"
 	"os"
 	"time"
 
@@ -33,7 +31,6 @@ func main() {
 		os.Exit(1)
 	}
 	timeoutCfg := config.MustParseTimeouts()
-	operatorCfg := config.MustParseOperator()
 	servingCfg := config.MustParseServing()
 	ctx := context.Background()
 
@@ -61,29 +58,23 @@ func main() {
 		lggr.Error(err, "creating new deployment cache")
 		os.Exit(1)
 	}
-	waitFunc := newDeployReplicasForwardWaitFunc(deployCache)
 
-	operatorRoutingFetchURL, err := operatorCfg.RoutingFetchURL()
-	if err != nil {
-		lggr.Error(err, "getting the operator URL")
-		os.Exit(1)
-	}
+	configMapsInterface := cl.CoreV1().ConfigMaps(servingCfg.CurrentNamespace)
+
+	waitFunc := newDeployReplicasForwardWaitFunc(deployCache)
 
 	lggr.Info("Interceptor starting")
 
-	lggr.Info(
-		"Fetching initial routing table",
-		"operatorRoutingURL",
-		operatorRoutingFetchURL.String(),
-	)
 	q := queue.NewMemory()
 	routingTable := routing.NewTable()
+	lggr.Info(
+		"Fetching initial routing table",
+	)
 	if err := routing.GetTable(
 		ctx,
-		nethttp.DefaultClient,
 		lggr,
-		*operatorRoutingFetchURL,
+		configMapsInterface,
 		routingTable,
 		q,
 	); err != nil {
@@ -92,37 +83,44 @@ func main() {
 	}
 
 	errGrp, ctx := errgroup.WithContext(ctx)
+
+	// start the update loop that refreshes the routing table from
+	// the ConfigMap that the operator updates as HTTPScaledObjects
+	// enter and exit the system
 	errGrp.Go(func() error {
-		return runAdminServer(
+		return routing.StartConfigMapRoutingTableUpdater(
+			ctx,
 			lggr,
-			operatorRoutingFetchURL,
-			q,
+			time.Duration(servingCfg.RoutingTableUpdateDurationMS)*time.Millisecond,
+			configMapsInterface,
 			routingTable,
-			adminPort,
+			q,
 		)
 	})
+
+	// start the administrative server. 
This is the server
+	// that serves the queue size API
 	errGrp.Go(func() error {
 		lggr.Info(
-			"routing table update loop starting",
-			"operatorRoutingURL",
-			operatorRoutingFetchURL.String(),
+			"starting the admin server",
+			"port",
+			adminPort,
 		)
-
-		return routing.StartUpdateLoop(
-			ctx,
+		return runAdminServer(
 			lggr,
-			operatorCfg.RoutingTableUpdateDuration(),
-			routing.NewGetTableUpdateLoopFunc(
-				lggr,
-				http.DefaultClient,
-				*operatorRoutingFetchURL,
-				routingTable,
-				q,
-			),
+			configMapsInterface,
+			q,
+			routingTable,
+			adminPort,
 		)
 	})
 
 	errGrp.Go(func() error {
+		lggr.Info(
+			"starting the proxy server",
+			"port",
+			proxyPort,
+		)
 		return runProxyServer(
 			lggr,
 			q,
@@ -142,7 +140,7 @@ func main() {
 
 func runAdminServer(
 	lggr logr.Logger,
-	routingFetchURL *url.URL,
+	cmGetter k8s.ConfigMapGetter,
 	q queue.Counter,
 	routingTable *routing.Table,
 	port int,
@@ -162,8 +160,7 @@ func runAdminServer(
 	routing.AddPingRoute(
 		lggr,
 		adminServer,
-		http.DefaultClient,
-		routingFetchURL,
+		cmGetter,
 		routingTable,
 		q,
 	)
diff --git a/operator/controllers/routing_table.go b/operator/controllers/routing_table.go
index d923d0da..7f71c56c 100644
--- a/operator/controllers/routing_table.go
+++ b/operator/controllers/routing_table.go
@@ -6,6 +6,8 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/kedacore/http-add-on/pkg/k8s"
 	"github.com/kedacore/http-add-on/pkg/routing"
+	pkgerrs "github.com/pkg/errors"
+	"k8s.io/apimachinery/pkg/api/errors"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
@@ -60,37 +62,58 @@ func updateRoutingMap(
 	namespace string,
 	table *routing.Table,
 ) error {
-	tableAsJSON, marshalErr := table.MarshalJSON()
-	if marshalErr != nil {
-		return marshalErr
-	}
-
-	routingConfigMap, err := k8s.GetConfigMap(ctx, cl, namespace, routingTableName)
-	if err != nil {
-		return err
+	lggr = lggr.WithName("updateRoutingMap")
+	routingConfigMap, err := k8s.GetConfigMap(ctx, cl, namespace, routing.ConfigMapRoutingTableName)
+	// if there is any error other than not-found on the ConfigMap, we should
+	// fail
+	if err != nil && !errors.IsNotFound(err) {
+		lggr.Error(
+			err,
+			"error fetching the routing table ConfigMap",
+			"configMapName",
+			routing.ConfigMapRoutingTableName,
+		)
+		return pkgerrs.Wrap(err, "routing table ConfigMap fetch error")
 	}
 
-	// if the routing table doesn't exist, we need to create it with the latest data
-	if routingConfigMap == nil {
-		routingTableData := map[string]string{
-			"routing-table": string(tableAsJSON),
-		}
-
+	// if the routing table ConfigMap doesn't exist, or it's somehow
+	// nil in memory, we need to create it
+	if errors.IsNotFound(err) || routingConfigMap == nil {
+		lggr.Info(
+			"routing table ConfigMap didn't exist, creating it",
+			"configMapName",
+			routing.ConfigMapRoutingTableName,
+		)
 		routingTableLabels := map[string]string{
 			"control-plane": "operator",
 			"keda.sh/addon": "http-add-on",
-			"app": "http-add-on",
-			"name": "http-add-on-routing-table",
+			"app":           "http-add-on",
+			"name":          "http-add-on-routing-table",
 		}
-
-		if err := k8s.CreateConfigMap(ctx, lggr, cl, k8s.NewConfigMap(namespace, routingTableName, routingTableLabels, routingTableData)); err != nil {
+		cm := k8s.NewConfigMap(
+			namespace,
+			routing.ConfigMapRoutingTableName,
+			routingTableLabels,
+			map[string]string{},
+		)
+		if err := routing.SaveTableToConfigMap(table, cm); err != nil {
+			return err
+		}
+		if err := k8s.CreateConfigMap(
+			ctx,
+			lggr,
+			cl,
+			cm,
+		); err != nil {
 			return err
 		}
 	} else {
-		newRoutingTable := routingConfigMap.DeepCopy()
-		newRoutingTable.Data["routing-table"] = 
string(tableAsJSON) - if _, patchErr := k8s.PatchConfigMap(ctx, lggr, cl, routingConfigMap, newRoutingTable); patchErr != nil { - return patchErr + newCM := routingConfigMap.DeepCopy() + if err := routing.SaveTableToConfigMap(table, newCM); err != nil { + return err + } + if _, patchErr := k8s.PatchConfigMap(ctx, lggr, cl, routingConfigMap, newCM); patchErr != nil { + return patchErr } } diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go index ebc1c3bd..13c066ad 100644 --- a/pkg/k8s/client.go +++ b/pkg/k8s/client.go @@ -5,6 +5,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" ) // NewClientset gets a new Kubernetes clientset, or calls log.Fatal @@ -27,3 +28,9 @@ func NewClientset() (*kubernetes.Clientset, dynamic.Interface, error) { } return clientset, dynamic, nil } + +// ObjKey creates a new client.ObjectKey with the given +// name and namespace +func ObjKey(ns, name string) client.ObjectKey { + return client.ObjectKey{Namespace: ns, Name: name} +} diff --git a/pkg/k8s/configMap.go b/pkg/k8s/configMap.go deleted file mode 100644 index 7afbd1c6..00000000 --- a/pkg/k8s/configMap.go +++ /dev/null @@ -1,91 +0,0 @@ -package k8s - -import ( - "context" - "fmt" - - "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// newConfigMap creates a new configMap structure -func NewConfigMap( - namespace string, - name string, - labels map[string]string, - data map[string]string, -) *corev1.ConfigMap { - - configMap := &corev1.ConfigMap{ - TypeMeta: metav1.TypeMeta{ - Kind: "ConfigMap", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, - Labels: labels, - }, - Data: data, - } - - return configMap -} - -func CreateConfigMap ( - ctx context.Context, - logger logr.Logger, - cl client.Client, - configMap *corev1.ConfigMap, -) error { - existentConfigMap, err := GetConfigMap(ctx, cl, configMap.Namespace, configMap.Name) - if err == nil { - return err - } - if existentConfigMap.Name != "" { - return fmt.Errorf("ConfigMap %s already exists", configMap.Name) - } - - createErr := cl.Create(ctx, configMap) - if createErr != nil { - return createErr - } - return nil -} - -func DeleteConfigMap ( - ctx context.Context, - cl client.Client, - configMap *corev1.ConfigMap, - logger logr.Logger, -) error { - return cl.Delete(ctx, configMap) -} - -func PatchConfigMap ( - ctx context.Context, - logger logr.Logger, - cl client.Client, - originalConfigMap *corev1.ConfigMap, - patchConfigMap *corev1.ConfigMap, -) (*corev1.ConfigMap,error) { - patchErr := cl.Patch(ctx, patchConfigMap, client.MergeFrom(originalConfigMap)) - if patchErr != nil { return nil, patchErr } - return patchConfigMap, nil -} - -func GetConfigMap ( - ctx context.Context, - cl client.Client, - namespace string, - name string, -) (*corev1.ConfigMap, error) { - configMap := &corev1.ConfigMap{} - err := cl.Get(ctx, client.ObjectKey{ Name: name, Namespace: namespace }, configMap) - if err != nil { - return nil, err - } - return configMap, nil -} - diff --git a/pkg/k8s/config_map.go b/pkg/k8s/config_map.go new file mode 100644 index 00000000..b3c1b5e0 --- /dev/null +++ b/pkg/k8s/config_map.go @@ -0,0 +1,145 @@ +package k8s + +import ( + "context" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// 
ConfigMapGetter is a pared-down version of a ConfigMapInterface
+// (found here: https://pkg.go.dev/k8s.io/client-go@v0.21.3/kubernetes/typed/core/v1#ConfigMapInterface).
+//
+// Pass this whenever possible to functions that only need to get individual ConfigMaps
+// from Kubernetes, and nothing else.
+type ConfigMapGetter interface {
+	Get(ctx context.Context, name string, opts metav1.GetOptions) (*corev1.ConfigMap, error)
+}
+
+// ConfigMapWatcher is a pared-down version of a ConfigMapInterface
+// (found here: https://pkg.go.dev/k8s.io/client-go@v0.21.3/kubernetes/typed/core/v1#ConfigMapInterface).
+//
+// Pass this whenever possible to functions that only need to watch ConfigMaps
+// in Kubernetes, and nothing else.
+type ConfigMapWatcher interface {
+	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+}
+
+// ConfigMapGetterWatcher is a pared-down version of a ConfigMapInterface
+// (found here: https://pkg.go.dev/k8s.io/client-go@v0.21.3/kubernetes/typed/core/v1#ConfigMapInterface).
+//
+// Pass this whenever possible to functions that need to both get and watch
+// ConfigMaps in Kubernetes, and nothing else.
+type ConfigMapGetterWatcher interface {
+	ConfigMapGetter
+	ConfigMapWatcher
+}
+
+// NewConfigMap creates a new ConfigMap structure with the given metadata and data
+func NewConfigMap(
+	namespace string,
+	name string,
+	labels map[string]string,
+	data map[string]string,
+) *corev1.ConfigMap {
+
+	configMap := &corev1.ConfigMap{
+		TypeMeta: metav1.TypeMeta{
+			Kind: "ConfigMap",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: namespace,
+			Labels:    labels,
+		},
+		Data: data,
+	}
+
+	return configMap
+}
+
+// CreateConfigMap sends a request to Kubernetes using the client cl
+// to create configMap. Returns a non-nil error if the creation failed,
+// including when the ConfigMap already exists. 
+func CreateConfigMap( + ctx context.Context, + logger logr.Logger, + cl client.Client, + configMap *corev1.ConfigMap, +) error { + logger = logger.WithName("pkg.k8s.CreateConfigMap") + if err := cl.Create(ctx, configMap); err != nil { + logger.Error( + err, + "failed to create ConfigMap", + "configMap", + *configMap, + ) + return err + } + return nil +} + +func DeleteConfigMap( + ctx context.Context, + cl client.Client, + configMap *corev1.ConfigMap, + logger logr.Logger, +) error { + logger = logger.WithName("pkg.k8s.DeleteConfigMap") + err := cl.Delete(ctx, configMap) + if err != nil { + logger.Error( + err, + "failed to delete configmap", + "configMap", + *configMap, + ) + return err + } + return nil +} + +func PatchConfigMap( + ctx context.Context, + logger logr.Logger, + cl client.Client, + originalConfigMap *corev1.ConfigMap, + patchConfigMap *corev1.ConfigMap, +) (*corev1.ConfigMap, error) { + logger = logger.WithName("pkg.k8s.PatchConfigMap") + if err := cl.Patch( + ctx, + patchConfigMap, + client.MergeFrom(originalConfigMap), + ); err != nil { + logger.Error( + err, + "failed to patch ConfigMap", + "originalConfigMap", + *originalConfigMap, + "patchConfigMap", + *patchConfigMap, + ) + return nil, err + } + return patchConfigMap, nil +} + +func GetConfigMap( + ctx context.Context, + cl client.Client, + namespace string, + name string, +) (*corev1.ConfigMap, error) { + + configMap := &corev1.ConfigMap{} + err := cl.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, configMap) + if err != nil { + return nil, err + } + return configMap, nil +} diff --git a/pkg/queue/queue_rpc.go b/pkg/queue/queue_rpc.go index 0d5f511c..318692c6 100644 --- a/pkg/queue/queue_rpc.go +++ b/pkg/queue/queue_rpc.go @@ -28,6 +28,7 @@ func newSizeHandler( q CountReader, ) nethttp.Handler { return http.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) { + cur, err := q.Current() if err != nil { lggr.Error(err, "getting queue size") diff --git a/pkg/routing/config_map.go b/pkg/routing/config_map.go new file mode 100644 index 00000000..c27c60c4 --- /dev/null +++ b/pkg/routing/config_map.go @@ -0,0 +1,156 @@ +package routing + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + "github.com/kedacore/http-add-on/pkg/k8s" + "github.com/kedacore/http-add-on/pkg/queue" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // the name of the ConfigMap that stores the routing table + ConfigMapRoutingTableName = "keda-http-routing-table" + // the key in the ConfigMap data that stores the JSON routing table + configMapRoutingTableKey = "routing-table" +) + +// SaveTableToConfigMap saves the contents of table to the Data field in +// configMap +func SaveTableToConfigMap(table *Table, configMap *corev1.ConfigMap) error { + tableAsJSON, err := table.MarshalJSON() + if err != nil { + return err + } + configMap.Data[configMapRoutingTableKey] = string(tableAsJSON) + return nil +} + +// FetchTableFromConfigMap fetches the Data field from configMap, converts it +// to a routing table, and returns it +func FetchTableFromConfigMap(configMap *corev1.ConfigMap, q queue.Counter) (*Table, error) { + data, found := configMap.Data[configMapRoutingTableKey] + if !found { + return nil, fmt.Errorf( + "no '%s' key found in the %s ConfigMap", + configMapRoutingTableKey, + ConfigMapRoutingTableName, + ) + } + ret := NewTable() + if err := ret.UnmarshalJSON([]byte(data)); err != nil { + retErr := errors.Wrap( + err, + fmt.Sprintf( + "error 
decoding '%s' key in %s ConfigMap",
+				configMapRoutingTableKey,
+				ConfigMapRoutingTableName,
+			),
+		)
+		return nil, retErr
+	}
+	return ret, nil
+}
+
+// updateQueueFromTable ensures that every host in the routing table
+// exists in the given queue, and that no hosts exist in the queue that
+// don't exist in the routing table. It uses q.Ensure() and q.Remove()
+// to do those things, respectively.
+func updateQueueFromTable(
+	lggr logr.Logger,
+	table *Table,
+	q queue.Counter,
+) error {
+	// ensure that every host is in the queue, even if it has
+	// zero pending requests. This is important so that the
+	// scaler can report on all applications.
+	for host := range table.m {
+		q.Ensure(host)
+	}
+
+	// ensure that the queue doesn't have any extra hosts that don't exist in the table
+	qCur, err := q.Current()
+	if err != nil {
+		lggr.Error(
+			err,
+			"failed to get current queue counts (in order to prune it of missing routing table hosts)",
+		)
+		return errors.Wrap(err, "pkg.routing.updateQueueFromTable")
+	}
+	for host := range qCur.Counts {
+		if _, err := table.Lookup(host); err != nil {
+			q.Remove(host)
+		}
+	}
+	return nil
+}
+
+// GetTable fetches the contents of the ConfigMap that stores
+// the routing table, then tries to decode it into a temporary routing table
+// data structure.
+//
+// If that succeeds, it calls table.Replace(newTable), then ensures that
+// every host in the routing table exists in the given queue, and no hosts
+// exist in the queue that don't exist in the routing table. It uses q.Ensure()
+// and q.Remove() to do those things, respectively.
+func GetTable(
+	ctx context.Context,
+	lggr logr.Logger,
+	getter k8s.ConfigMapGetter,
+	table *Table,
+	q queue.Counter,
+) error {
+	lggr = lggr.WithName("pkg.routing.GetTable")
+
+	cm, err := getter.Get(
+		ctx,
+		ConfigMapRoutingTableName,
+		metav1.GetOptions{},
+	)
+	if err != nil {
+		lggr.Error(
+			err,
+			"failed to fetch the routing table ConfigMap",
+			"configMapName",
+			ConfigMapRoutingTableName,
+		)
+		return errors.Wrap(
+			err,
+			fmt.Sprintf(
+				"failed to fetch ConfigMap %s",
+				ConfigMapRoutingTableName,
+			),
+		)
+	}
+	newTable, err := FetchTableFromConfigMap(cm, q)
+	if err != nil {
+		lggr.Error(
+			err,
+			"failed decoding routing table ConfigMap",
+			"configMapName",
+			ConfigMapRoutingTableName,
+		)
+		return errors.Wrap(
+			err,
+			fmt.Sprintf(
+				"failed decoding ConfigMap %s into a routing table",
+				ConfigMapRoutingTableName,
+			),
+		)
+	}
+
+	table.Replace(newTable)
+	if err := updateQueueFromTable(lggr, table, q); err != nil {
+		lggr.Error(
+			err,
+			"unable to update the queue from the new routing table",
+		)
+		return errors.Wrap(err, "pkg.routing.GetTable")
+	}
+
+	return nil
+}
diff --git a/pkg/routing/config_map_updater.go b/pkg/routing/config_map_updater.go
new file mode 100644
index 00000000..67a58f33
--- /dev/null
+++ b/pkg/routing/config_map_updater.go
@@ -0,0 +1,88 @@
+package routing
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-logr/logr"
+	"github.com/kedacore/http-add-on/pkg/k8s"
+	"github.com/kedacore/http-add-on/pkg/queue"
+	"github.com/pkg/errors"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// StartConfigMapRoutingTableUpdater starts a loop that does the following:
+//
+// - Every updateEvery tick, fetches the full ConfigMap called
+// ConfigMapRoutingTableName and calls table.Replace(newTable) with its contents
+// - Uses getterWatcher to watch for all ADDED or MODIFIED events on the ConfigMap
+// called 
ConfigMapRoutingTableName. On either of those events, decodes
+// that ConfigMap into a routing table and stores the new table into table
+// using table.Replace(newTable)
+// - Returns a non-nil error wrapping ctx.Err() if ctx.Done() receives
+func StartConfigMapRoutingTableUpdater(
+	ctx context.Context,
+	lggr logr.Logger,
+	updateEvery time.Duration,
+	getterWatcher k8s.ConfigMapGetterWatcher,
+	table *Table,
+	q queue.Counter,
+) error {
+	lggr = lggr.WithName("pkg.routing.StartConfigMapRoutingTableUpdater")
+	watchIface, err := getterWatcher.Watch(ctx, metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	defer watchIface.Stop()
+
+	ticker := time.NewTicker(updateEvery)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return errors.Wrap(ctx.Err(), "context is done")
+		case <-ticker.C:
+			if err := GetTable(ctx, lggr, getterWatcher, table, q); err != nil {
+				return errors.Wrap(err, "failed to fetch routing table")
+			}
+
+		case evt := <-watchIface.ResultChan():
+			evtType := evt.Type
+			obj := evt.Object
+			if evtType == watch.Added || evtType == watch.Modified {
+				cm, ok := obj.(*corev1.ConfigMap)
+				// by definition of watchIface, all returned objects should
+				// be assertable to a ConfigMap. In the unlikely case that
+				// one isn't, just ignore it and move on.
+				// This check is just to be defensive.
+				if !ok {
+					continue
+				}
+				// the watcher is open on all ConfigMaps in the namespace, so
+				// bail out of this loop iteration immediately if the event
+				// isn't for the routing table ConfigMap.
+				if cm.Name != ConfigMapRoutingTableName {
+					continue
+				}
+				newTable, err := FetchTableFromConfigMap(cm, q)
+				if err != nil {
+					return err
+				}
+				table.Replace(newTable)
+				if err := updateQueueFromTable(lggr, table, q); err != nil {
+					// if we couldn't update the queue, just log but don't bail.
+					// we want to give the loop a chance to tick (or receive a new event)
+					// and update the table & queue again
+					lggr.Error(
+						err,
+						"failed to update queue from table on ConfigMap change event",
+					)
+					continue
+				}
+			}
+		}
+	}
+
+}
diff --git a/pkg/routing/table_rpc.go b/pkg/routing/table_rpc.go
index 989e8525..8fcd1418 100644
--- a/pkg/routing/table_rpc.go
+++ b/pkg/routing/table_rpc.go
@@ -1,12 +1,11 @@
 package routing
 
 import (
-	"context"
 	"encoding/json"
 	"net/http"
-	"net/url"
 
 	"github.com/go-logr/logr"
+	"github.com/kedacore/http-add-on/pkg/k8s"
 	"github.com/kedacore/http-add-on/pkg/queue"
 )
 
@@ -28,13 +27,13 @@ func AddFetchRoute(
 }
 
 // AddPingRoute adds a route to mux that will accept an empty GET request,
-// fetch the current state of the routing table by calling GetTable using fetchURL,
-// then returning the new contents of the routing table to the client
+// fetch the current state of the routing table from the standard routing
+// table ConfigMap (ConfigMapRoutingTableName), save it to local memory, and
+// return the contents of the routing table to the client. 
func AddPingRoute( lggr logr.Logger, mux *http.ServeMux, - httpCl *http.Client, - fetchURL *url.URL, + getter k8s.ConfigMapGetter, table *Table, q queue.Counter, ) { @@ -43,9 +42,8 @@ func AddPingRoute( mux.HandleFunc(routingPingPath, func(w http.ResponseWriter, r *http.Request) { err := GetTable( r.Context(), - httpCl, lggr, - *fetchURL, + getter, table, q, ) @@ -82,63 +80,3 @@ func newTableHandler( } }) } - -// GetTable fetches a table via an RPC call to operatorAdminURL, replaces the -// current value of table with the newly fetched table using table.Replace(), and -// ensures that all hosts are present in q using q.Ensure() for each host in -// the newly fetched table -func GetTable( - ctx context.Context, - httpCl *http.Client, - lggr logr.Logger, - operatorAdminURL url.URL, - table *Table, - q queue.Counter, -) error { - lggr = lggr.WithName("pkg.routing.GetTable") - - operatorAdminURL.Path = routingFetchPath - - res, err := httpCl.Get(operatorAdminURL.String()) - if err != nil { - lggr.Error( - err, - "fetching the routing table URL", - "url", - operatorAdminURL.String(), - ) - return err - } - defer res.Body.Close() - newTable := NewTable() - if err := json.NewDecoder(res.Body).Decode(newTable); err != nil { - lggr.Error( - err, - "decoding routing table URL response", - ) - return err - } - table.Replace(newTable) - - // ensure that the queue has all hosts that exist in the table - for host := range newTable.m { - q.Ensure(host) - } - - // ensure that the queue doesn't have any extra hosts that don't exist in the table - qCur, err := q.Current() - if err != nil { - lggr.Error( - err, - "failed to get current queue counts (in order to prune it of missing routing table hosts)", - ) - return err - } - for host := range qCur.Counts { - if _, err := table.Lookup(host); err != nil { - q.Remove(host) - } - } - - return nil -} diff --git a/pkg/routing/updater.go b/pkg/routing/updater.go deleted file mode 100644 index 04c1ff8a..00000000 --- a/pkg/routing/updater.go +++ /dev/null @@ -1,51 +0,0 @@ -package routing - -import ( - "context" - "net/http" - "net/url" - "time" - - "github.com/go-logr/logr" - "github.com/kedacore/http-add-on/pkg/queue" - "github.com/pkg/errors" -) - -// StartUpdateLoop begins a loop that calls fn every updateInterval. -// if fn returns an error, returns that error immediately and stops -// calling it. similarly, if ctx.Done() receives, this function -// stops calling fn and returns an error that wraps ctx.Err() -// immediately -func StartUpdateLoop( - ctx context.Context, - lggr logr.Logger, - updateInterval time.Duration, - fn func(context.Context, time.Time) error, - // t *Table, - // q queue.Counter, -) error { - ticker := time.NewTicker(updateInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if err := fn(ctx, time.Now()); err != nil { - return errors.Wrap(err, "trying to fetch routing table") - } - case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "context timeout") - } - } -} - -func NewGetTableUpdateLoopFunc( - lggr logr.Logger, - httpCl *http.Client, - fetchURL url.URL, - table *Table, - q queue.Counter, -) func(context.Context, time.Time) error { - return func(ctx context.Context, now time.Time) error { - return GetTable(ctx, httpCl, lggr, fetchURL, table, q) - } -}
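
Usage sketch (not part of the patch above): the wiring below mirrors what interceptor/main.go now does, but runs against the fake clientset from k8s.io/client-go/kubernetes/fake so the get/watch plumbing can be exercised without a cluster. The "keda" namespace, the 3-second deadline, and the 500ms interval are placeholder values, and logr.Discard() (available in recent logr versions) stands in for a real logger. Any typed ConfigMapInterface, real or fake, satisfies k8s.ConfigMapGetterWatcher, which is what lets main.go pass cl.CoreV1().ConfigMaps(...) straight through.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	"github.com/kedacore/http-add-on/pkg/k8s"
	"github.com/kedacore/http-add-on/pkg/queue"
	"github.com/kedacore/http-add-on/pkg/routing"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	// the typed ConfigMapInterface of any clientset -- real or fake --
	// satisfies k8s.ConfigMapGetterWatcher
	cl := fake.NewSimpleClientset()
	cmInterface := cl.CoreV1().ConfigMaps("keda")

	table := routing.NewTable()
	q := queue.NewMemory()

	// seed the ConfigMap the updater expects so its first tick succeeds
	cm := k8s.NewConfigMap(
		"keda",
		routing.ConfigMapRoutingTableName,
		map[string]string{},
		map[string]string{},
	)
	if err := routing.SaveTableToConfigMap(table, cm); err != nil {
		panic(err)
	}
	if _, err := cmInterface.Create(ctx, cm, metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// refreshes table every 500ms and on every watch event for the
	// routing table ConfigMap; returns once ctx's deadline passes
	if err := routing.StartConfigMapRoutingTableUpdater(
		ctx, logr.Discard(), 500*time.Millisecond, cmInterface, table, q,
	); err != nil {
		fmt.Println("updater stopped:", err)
	}
}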
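The pared-down k8s.ConfigMapGetter interface also keeps routing.GetTable unit-testable with a hand-rolled stub instead of a full clientset. A minimal sketch, assuming an empty table round-trips through its own MarshalJSON/UnmarshalJSON; the staticGetter type is hypothetical and exists only for this example:

package routing_test

import (
	"context"
	"testing"

	"github.com/go-logr/logr"
	"github.com/kedacore/http-add-on/pkg/queue"
	"github.com/kedacore/http-add-on/pkg/routing"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// staticGetter satisfies k8s.ConfigMapGetter by returning a fixed
// ConfigMap regardless of the name requested
type staticGetter struct{ cm *corev1.ConfigMap }

func (s staticGetter) Get(
	ctx context.Context, name string, opts metav1.GetOptions,
) (*corev1.ConfigMap, error) {
	return s.cm, nil
}

func TestGetTableFromStub(t *testing.T) {
	// a ConfigMap whose routing-table key holds an empty, valid table
	cm := &corev1.ConfigMap{Data: map[string]string{}}
	if err := routing.SaveTableToConfigMap(routing.NewTable(), cm); err != nil {
		t.Fatal(err)
	}

	table := routing.NewTable()
	q := queue.NewMemory()
	if err := routing.GetTable(
		context.Background(), logr.Discard(), staticGetter{cm}, table, q,
	); err != nil {
		t.Fatal(err)
	}
}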