Skip to content

Commit

Permalink
Merge pull request #63 from kubescape/add-http-adapter
Browse files Browse the repository at this point in the history
Add http endpoint adapter
  • Loading branch information
Bezbran committed Mar 4, 2024
2 parents 4a38bcf + f2ae3e3 commit f5c2421
Show file tree
Hide file tree
Showing 19 changed files with 670 additions and 80 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/pr-created.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ jobs:
"TestSynchronizer_TC09",
"TestSynchronizer_TC10",
"TestSynchronizer_TC11",
"TestSynchronizer_TC12"
"TestSynchronizer_TC12",
"TestSynchronizer_TC13_HTTPEndpoint",
]
steps:
- name: Checkout code
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@
# Dependency directories (remove the comment below to include it)
# vendor/

# Go workspace file
go.work
# debug files
*/__debug_*
10 changes: 5 additions & 5 deletions adapters/backend/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *Client) sendDeleteObjectMessage(ctx context.Context, id domain.KindName
return fmt.Errorf("marshal delete object message: %w", err)
}

return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValueDeleteObjectMessage, data)
return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValueDeleteObjectMessage, data, id.ToCustomProperties())
}

func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, baseObject []byte) error {
Expand Down Expand Up @@ -223,7 +223,7 @@ func (c *Client) sendGetObjectMessage(ctx context.Context, id domain.KindName, b
return fmt.Errorf("marshal get object message: %w", err)
}

return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValueGetObjectMessage, data)
return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValueGetObjectMessage, data, id.ToCustomProperties())
}

func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
Expand Down Expand Up @@ -255,7 +255,7 @@ func (c *Client) sendPatchObjectMessage(ctx context.Context, id domain.KindName,
return fmt.Errorf("marshal patch object message: %w", err)
}

return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValuePatchObjectMessage, data)
return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValuePatchObjectMessage, data, id.ToCustomProperties())
}

func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, object []byte) error {
Expand Down Expand Up @@ -285,7 +285,7 @@ func (c *Client) sendPutObjectMessage(ctx context.Context, id domain.KindName, o
return fmt.Errorf("marshal put object message: %w", err)
}

return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValuePutObjectMessage, data)
return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValuePutObjectMessage, data, id.ToCustomProperties())
}

func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName, checksum string) error {
Expand Down Expand Up @@ -315,7 +315,7 @@ func (c *Client) sendVerifyObjectMessage(ctx context.Context, id domain.KindName
return fmt.Errorf("marshal verify object message: %w", err)
}

return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValueVerifyObjectMessage, data)
return c.messageProducer.ProduceMessage(ctx, cId, messaging.MsgPropEventValueVerifyObjectMessage, data, id.ToCustomProperties())
}

func (c *Client) SendReconciliationRequestMessage(ctx context.Context) error {
Expand Down
16 changes: 13 additions & 3 deletions adapters/backend/v1/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,8 @@ func NewPulsarMessageProducer(cfg config.Config, pulsarClient pulsarconnector.Cl
return &PulsarMessageProducer{producer: producer}, nil
}

func (p *PulsarMessageProducer) ProduceMessage(ctx context.Context, id domain.ClientIdentifier, eventType string, payload []byte) error {
producerMessage := NewProducerMessage(SynchronizerServerProducerKey, id.Account, id.Cluster, eventType, payload)
func (p *PulsarMessageProducer) ProduceMessage(ctx context.Context, id domain.ClientIdentifier, eventType string, payload []byte, optionalProperties ...map[string]string) error {
producerMessage := NewProducerMessage(SynchronizerServerProducerKey, id.Account, id.Cluster, eventType, payload, optionalProperties...)
p.producer.SendAsync(ctx, producerMessage, logPulsarSyncAsyncErrors)
return nil
}
Expand All @@ -352,6 +352,11 @@ func (p *PulsarMessageProducer) ProduceMessageWithoutIdentifier(ctx context.Cont

func logPulsarSyncAsyncErrors(msgID pulsar.MessageID, message *pulsar.ProducerMessage, err error) {
var metricLabels prometheus.Labels
if msgID == nil {
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueError}
logger.L().Error("got empty messageID from pulsar", helpers.Error(err))
return
}
if err != nil {
metricLabels = prometheus.Labels{prometheusStatusLabel: prometheusStatusLabelValueError}
logger.L().Error("failed to send message to pulsar",
Expand All @@ -370,11 +375,16 @@ func logPulsarSyncAsyncErrors(msgID pulsar.MessageID, message *pulsar.ProducerMe
}
}

func NewProducerMessage(producerMessageKey, account, cluster, eventType string, payload []byte) *pulsar.ProducerMessage {
func NewProducerMessage(producerMessageKey, account, cluster, eventType string, payload []byte, optionalProperties ...map[string]string) *pulsar.ProducerMessage {
producerMessageProperties := map[string]string{
messaging.MsgPropTimestamp: time.Now().Format(time.RFC3339Nano),
messaging.MsgPropEvent: eventType,
}
for _, optionalProperty := range optionalProperties {
for k, v := range optionalProperty {
producerMessageProperties[k] = v
}
}

if account != "" {
producerMessageProperties[messaging.MsgPropAccount] = account
Expand Down
246 changes: 246 additions & 0 deletions adapters/httpendpoint/v1/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package httpendpoint

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"time"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/config"
"github.com/kubescape/synchronizer/domain"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

type Adapter struct {
callbacks domain.Callbacks
cfg config.HTTPEndpoint
clients map[string]adapters.Client
httpMux *http.ServeMux
httpServer *http.Server
supportedPaths map[domain.Strategy]map[string]map[string]map[string]bool
isStarted bool
}

func NewHTTPEndpointAdapter(cfg config.HTTPEndpoint) *Adapter {
httpMux := http.NewServeMux()
httpMux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

server := &http.Server{
Addr: fmt.Sprintf(":%s", cfg.ServerPort),
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 120 * time.Second,
Handler: httpMux,
}
a := &Adapter{
cfg: cfg,
clients: map[string]adapters.Client{},
httpMux: httpMux,
httpServer: server,
}
httpMux.Handle("/", a)
return a
}

// ensure that the Adapter struct satisfies the adapters.Adapter interface at compile-time
var _ adapters.Adapter = (*Adapter)(nil)

func (a *Adapter) GetConfig() config.HTTPEndpoint {
return a.cfg
}

// No-OP functions for functions needed only for backend re-sync
func (a *Adapter) DeleteObject(ctx context.Context, id domain.KindName) error {
return nil
}

func (a *Adapter) GetObject(ctx context.Context, id domain.KindName, baseObject []byte) error {
return nil
}

func (a *Adapter) PatchObject(ctx context.Context, id domain.KindName, checksum string, patch []byte) error {
return nil
}

func (a *Adapter) PutObject(ctx context.Context, id domain.KindName, object []byte) error {
return nil
}

func (a *Adapter) VerifyObject(ctx context.Context, id domain.KindName, checksum string) error {
return nil
}

func (a *Adapter) Batch(ctx context.Context, kind domain.Kind, batchType domain.BatchType, items domain.BatchItems) error {
return nil
}

func (a *Adapter) RegisterCallbacks(mainCtx context.Context, callbacks domain.Callbacks) {
a.httpServer.BaseContext = func(_ net.Listener) context.Context {
return mainCtx
}
a.callbacks = callbacks
}

func (a *Adapter) Callbacks(_ context.Context) (domain.Callbacks, error) {
return a.callbacks, nil
}

func (a *Adapter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/healthz" {
w.WriteHeader(http.StatusOK)
return
}
if r.URL.Path == "/readyz" {
if a.isStarted {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
return
}
logger.L().Ctx(r.Context()).Info("httpendpoint request", helpers.String("path", r.URL.Path), helpers.String("method", r.Method))
// TODO: add tracing span
// validate the request verb + path
if a.supportedPaths == nil {
w.WriteHeader(http.StatusInternalServerError)
logger.L().Ctx(r.Context()).Warning("httpendpoint supportedPaths is nil")
return
}
// validate the request verb + path
// URL path should be in the format of /apis/v1/<group>/<version>/<resource-kind>
// validate the request path
lowerCasePath := strings.ToLower(r.URL.Path)
pathSlices := strings.Split(lowerCasePath, "/")
if len(pathSlices) != 6 {
w.WriteHeader(http.StatusBadRequest)
logger.L().Ctx(r.Context()).Warning("httpendpoint request path is invalid", helpers.String("path", r.URL.Path))
return
}
if pathSlices[0] != "" || pathSlices[1] != "apis" || pathSlices[2] != "v1" || pathSlices[3] == "" || pathSlices[4] == "" || pathSlices[5] == "" {
w.WriteHeader(http.StatusBadRequest)
logger.L().Ctx(r.Context()).Warning("httpendpoint error #2. Request path is invalid", helpers.String("path", r.URL.Path))
return
}
pathSlices = pathSlices[3:]
// validate the request path against the supported paths
strategy := domain.Strategy(strings.ToLower(r.Method))
switch r.Method {
case http.MethodPut:
strategy = domain.PatchStrategy
case http.MethodPost:
strategy = domain.CopyStrategy
}
if _, ok := a.supportedPaths[strategy]; !ok {
w.WriteHeader(http.StatusMethodNotAllowed)
logger.L().Ctx(r.Context()).Warning("httpendpoint request method is not supported", helpers.String("method", r.Method))
return
}
if _, ok := a.supportedPaths[strategy][pathSlices[0]]; !ok {
w.WriteHeader(http.StatusNotFound)
logger.L().Ctx(r.Context()).Warning("httpendpoint request group path is not supported", helpers.String("path", r.URL.Path))
return
}
if _, ok := a.supportedPaths[strategy][pathSlices[0]][pathSlices[1]]; !ok {
w.WriteHeader(http.StatusNotFound)
logger.L().Ctx(r.Context()).Warning("httpendpoint request version path is not supported", helpers.String("path", r.URL.Path))
return
}
if _, ok := a.supportedPaths[strategy][pathSlices[0]][pathSlices[1]][pathSlices[2]]; !ok {
w.WriteHeader(http.StatusNotFound)
logger.L().Ctx(r.Context()).Warning("httpendpoint request resource path is not supported", helpers.String("path", r.URL.Path))
return
}

// read the request body
if r.Body == nil {
w.WriteHeader(http.StatusBadRequest)
logger.L().Ctx(r.Context()).Warning("httpendpoint request body is empty")
return
}
defer r.Body.Close()
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.L().Ctx(r.Context()).Warning("httpendpoint request body read error", helpers.Error(err))
return
}
obj := &unstructured.Unstructured{}
if err := json.Unmarshal(bodyBytes, obj); err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.L().Ctx(r.Context()).Warning("httpendpoint request body read error", helpers.Error(err))
return
}
kindName := domain.FromUnstructured(obj)
// validate the request body against the URL path
// URL path should be in the format of /apis/v1/<group>/<version>/<resource-kind>
if kindName.Kind.Group != pathSlices[0] || kindName.Kind.Version != pathSlices[1] || kindName.Kind.Resource != pathSlices[2] {
w.WriteHeader(http.StatusBadRequest)
logger.L().Ctx(r.Context()).Warning("httpendpoint request body does not match the URL path", helpers.String("path", r.URL.Path), helpers.Interface("kindName", kindName))
return
}

// call the PutObject callback
switch strategy {
case domain.PatchStrategy:
if err := a.callbacks.PatchObject(r.Context(), kindName, "", bodyBytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.L().Ctx(r.Context()).Warning("httpendpoint PatchObject callback error", helpers.Error(err))
return
}
case domain.CopyStrategy:
if err := a.callbacks.PutObject(r.Context(), kindName, bodyBytes); err != nil {
w.WriteHeader(http.StatusInternalServerError)
logger.L().Ctx(r.Context()).Warning("httpendpoint PutObject callback error", helpers.Error(err))
return
}
}
w.WriteHeader(http.StatusAccepted)
}

func (a *Adapter) Start(ctx context.Context) error {
// In order to validate the kind is supported by resources list in the config we will build a map of supported verbs, group, version and resource
// build the map:
a.supportedPaths = map[domain.Strategy]map[string]map[string]map[string]bool{}
for _, resource := range a.cfg.Resources {
lowerCaseStrategy := domain.Strategy(strings.ToLower(string(resource.Strategy)))
if _, ok := a.supportedPaths[lowerCaseStrategy]; !ok {
a.supportedPaths[lowerCaseStrategy] = map[string]map[string]map[string]bool{}
}
if _, ok := a.supportedPaths[lowerCaseStrategy][resource.Group]; !ok {
a.supportedPaths[lowerCaseStrategy][resource.Group] = map[string]map[string]bool{}
}
if _, ok := a.supportedPaths[lowerCaseStrategy][resource.Group][resource.Version]; !ok {
a.supportedPaths[lowerCaseStrategy][resource.Group][resource.Version] = map[string]bool{}
}
a.supportedPaths[lowerCaseStrategy][resource.Group][resource.Version][resource.Resource] = true
}
go func() {
if err := a.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.L().Ctx(ctx).Fatal("httpendpoint server error", helpers.Error(err))
}
logger.L().Ctx(ctx).Info("httpendpoint server stopped")
}()
a.isStarted = true
logger.L().Ctx(ctx).Info("httpendpoint server started", helpers.String("port", a.cfg.ServerPort))
return nil
}

func (a *Adapter) Stop(ctx context.Context) error {
if err := a.httpServer.Shutdown(ctx); err != nil {
return err
}
return nil
}

func (a *Adapter) IsRelated(ctx context.Context, id domain.ClientIdentifier) bool {
return a.cfg.Account == id.Account && a.cfg.ClusterName == id.Cluster
}
4 changes: 3 additions & 1 deletion adapters/incluster/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (a *Adapter) GetClient(id domain.KindName) (adapters.Client, error) {
func (a *Adapter) GetClientByKind(kind domain.Kind) adapters.Client {
client, ok := a.clients[kind.String()]
if !ok {
client = NewClient(a.k8sclient, a.cfg.Account, a.cfg.ClusterName, config.Resource{
logger.L().Error("client not found", helpers.String("kind", kind.String()))
// if client is not found, create an empty one to discard the messages from the server in callbacks if the kind is not in the list
client = NewClient(&NoOpDynamicClient{}, a.cfg.Account, a.cfg.ClusterName, config.Resource{
Group: kind.Group,
Version: kind.Version,
Resource: kind.Resource,
Expand Down
Loading

0 comments on commit f5c2421

Please sign in to comment.