Skip to content

Commit

Permalink
Add logic in interceptor to update routing table from ConfigMap (#11)
Browse files Browse the repository at this point in the history
Co-authored-by: Lucas Santos <lhs.santoss@gmail.com>
Signed-off-by: Aaron Schlesinger <70865+arschles@users.noreply.github.com>
  • Loading branch information
arschles and khaosdoctor authored Aug 12, 2021
1 parent 3aa0207 commit f683e39
Show file tree
Hide file tree
Showing 16 changed files with 496 additions and 318 deletions.
5 changes: 1 addition & 4 deletions e2e/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func getClient() (
return cl, cfg, nil
}

func objKey(ns, name string) client.ObjectKey {
return client.ObjectKey{Namespace: ns, Name: name}
}
func deleteNS(ns string) error {
return sh.RunV("kubectl", "delete", "namespace", ns)
}
Expand Down Expand Up @@ -63,7 +60,7 @@ func getScaledObject(
if err != nil {
return nil, err
}
if err := cl.Get(ctx, objKey(ns, name), scaledObject); err != nil {
if err := cl.Get(ctx, k8s.ObjKey(ns, name), scaledObject); err != nil {
return nil, err
}
return scaledObject, nil
Expand Down
3 changes: 2 additions & 1 deletion e2e/wait_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -21,7 +22,7 @@ func waitUntilDeployment(
depl := &appsv1.Deployment{}
if err := cl.Get(
ctx,
objKey(ns, name),
k8s.ObjKey(ns, name),
depl,
); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
go.uber.org/zap v1.17.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.33.2
google.golang.org/protobuf v1.27.1
google.golang.org/protobuf v1.26.0
k8s.io/api v0.21.3
k8s.io/apimachinery v0.21.3
k8s.io/client-go v0.21.3
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -673,9 +673,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
41 changes: 0 additions & 41 deletions interceptor/config/operator.go

This file was deleted.

9 changes: 9 additions & 0 deletions interceptor/config/serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ type Serving struct {
// This is the server that the external scaler will issue metrics
// requests to
AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
// RoutingTableUpdateDurationMS is the interval (in milliseconds) representing how
// often to do a complete update of the routing table ConfigMap.
//
// The interceptor will also open a watch stream to the routing table
// ConfigMap and attempt to update the routing table on every update.
//
// Since it does full updates alongside watch stream updates, it can
// only process one at a time. Therefore, this is a best effort timeout
RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_OPERATOR_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"`
}

// Parse parses standard configs using envconfig and returns a pointer to the
Expand Down
71 changes: 34 additions & 37 deletions interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"net/http"
nethttp "net/http"
"net/url"
"os"
"time"

Expand All @@ -33,7 +31,6 @@ func main() {
os.Exit(1)
}
timeoutCfg := config.MustParseTimeouts()
operatorCfg := config.MustParseOperator()
servingCfg := config.MustParseServing()
ctx := context.Background()

Expand Down Expand Up @@ -61,29 +58,23 @@ func main() {
lggr.Error(err, "creating new deployment cache")
os.Exit(1)
}
waitFunc := newDeployReplicasForwardWaitFunc(deployCache)

operatorRoutingFetchURL, err := operatorCfg.RoutingFetchURL()
if err != nil {
lggr.Error(err, "getting the operator URL")
os.Exit(1)
}
configMapsInterface := cl.CoreV1().ConfigMaps(servingCfg.CurrentNamespace)

waitFunc := newDeployReplicasForwardWaitFunc(deployCache)

lggr.Info("Interceptor starting")
lggr.Info(
"Fetching initial routing table",
"operatorRoutingURL",
operatorRoutingFetchURL.String(),
)

q := queue.NewMemory()
routingTable := routing.NewTable()

lggr.Info(
"Fetching initial routing table",
)
if err := routing.GetTable(
ctx,
nethttp.DefaultClient,
lggr,
*operatorRoutingFetchURL,
configMapsInterface,
routingTable,
q,
); err != nil {
Expand All @@ -92,37 +83,44 @@ func main() {
}

errGrp, ctx := errgroup.WithContext(ctx)

// start the update loop that updates the routing table from
// the ConfigMap that the operator updates as HTTPScaledObjects
// enter and exit the system
errGrp.Go(func() error {
return runAdminServer(
return routing.StartConfigMapRoutingTableUpdater(
ctx,
lggr,
operatorRoutingFetchURL,
q,
time.Duration(servingCfg.RoutingTableUpdateDurationMS)*time.Millisecond,
configMapsInterface,
routingTable,
adminPort,
q,
)
})

// start the administrative server. this is the server
// that serves the queue size API
errGrp.Go(func() error {
lggr.Info(
"routing table update loop starting",
"operatorRoutingURL",
operatorRoutingFetchURL.String(),
"starting the admin server",
"port",
adminPort,
)

return routing.StartUpdateLoop(
ctx,
return runAdminServer(
lggr,
operatorCfg.RoutingTableUpdateDuration(),
routing.NewGetTableUpdateLoopFunc(
lggr,
http.DefaultClient,
*operatorRoutingFetchURL,
routingTable,
q,
),
configMapsInterface,
q,
routingTable,
adminPort,
)
})

errGrp.Go(func() error {
lggr.Info(
"starting the proxy server",
"port",
proxyPort,
)
return runProxyServer(
lggr,
q,
Expand All @@ -142,7 +140,7 @@ func main() {

func runAdminServer(
lggr logr.Logger,
routingFetchURL *url.URL,
cmGetter k8s.ConfigMapGetter,
q queue.Counter,
routingTable *routing.Table,
port int,
Expand All @@ -162,8 +160,7 @@ func runAdminServer(
routing.AddPingRoute(
lggr,
adminServer,
http.DefaultClient,
routingFetchURL,
cmGetter,
routingTable,
q,
)
Expand Down
67 changes: 45 additions & 22 deletions operator/controllers/routing_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
pkgerrs "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -60,37 +62,58 @@ func updateRoutingMap(
namespace string,
table *routing.Table,
) error {
tableAsJSON, marshalErr := table.MarshalJSON()
if marshalErr != nil {
return marshalErr
}

routingConfigMap, err := k8s.GetConfigMap(ctx, cl, namespace, routingTableName)
if err != nil {
return err
lggr = lggr.WithName("updateRoutingMap")
routingConfigMap, err := k8s.GetConfigMap(ctx, cl, namespace, routing.ConfigMapRoutingTableName)
// if there is an error other than not found on the ConfigMap, we should
// fail
if err != nil && !errors.IsNotFound(err) {
lggr.Error(
err,
"other issue fetching the routing table ConfigMap",
"configMapName",
routing.ConfigMapRoutingTableName,
)
return pkgerrs.Wrap(err, "routing table ConfigMap fetch error")
}

// if the routing table doesn't exist, we need to create it with the latest data
if routingConfigMap == nil {
routingTableData := map[string]string{
"routing-table": string(tableAsJSON),
}

// if either the routing table ConfigMap doesn't exist or for some reason it's
// nil in memory, we need to create it
if errors.IsNotFound(err) || routingConfigMap == nil {
lggr.Info(
"routing table ConfigMap didn't exist, creating it",
"configMapName",
routing.ConfigMapRoutingTableName,
)
routingTableLabels := map[string]string{
"control-plane": "operator",
"keda.sh/addon": "http-add-on",
"app": "http-add-on",
"name": "http-add-on-routing-table",
"app": "http-add-on",
"name": "http-add-on-routing-table",
}

if err := k8s.CreateConfigMap(ctx, lggr, cl, k8s.NewConfigMap(namespace, routingTableName, routingTableLabels, routingTableData)); err != nil {
cm := k8s.NewConfigMap(
namespace,
routing.ConfigMapRoutingTableName,
routingTableLabels,
map[string]string{},
)
if err := routing.SaveTableToConfigMap(table, cm); err != nil {
return err
}
if err := k8s.CreateConfigMap(
ctx,
lggr,
cl,
cm,
); err != nil {
return err
}
} else {
newRoutingTable := routingConfigMap.DeepCopy()
newRoutingTable.Data["routing-table"] = string(tableAsJSON)
if _, patchErr := k8s.PatchConfigMap(ctx, lggr, cl, routingConfigMap, newRoutingTable); patchErr != nil {
return patchErr
newCM := routingConfigMap.DeepCopy()
if err := routing.SaveTableToConfigMap(table, newCM); err != nil {
return err
}
if _, patchErr := k8s.PatchConfigMap(ctx, lggr, cl, routingConfigMap, newCM); patchErr != nil {
return patchErr
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// NewClientset gets a new Kubernetes clientset, or calls log.Fatal
Expand All @@ -27,3 +28,9 @@ func NewClientset() (*kubernetes.Clientset, dynamic.Interface, error) {
}
return clientset, dynamic, nil
}

// ObjKey creates a new client.ObjectKey with the given
// name and namespace
func ObjKey(ns, name string) client.ObjectKey {
return client.ObjectKey{Namespace: ns, Name: name}
}
Loading

0 comments on commit f683e39

Please sign in to comment.