Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API: Sandbox routing: Use Redis for client-proxy DNS instead of local map. #245

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .terraform.lock.hcl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

88 changes: 65 additions & 23 deletions packages/api/internal/dns/server.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,84 @@
package dns

import (
"context"
"fmt"
"log"
"net"
"strings"
"sync"
"time"

redis "github.com/go-redis/redis/v8"
resolver "github.com/miekg/dns"

"github.com/e2b-dev/infra/packages/shared/pkg/smap"
"go.uber.org/zap"
)

// redisExpirationTime is the expiration passed to Redis when Add writes a sandbox entry.
const redisExpirationTime = time.Hour * 24

// ttl is not referenced in the visible part of this file.
// NOTE(review): presumably the TTL set on answered DNS records — confirm.
const ttl = 0

// defaultRoutingIP is the address handleDNSRequest answers with when no entry
// is found for the queried sandbox.
const defaultRoutingIP = "127.0.0.1"

// FallbackResolverFn looks up the IP for a sandbox ID; the bool reports whether
// a record was found. get calls it when the Redis lookup does not return a value.
type FallbackResolverFn = func(sandboxID string) (string, bool)

// DNS maps sandbox IDs to IP addresses and answers DNS queries from that mapping.
type DNS struct {
// mu is not used by any code visible here.
// NOTE(review): likely a leftover from the in-memory implementation — confirm.
mu sync.Mutex
// records is the in-memory map used by the d.records.Insert / RemoveCb / Get
// code paths shown in this diff.
// NOTE(review): appears to be superseded by rdb — confirm before removing.
records *smap.Map[string]
// ctx is the context passed to every Redis call made by Add, Remove and get.
ctx context.Context
// rdb is the Redis client that stores the sandbox entries.
rdb *redis.Client
// fallbackResolverFn is consulted by get when Redis has no value; may be nil.
fallbackResolverFn FallbackResolverFn
// logger receives all log output of this type.
logger *zap.SugaredLogger
}

func New() *DNS {
// New returns a DNS whose entries are stored in Redis. It creates the client
// from rdbOpts with redis.NewClient and keeps ctx, fallbackResolverFn and
// logger for use by the methods. fallbackResolverFn may be nil; get checks for
// nil before calling it.
// NOTE(review): ctx is stored on the struct and reused for every Redis call —
// confirm callers pass a long-lived context.
func New(ctx context.Context, rdbOpts *redis.Options, fallbackResolverFn FallbackResolverFn, logger *zap.SugaredLogger) *DNS {
return &DNS{
records: smap.New[string](),
ctx: ctx,
rdb: redis.NewClient(rdbOpts),
fallbackResolverFn: fallbackResolverFn,
logger: logger,
}
}

func (d *DNS) Add(sandboxID, ip string) {
d.records.Insert(d.hostname(sandboxID), ip)
// Add stores the entry sandboxID -> ip in Redis under the key produced by
// dnsKeyFor, with an expiration of redisExpirationTime.
//
// On failure the Redis error is returned wrapped with the sandbox ID, so that
// callers which only log the error still record which entry was affected.
// The underlying error remains reachable through errors.Is / errors.As.
func (d *DNS) Add(sandboxID, ip string) error {
	d.logger.Infof("DNS: Adding entry, sandboxID=%s -> %s", sandboxID, ip)

	if err := d.rdb.Set(d.ctx, d.dnsKeyFor(sandboxID), ip, redisExpirationTime).Err(); err != nil {
		return fmt.Errorf("DNS: adding entry for sandbox %q: %w", sandboxID, err)
	}

	return nil
}
jakubno marked this conversation as resolved.
Show resolved Hide resolved

jakubno marked this conversation as resolved.
Show resolved Hide resolved
func (d *DNS) Remove(sandboxID, ip string) {
d.records.RemoveCb(d.hostname(sandboxID), func(key string, v string, exists bool) bool {
return v == ip
})
func (d *DNS) Remove(sandboxID string) error {
d.logger.Infof("DNS: Removing entry, sandboxID=%s", sandboxID)
if err := d.rdb.Del(d.ctx, d.dnsKeyFor(sandboxID)).Err(); err != nil {
jakubno marked this conversation as resolved.
Show resolved Hide resolved
return err
}
return nil
}
jakubno marked this conversation as resolved.
Show resolved Hide resolved
jakubno marked this conversation as resolved.
Show resolved Hide resolved

func (d *DNS) get(hostname string) (string, bool) {
return d.records.Get(hostname)
// get returns the IP recorded for sandboxID and whether one was found.
//
// Redis is asked first. When it has no value (or the lookup fails, which is
// logged as a warning unless the error is redis.Nil), the optional fallback
// resolver is tried. A record obtained from the fallback is written back to
// Redis by a separate goroutine and returned immediately.
func (d *DNS) get(sandboxID string) (string, bool) {
	cached, lookupErr := d.rdb.Get(d.ctx, d.dnsKeyFor(sandboxID)).Result()
	switch {
	case lookupErr == nil:
		return cached, true
	case lookupErr != redis.Nil:
		// A genuine Redis failure, as opposed to a missing key.
		d.logger.Warnf("DNS: Redis error getting key for sandbox '%s' (will try fallback resolver..): %s", sandboxID, lookupErr)
	}

	// No fallback configured: nothing more to try.
	if d.fallbackResolverFn == nil {
		return "", false
	}

	resolved, ok := d.fallbackResolverFn(sandboxID)
	if !ok {
		d.logger.Errorf("DNS: Fallback lookup for sandbox '%s' failed", sandboxID)
		return "", false
	}

	d.logger.Infof("DNS: Not found in redis, using fallback lookup for sandbox '%s' succeeded: record=%q", sandboxID, resolved)

	// Write the record back to Redis without waiting for the write to finish.
	go func() {
		if addErr := d.Add(sandboxID, resolved); addErr != nil {
			d.logger.Errorf("DNS: Problem adding entry: %s", addErr)
		}
	}()

	return resolved, true
}

func (*DNS) hostname(sandboxID string) string {
return fmt.Sprintf("%s.", sandboxID)
// dnsKeyFor returns the Redis key under which the entry for sandboxID is
// stored: the sandbox ID prefixed with "dns.".
func (d *DNS) dnsKeyFor(sandboxID string) string {
	const keyFormat = "dns.%s"

	return fmt.Sprintf(keyFormat, sandboxID)
}

func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {
Expand All @@ -63,8 +99,10 @@ func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {
}

sandboxID := strings.Split(q.Name, "-")[0]
ip, found := d.get(sandboxID)
if found {
// Trim trailing period to facilitate key consistency.
sandboxID = strings.TrimSuffix(sandboxID, ".")

if ip, found := d.get(sandboxID); found {
a.A = net.ParseIP(ip).To4()
} else {
a.A = net.ParseIP(defaultRoutingIP).To4()
Expand All @@ -76,7 +114,7 @@ func (d *DNS) handleDNSRequest(w resolver.ResponseWriter, r *resolver.Msg) {

err := w.WriteMsg(m)
if err != nil {
log.Printf("Failed to write message: %s\n", err.Error())
// %v, not %w: SugaredLogger.Errorf formats with fmt.Sprintf, and %w is only
// understood by fmt.Errorf (elsewhere it renders as "%!w(...)").
d.logger.Errorf("DNS: Failed to write message: %v", err)
}
}

Expand All @@ -85,11 +123,15 @@ func (d *DNS) Start(address string, port int) error {

mux.HandleFunc(".", d.handleDNSRequest)

server := resolver.Server{Addr: fmt.Sprintf("%s:%d", address, port), Net: "udp", Handler: mux}
server := resolver.Server{
Addr: fmt.Sprintf("%s:%d", address, port),
Net: "udp",
Handler: mux,
}

err := server.ListenAndServe()
if err != nil {
return fmt.Errorf("failed to start DNS server: %w", err)
return fmt.Errorf("DNS: failed to start server: %w", err)
}

return nil
Expand Down
10 changes: 8 additions & 2 deletions packages/api/internal/orchestrator/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ func (o *Orchestrator) getDeleteInstanceFunction(ctx context.Context, posthogCli
node.CPUUsage.Add(-info.VCpu)
node.RamUsage.Add(-info.RamMB)

o.dns.Remove(info.Instance.SandboxID, node.Info.IPAddress)
if err := o.dns.Remove(info.Instance.SandboxID); err != nil {
// n.b. Don't halt execution, just log it and move on.
logger.Error(err)
}
}

req := &orchestrator.SandboxDeleteRequest{SandboxId: info.Instance.SandboxID}
Expand Down Expand Up @@ -179,7 +182,10 @@ func (o *Orchestrator) getInsertInstanceFunction(ctx context.Context, logger *za
node.CPUUsage.Add(info.VCpu)
node.RamUsage.Add(info.RamMB)
jakubno marked this conversation as resolved.
Show resolved Hide resolved

o.dns.Add(info.Instance.SandboxID, node.Info.IPAddress)
if err := o.dns.Add(info.Instance.SandboxID, node.Info.IPAddress); err != nil {
jakubno marked this conversation as resolved.
Show resolved Hide resolved
// n.b. Don't halt execution, just log it and move on.
logger.Error(err)
}
}

_, err := o.analytics.Client.InstanceStarted(ctx, &analyticscollector.InstanceStartedEvent{
Expand Down
48 changes: 30 additions & 18 deletions packages/api/internal/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package orchestrator
import (
"context"
"errors"
"fmt"
"log"

redis "github.com/go-redis/redis/v8"
nomadapi "github.com/hashicorp/nomad/api"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,28 +39,12 @@ func New(
logger.Errorf("Error initializing Analytics client\n: %v", err)
}

dnsServer := dns.New()

if env.IsLocal() {
fmt.Printf("Running locally, skipping starting DNS server\n")
} else {
go func() {
fmt.Printf("Starting DNS server\n")

dnsErr := dnsServer.Start("127.0.0.4", 53)
if dnsErr != nil {
log.Fatalf("Failed running DNS server: %v\n", dnsErr)
}
}()
}

o := Orchestrator{
analytics: analyticsInstance,
nomadClient: nomadClient,
logger: logger,
tracer: tracer,
nodes: smap.New[*Node](),
dns: dnsServer,
}

cache := instance.NewCache(
Expand All @@ -72,9 +56,37 @@ func New(

o.instanceCache = cache

rdbOpts := &redis.Options{Addr: "127.0.0.1:6379"}

// fallbackResolverFn scans the nodes returned by o.GetNodes for one whose
// detail lists sandboxID and returns that node's IP address. Nodes without a
// detail record, and matches whose node can no longer be fetched, are skipped.
fallbackResolverFn := func(sandboxID string) (string, bool) {
	for _, apiNode := range o.GetNodes() {
		detail := o.GetNodeDetail(apiNode.NodeID)
		if detail == nil {
			continue
		}

		for _, sb := range detail.Sandboxes {
			if sb.SandboxID != sandboxID {
				continue
			}

			if node := o.GetNode(apiNode.NodeID); node != nil {
				return node.Info.IPAddress, true
			}
		}
	}

	return "", false
}

o.dns = dns.New(ctx, rdbOpts, fallbackResolverFn, logger)

if env.IsLocal() {
logger.Info("Skipping syncing sandboxes, running locally")
logger.Info("Running locally, skipping starting DNS server")
logger.Info("Running locally, skipping syncing sandboxes")
} else {
go func() {
logger.Info("Starting DNS server")

if err := o.dns.Start("127.0.0.4", 53); err != nil {
log.Fatalf("Failed running DNS server: %v\n", err)
}
}()

go o.keepInSync(cache)
}

Expand Down
10 changes: 10 additions & 0 deletions packages/nomad/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ resource "nomad_job" "api" {
}
}

# Nomad job for Redis. The jobspec is read from redis.hcl in this module and
# receives the GCP zone as an HCL2 variable.
resource "nomad_job" "redis" {
jobspec = file("${path.module}/redis.hcl")

hcl2 {
vars = {
gcp_zone = var.gcp_zone
}
}
}

resource "nomad_job" "docker_reverse_proxy" {
jobspec = file("${path.module}/docker-reverse-proxy.hcl")

Expand Down
Loading