Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic in interceptor to update routing table from ConfigMap #11

Merged
merged 11 commits into from
Aug 12, 2021
5 changes: 1 addition & 4 deletions e2e/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ func getClient() (
return cl, cfg, nil
}

func objKey(ns, name string) client.ObjectKey {
return client.ObjectKey{Namespace: ns, Name: name}
}
func deleteNS(ns string) error {
return sh.RunV("kubectl", "delete", "namespace", ns)
}
Expand Down Expand Up @@ -63,7 +60,7 @@ func getScaledObject(
if err != nil {
return nil, err
}
if err := cl.Get(ctx, objKey(ns, name), scaledObject); err != nil {
if err := cl.Get(ctx, k8s.ObjKey(ns, name), scaledObject); err != nil {
return nil, err
}
return scaledObject, nil
Expand Down
3 changes: 2 additions & 1 deletion e2e/wait_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
"golang.org/x/sync/errgroup"
appsv1 "k8s.io/api/apps/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -21,7 +22,7 @@ func waitUntilDeployment(
depl := &appsv1.Deployment{}
if err := cl.Get(
ctx,
objKey(ns, name),
k8s.ObjKey(ns, name),
depl,
); err != nil {
return err
Expand Down
41 changes: 0 additions & 41 deletions interceptor/config/operator.go

This file was deleted.

9 changes: 9 additions & 0 deletions interceptor/config/serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ type Serving struct {
// This is the server that the external scaler will issue metrics
// requests to
AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
// RoutingTableUpdateDurationMS is the interval (in milliseconds) representing how
// often to do a complete update of the routing table ConfigMap.
//
// The interceptor will also open a watch stream to the routing table
// ConfigMap and attempt to update the routing table on every update.
//
// Since it does full updates alongside watch stream updates, it can
// only process one at a time. Therefore, this is a best effort timeout
RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_OPERATOR_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"`
}

// Parse parses standard configs using envconfig and returns a pointer to the
Expand Down
71 changes: 34 additions & 37 deletions interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"net/http"
nethttp "net/http"
"net/url"
"os"
"time"

Expand All @@ -33,7 +31,6 @@ func main() {
os.Exit(1)
}
timeoutCfg := config.MustParseTimeouts()
operatorCfg := config.MustParseOperator()
servingCfg := config.MustParseServing()
ctx := context.Background()

Expand Down Expand Up @@ -61,29 +58,23 @@ func main() {
lggr.Error(err, "creating new deployment cache")
os.Exit(1)
}
waitFunc := newDeployReplicasForwardWaitFunc(deployCache)

operatorRoutingFetchURL, err := operatorCfg.RoutingFetchURL()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: take old interceptor configurations out of helm chart

if err != nil {
lggr.Error(err, "getting the operator URL")
os.Exit(1)
}
configMapsInterface := cl.CoreV1().ConfigMaps(servingCfg.CurrentNamespace)

waitFunc := newDeployReplicasForwardWaitFunc(deployCache)

lggr.Info("Interceptor starting")
lggr.Info(
"Fetching initial routing table",
"operatorRoutingURL",
operatorRoutingFetchURL.String(),
)

q := queue.NewMemory()
routingTable := routing.NewTable()

lggr.Info(
"Fetching initial routing table",
)
if err := routing.GetTable(
ctx,
nethttp.DefaultClient,
lggr,
*operatorRoutingFetchURL,
configMapsInterface,
routingTable,
q,
); err != nil {
Expand All @@ -92,37 +83,44 @@ func main() {
}

errGrp, ctx := errgroup.WithContext(ctx)

// start the update loop that updates the routing table from
// the ConfigMap that the operator updates as HTTPScaledObjects
// enter and exit the system
errGrp.Go(func() error {
return runAdminServer(
return routing.StartConfigMapRoutingTableUpdater(
ctx,
lggr,
operatorRoutingFetchURL,
q,
time.Duration(servingCfg.RoutingTableUpdateDurationMS)*time.Millisecond,
configMapsInterface,
routingTable,
adminPort,
q,
)
})

// start the administrative server. this is the server
// that serves the queue size API
errGrp.Go(func() error {
lggr.Info(
"routing table update loop starting",
"operatorRoutingURL",
operatorRoutingFetchURL.String(),
"starting the admin server",
"port",
adminPort,
)

return routing.StartUpdateLoop(
ctx,
return runAdminServer(
lggr,
operatorCfg.RoutingTableUpdateDuration(),
routing.NewGetTableUpdateLoopFunc(
lggr,
http.DefaultClient,
*operatorRoutingFetchURL,
routingTable,
q,
),
configMapsInterface,
q,
routingTable,
adminPort,
)
})

errGrp.Go(func() error {
lggr.Info(
"starting the proxy server",
"port",
proxyPort,
)
return runProxyServer(
lggr,
q,
Expand All @@ -142,7 +140,7 @@ func main() {

func runAdminServer(
lggr logr.Logger,
routingFetchURL *url.URL,
cmGetter k8s.ConfigMapGetter,
q queue.Counter,
routingTable *routing.Table,
port int,
Expand All @@ -162,8 +160,7 @@ func runAdminServer(
routing.AddPingRoute(
lggr,
adminServer,
http.DefaultClient,
routingFetchURL,
cmGetter,
routingTable,
q,
)
Expand Down
67 changes: 45 additions & 22 deletions operator/controllers/routing_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
pkgerrs "github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -60,37 +62,58 @@ func updateRoutingMap(
namespace string,
table *routing.Table,
) error {
tableAsJSON, marshalErr := table.MarshalJSON()
if marshalErr != nil {
return marshalErr
}

routingConfigMap, err := k8s.GetConfigMap(ctx, cl, namespace, routingTableName)
if err != nil {
return err
lggr = lggr.WithName("updateRoutingMap")
routingConfigMap, err := k8s.GetConfigMap(ctx, cl, namespace, routing.ConfigMapRoutingTableName)
// if there is an error other than not found on the ConfigMap, we should
// fail
if err != nil && !errors.IsNotFound(err) {
lggr.Error(
err,
"other issue fetching the routing table ConfigMap",
"configMapName",
routing.ConfigMapRoutingTableName,
)
return pkgerrs.Wrap(err, "routing table ConfigMap fetch error")
}

// if the routing table doesn't exist, we need to create it with the latest data
if routingConfigMap == nil {
routingTableData := map[string]string{
"routing-table": string(tableAsJSON),
}

// if either the routing table ConfigMap doesn't exist or for some reason it's
// nil in memory, we need to create it
if errors.IsNotFound(err) || routingConfigMap == nil {
lggr.Info(
"routing table ConfigMap didn't exist, creating it",
"configMapName",
routing.ConfigMapRoutingTableName,
)
routingTableLabels := map[string]string{
"control-plane": "operator",
"keda.sh/addon": "http-add-on",
"app": "http-add-on",
"name": "http-add-on-routing-table",
"app": "http-add-on",
"name": "http-add-on-routing-table",
}

if err := k8s.CreateConfigMap(ctx, lggr, cl, k8s.NewConfigMap(namespace, routingTableName, routingTableLabels, routingTableData)); err != nil {
cm := k8s.NewConfigMap(
namespace,
routing.ConfigMapRoutingTableName,
routingTableLabels,
map[string]string{},
)
if err := routing.SaveTableToConfigMap(table, cm); err != nil {
return err
}
if err := k8s.CreateConfigMap(
ctx,
lggr,
cl,
cm,
); err != nil {
return err
}
} else {
newRoutingTable := routingConfigMap.DeepCopy()
newRoutingTable.Data["routing-table"] = string(tableAsJSON)
if _, patchErr := k8s.PatchConfigMap(ctx, lggr, cl, routingConfigMap, newRoutingTable); patchErr != nil {
return patchErr
newCM := routingConfigMap.DeepCopy()
if err := routing.SaveTableToConfigMap(table, newCM); err != nil {
return err
}
if _, patchErr := k8s.PatchConfigMap(ctx, lggr, cl, routingConfigMap, newCM); patchErr != nil {
return patchErr
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// NewClientset gets a new Kubernetes clientset, or calls log.Fatal
Expand All @@ -27,3 +28,9 @@ func NewClientset() (*kubernetes.Clientset, dynamic.Interface, error) {
}
return clientset, dynamic, nil
}

// ObjKey creates a new client.ObjectKey with the given
// name and namespace
func ObjKey(ns, name string) client.ObjectKey {
return client.ObjectKey{Namespace: ns, Name: name}
}
Loading