Skip to content

Commit

Permalink
113 create robust scoring system for gateway tunnels (#114)
Browse files Browse the repository at this point in the history
- Structure to build and assess substrate node metrics, for gateway load balancing
- HTTP Serviceable scoring Based on:
  - Cpu usage
  - Serviceable assets cached in DAG
  - Serviceable runtime shadows created 
  - Serviceable memory allocation required to call vs available memory
  - Serviceable call history
 - Optimizations with http serviceable creation 
  -  Separate serviceable creation and provisioning serviceable execution requirements to minimize unnecessary operations
  • Loading branch information
tafseer-khan authored Sep 28, 2023
1 parent 1170440 commit f254414
Show file tree
Hide file tree
Showing 38 changed files with 1,004 additions and 431 deletions.
4 changes: 1 addition & 3 deletions clients/p2p/substrate/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "time"

var (
DefaultTimeOut = 10 * time.Millisecond
DefaultThreshold = 5
DefaultThreshold = 3
)

const (
Expand All @@ -13,6 +13,4 @@ const (
BodyHost = "host"
BodyPath = "path"
BodyMethod = "method"

ResponseCached = "cached"
)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ require (
github.com/otiai10/copy v1.12.0
github.com/pkg/errors v0.9.1
github.com/pterm/pterm v0.12.65
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/spf13/afero v1.9.5
github.com/taubyte/builder v0.2.1
github.com/taubyte/cli-common v0.1.1
github.com/taubyte/config-compiler v0.4.6
github.com/taubyte/domain-validation v1.0.1
github.com/taubyte/go-interfaces v0.2.14-0.20230921175616-0dd966927da2
github.com/taubyte/go-interfaces v0.2.14-0.20230928164739-cb43412ebf90
github.com/taubyte/go-project-schema v0.9.3
github.com/taubyte/go-sdk v0.3.9
github.com/taubyte/go-sdk-smartops v0.1.3
Expand Down Expand Up @@ -226,7 +227,6 @@ require (
github.com/rs/cors v1.8.3 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/taubyte/go-sdk-symbols v0.2.7 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -885,8 +885,8 @@ github.com/taubyte/config-compiler v0.4.6 h1:9s3xn955imE7gAyVFpcsBEkowhCir78b1HP
github.com/taubyte/config-compiler v0.4.6/go.mod h1:gaohk1BSknIudo16QSgeQqVWVGLyusPPdLn+wQ+xAmY=
github.com/taubyte/domain-validation v1.0.1 h1:T1iRls4p5+uJLR8R/wf+dBt9Rlahg9BcCtBVbkoD0Ik=
github.com/taubyte/domain-validation v1.0.1/go.mod h1:/X3yd7sBjnE323rA8I9PiUt5+NlKU4I02nQik25Vqe8=
github.com/taubyte/go-interfaces v0.2.14-0.20230921175616-0dd966927da2 h1:hy6J8zeFXlJGMiEX9wYuGg2dU8E5zb7ujV+0F8FFVaE=
github.com/taubyte/go-interfaces v0.2.14-0.20230921175616-0dd966927da2/go.mod h1:OWQlR4DriWCjkgmfFFrlynXd4yjigUxKGIeUBWv1Mmw=
github.com/taubyte/go-interfaces v0.2.14-0.20230928164739-cb43412ebf90 h1:vs5vfSUD6enNlvMB0XjRtTm0b9ze+PYaQ7MJEA6k6W4=
github.com/taubyte/go-interfaces v0.2.14-0.20230928164739-cb43412ebf90/go.mod h1:OWQlR4DriWCjkgmfFFrlynXd4yjigUxKGIeUBWv1Mmw=
github.com/taubyte/go-project-schema v0.9.3 h1:2H0ClUZq7f97OgtL0FUe9tv2v12wOmnTAIiJGLei/gU=
github.com/taubyte/go-project-schema v0.9.3/go.mod h1:8Rt5zsVfj8qbYCT+7++oax/nFVKvVfAepzVkqXrNTs8=
github.com/taubyte/go-sdk v0.3.9 h1:mwwjiub/Jc987kfWvVfAcx63fMRAYB9cj4yhJq+CSyo=
Expand Down
71 changes: 57 additions & 14 deletions protocols/gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package gateway
import (
"errors"
"fmt"
"sort"

goHttp "net/http"

functionSpec "github.com/taubyte/go-specs/function"
websiteSpec "github.com/taubyte/go-specs/website"
http "github.com/taubyte/http"
"github.com/taubyte/p2p/streams/client"
tunnel "github.com/taubyte/p2p/streams/tunnels/http"
"github.com/taubyte/tau/protocols/substrate/components/metrics"
)

func (g *Gateway) attach() {
Expand All @@ -23,36 +27,75 @@ func (g *Gateway) attach() {
})
}

func (wr wrappedResponse) Decode(data interface{}) (err error) {
switch metricsData := data.(type) {
case []byte:
return wr.metrics.Decode(metricsData)
default:
return errors.New("metrics data not []byte")
}
}

func (g *Gateway) handleHttp(w goHttp.ResponseWriter, r *goHttp.Request) error {
resCh, err := g.substrateClient.ProxyHTTP(r.Host, r.URL.Path, r.Method)
if err != nil {
return fmt.Errorf("substrate client proxyHttp failed with: %w", err)
}

matches := make([]*client.Response, 0)
websiteMatches := make([]wrappedResponse, 0)
funcMatches := make([]wrappedResponse, 0)
discard := make([]*client.Response, 0)
for response := range resCh {
err := response.Error()
if err != nil {
if err := response.Error(); err != nil {
logger.Debugf("response from node `%s` failed with: %s", response.PID().Pretty(), err.Error())
}
if err == nil && g.Get(response).Cached() {
matches = append([]*client.Response{response}, matches...)
} else {
matches = append(matches, response)

if _metrics, err := response.Get(websiteSpec.PathVariable.String()); err == nil {
wres := wrappedResponse{Response: response, metrics: new(metrics.Website)}
if err = wres.Decode(_metrics); err == nil {
websiteMatches = append(websiteMatches, wres)
continue
}
}
}
if len(matches) < 1 {
return errors.New("no substrate match found")

if _metrics, err := response.Get(functionSpec.PathVariable.String()); err == nil {
wres := wrappedResponse{Response: response, metrics: new(metrics.Function)}
if err = wres.Decode(_metrics); err == nil {
funcMatches = append(funcMatches, wres)
continue
}
}

// all else
discard = append(discard, response)
}
defer func() {
for _, match := range matches {
match.Close()
for _, res := range discard {
res.Close()
}
for _, res := range websiteMatches {
res.Close()
}
for _, res := range funcMatches {
res.Close()
}
}()
if len(websiteMatches)+len(funcMatches) < 1 {
return errors.New("no substrate match found")
}

var pick *client.Response
if len(websiteMatches) > len(funcMatches) {
sort.Slice(websiteMatches, func(i, j int) bool { return websiteMatches[j].metrics.Less(websiteMatches[i].metrics) })
pick = websiteMatches[0].Response
} else {
sort.Slice(funcMatches, func(i, j int) bool { return funcMatches[j].metrics.Less(funcMatches[i].metrics) })
pick = funcMatches[0].Response
}

w.Header().Add(ProxyHeader, matches[0].PID().Pretty())
w.Header().Add(ProxyHeader, pick.PID().Pretty())

if err := tunnel.Frontend(w, r, matches[0]); err != nil {
if err := tunnel.Frontend(w, r, pick); err != nil {
return fmt.Errorf("tunneling Frontend failed with: %w", err)
}

Expand Down
18 changes: 18 additions & 0 deletions protocols/gateway/methods.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package gateway

import (
http "github.com/taubyte/http"
"github.com/taubyte/p2p/peer"
)

func (g *Gateway) Node() peer.Node {
return g.node
}

func (g *Gateway) Http() http.Service {
return g.http
}

func (g *Gateway) Close() error {
return g.substrateClient.Close()
}
15 changes: 5 additions & 10 deletions protocols/gateway/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/taubyte/go-interfaces/services/substrate"
http "github.com/taubyte/http"
"github.com/taubyte/p2p/peer"
"github.com/taubyte/p2p/streams/client"
"github.com/taubyte/tau/protocols/substrate/components/metrics"
)

type Gateway struct {
Expand All @@ -19,14 +21,7 @@ type Gateway struct {
verbose bool
}

func (g *Gateway) Node() peer.Node {
return g.node
}

func (g *Gateway) Http() http.Service {
return g.http
}

func (g *Gateway) Close() error {
return nil
type wrappedResponse struct {
metrics metrics.Iface
*client.Response
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,38 @@ import (
"github.com/taubyte/tau/vm/lookup"
)

func (s *Service) handle(w goHttp.ResponseWriter, r *goHttp.Request) error {
startTime := time.Now()
matcher := common.New(helpers.ExtractHost(r.Host), r.URL.Path, r.Method)

func (s *Service) Lookup(matcher *common.MatchDefinition) (iface.Serviceable, error) {
servs, err := lookup.Lookup(s, matcher)
if err != nil {
return fmt.Errorf("http serviceable lookup failed with: %s", err)
return nil, fmt.Errorf("http serviceable lookup failed with: %w", err)
}

if len(servs) != 1 {
return fmt.Errorf("lookup returned %d serviceables, expected 1", len(servs))
return nil, fmt.Errorf("lookup returned %d serviceables, expected 1", len(servs))
}

pick, ok := servs[0].(iface.Serviceable)
if !ok {
return fmt.Errorf("matched serviceable is not a http serviceable")
return nil, fmt.Errorf("matched serviceable is not a http serviceable")
}

return pick, nil
}

func (s *Service) handle(w goHttp.ResponseWriter, r *goHttp.Request) error {
startTime := time.Now()
matcher := common.New(helpers.ExtractHost(r.Host), r.URL.Path, r.Method)

pick, err := s.Lookup(matcher)
if err != nil {
return fmt.Errorf("looking up serviceable failed with: %w", err)
}

if !pick.IsProvisioned() {
pick, err = pick.Provision()
if err != nil {
return fmt.Errorf("provisioning serviceable failed with: %w", err)
}
}

if err := pick.Ready(); err != nil {
Expand Down
16 changes: 11 additions & 5 deletions protocols/substrate/components/http/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,24 @@ var _ commonIface.MatchDefinition = &MatchDefinition{}

func New(host, path, method string) *MatchDefinition {
return &MatchDefinition{
Host: host,
Path: path,
Method: method,
Request: &Request{
Host: host,
Path: path,
Method: method,
},
params: make(map[string]string, 0),
}
}

// TODO: Maybe move this to interfaces?
type MatchDefinition struct {
type Request struct {
Host string
Path string
Method string
}

// TODO: Maybe move this to interfaces?
type MatchDefinition struct {
*Request
params map[string]string
}

Expand Down
Loading

0 comments on commit f254414

Please sign in to comment.