Merge pull request #110 from ddosify/develop
Nvidia GPU Collector & Bug fixes
fatihbaltaci authored Mar 15, 2024
2 parents f404a23 + 0fcb209 commit 6df792f
Showing 20 changed files with 1,125 additions and 119 deletions.
Dockerfile (2 changes: 1 addition & 1 deletion)
@@ -3,7 +3,7 @@ WORKDIR /app
COPY . ./
RUN apk update && apk add gcc musl-dev
ARG VERSION
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-X 'github.com/ddosify/alaz/datastore.tag=$VERSION'" -o alaz
RUN GOOS=linux go build -ldflags="-X 'github.com/ddosify/alaz/datastore.tag=$VERSION'" -o alaz

FROM registry.access.redhat.com/ubi9/ubi-minimal:9.3-1552
RUN microdnf update -y && microdnf install procps ca-certificates -y && microdnf clean all
Dockerfile.default (11 changes: 6 additions & 5 deletions)
@@ -1,12 +1,13 @@
FROM golang:1.20-alpine as builder
FROM golang:1.20.14-bullseye as builder
WORKDIR /app
COPY . ./
RUN apk update && apk add gcc musl-dev
RUN apt update

ARG VERSION
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-X 'github.com/ddosify/alaz/datastore.tag=$VERSION'" -o alaz
RUN GOOS=linux go build -ldflags="-X 'github.com/ddosify/alaz/datastore.tag=$VERSION'" -o alaz

FROM alpine:3.18.3
RUN apk --no-cache add ca-certificates
FROM debian:12.5-slim
RUN apt-get update && apt-get install -y procps ca-certificates && rm -rf /var/lib/apt/lists/*

COPY --chown=0:0 --from=builder /app/alaz ./bin/
ENTRYPOINT ["alaz"]
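
Both Dockerfiles drop CGO_ENABLED=0, and Dockerfile.default additionally moves from an Alpine/musl toolchain to a Debian/glibc one. That is consistent with the new GPU collector pulling in cgo-based NVML bindings, which cannot be compiled into a pure-Go binary. A minimal sketch of the dependency, assuming the gpu package is backed by github.com/NVIDIA/go-nvml (the package itself is not shown in this excerpt):

```go
// Toy program illustrating why cgo must stay enabled: go-nvml is a cgo
// package, so `CGO_ENABLED=0 go build` refuses to compile anything that
// imports it. Not taken from the Alaz source.
package main

import (
	"fmt"

	"github.com/NVIDIA/go-nvml/pkg/nvml"
)

func main() {
	if ret := nvml.Init(); ret != nvml.SUCCESS {
		fmt.Println("NVML not available:", ret)
		return
	}
	defer nvml.Shutdown()

	count, _ := nvml.DeviceGetCount()
	fmt.Println("GPUs visible to NVML:", count)
}
```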
aggregator/data.go (25 changes: 16 additions & 9 deletions)
@@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"

"golang.org/x/time/rate"
@@ -79,7 +80,7 @@ type Aggregator struct {
rateLimitMu sync.RWMutex

// Used to find the correct mutex for the pid, some pids can share the same mutex
muIndex int
muIndex atomic.Uint64
muArray []*sync.RWMutex
}

@@ -185,7 +186,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
liveProcesses: make(map[uint32]struct{}),
rateLimiters: make(map[uint32]*rate.Limiter),
pgStmts: make(map[string]string),
muIndex: 0,
muIndex: atomic.Uint64{},
muArray: nil,
}

@@ -227,11 +228,10 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
a.muArray = make([]*sync.RWMutex, countMuArray)

// set distinct mutex for every live process
a.muIndex = 0
for pid := range a.liveProcesses {
a.muArray[a.muIndex] = &sync.RWMutex{}
sockMaps[pid].mu = a.muArray[a.muIndex]
a.muIndex++
a.muArray[a.muIndex.Load()] = &sync.RWMutex{}
sockMaps[pid].mu = a.muArray[a.muIndex.Load()]
a.muIndex.Add(1)
a.getAlreadyExistingSockets(pid)
}

@@ -456,9 +456,16 @@ func (a *Aggregator) processExec(d *proc.ProcEvent) {
a.liveProcesses[d.Pid] = struct{}{}

// create lock on demand
a.muArray[a.muIndex%len(a.muArray)] = &sync.RWMutex{}
a.muIndex++
a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[a.muIndex%len(a.muArray)]
a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
a.muIndex.Add(1)

// If a duplicate exec event arrives for the same pid, the underlying mutex would be swapped;
// if the first assigned mutex is locked at that point, a later unlock would target the new,
// never-locked mutex ("unlock of unlocked mutex").
// To avoid this, keep the existing mutex if the socket map already has one.
if a.clusterInfo.SocketMaps[d.Pid].mu == nil {
a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))]
}
}

func (a *Aggregator) processExit(pid uint32) {
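
The data.go changes above replace the plain int muIndex with atomic.Uint64 so concurrent exec events cannot race on the index, and processExec now refuses to overwrite a mutex that a socket map already holds. A simplified sketch of the sharded-mutex pattern (names are illustrative, not from the Alaz source):

```go
package main

import (
	"sync"
	"sync/atomic"
)

// mutexPool hands out RWMutexes from a fixed array in round-robin order.
// Several pids may end up sharing a mutex once the counter wraps around,
// which is the trade-off the aggregator makes.
type mutexPool struct {
	next  atomic.Uint64
	locks []*sync.RWMutex
}

func newMutexPool(size int) *mutexPool {
	p := &mutexPool{locks: make([]*sync.RWMutex, size)}
	for i := range p.locks {
		p.locks[i] = &sync.RWMutex{}
	}
	return p
}

type socketMap struct {
	mu *sync.RWMutex
}

// assign gives sm a mutex only if it does not already have one; swapping an
// in-use mutex could cause "unlock of unlocked mutex" panics, which is the
// situation the nil check in processExec guards against.
func (p *mutexPool) assign(sm *socketMap) {
	if sm.mu != nil {
		return
	}
	idx := p.next.Add(1) - 1
	sm.mu = p.locks[idx%uint64(len(p.locks))]
}

func main() {
	pool := newMutexPool(4)
	sm := &socketMap{}
	pool.assign(sm)
	sm.mu.Lock()
	sm.mu.Unlock()
}
```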
config/db.go (1 change: 1 addition & 0 deletions)
@@ -11,6 +11,7 @@ type PostgresConfig struct {
type BackendDSConfig struct {
Host string
MetricsExport bool
GpuMetricsExport bool
MetricsExportInterval int // in seconds

ReqBufferSize int
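
The only change here is the new GpuMetricsExport flag, which NewBackendDS checks before starting the GPU exporter. How the flag gets populated is outside this diff; a plausible wiring from the environment might look like the sketch below (the GPU_METRICS_EXPORT and BACKEND_HOST variable names are assumptions, not taken from the repository):

```go
package main

import (
	"fmt"
	"os"
	"strconv"

	"github.com/ddosify/alaz/config"
)

// backendConfigFromEnv builds a BackendDSConfig from environment variables.
// Only the fields visible in this diff are set; real configuration in Alaz
// may differ.
func backendConfigFromEnv() config.BackendDSConfig {
	gpuExport, _ := strconv.ParseBool(os.Getenv("GPU_METRICS_EXPORT")) // hypothetical variable name
	return config.BackendDSConfig{
		Host:                  os.Getenv("BACKEND_HOST"), // hypothetical variable name
		MetricsExport:         true,
		GpuMetricsExport:      gpuExport,
		MetricsExportInterval: 10, // seconds
	}
}

func main() {
	fmt.Printf("%+v\n", backendConfigFromEnv())
}
```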
datastore/backend.go (222 changes: 157 additions & 65 deletions)
@@ -18,6 +18,7 @@ import (

"github.com/ddosify/alaz/config"
"github.com/ddosify/alaz/ebpf/l7_req"
"github.com/ddosify/alaz/gpu"
"github.com/ddosify/alaz/log"

"github.com/alecthomas/kingpin/v2"
@@ -129,6 +130,7 @@ func getCloudProvider() CloudProvider {

var resourceBatchSize int64 = 50
var innerMetricsPort int = 8182
var innerGpuMetricsPort int = 8183

// BackendDS is a backend datastore
type BackendDS struct {
@@ -309,77 +311,59 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe
go ds.sendEventsInBatch(ds.containerEventChan, containerEndpoint, eventsInterval)
go ds.sendEventsInBatch(ds.dsEventChan, dsEndpoint, eventsInterval)

if conf.MetricsExport {
go ds.exportNodeMetrics()

go func() {
t := time.NewTicker(time.Duration(conf.MetricsExportInterval) * time.Second)
for {
select {
case <-ds.ctx.Done():
return
case <-t.C:
// make a request to /inner/metrics
// forward the response to /github.com/ddosify/alaz/metrics
func() {
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://localhost:%d/inner/metrics", innerMetricsPort), nil)
if err != nil {
log.Logger.Error().Msgf("error creating inner metrics request: %v", err)
return
}

ctx, cancel := context.WithTimeout(ds.ctx, 5*time.Second)
defer cancel()

// use the default client, ds client reads response on success to look for failed events,
// therefore body here will be empty
resp, err := http.DefaultClient.Do(req.WithContext(ctx))

if err != nil {
log.Logger.Error().Msgf("error sending inner metrics request: %v", err)
return
}

if resp.StatusCode != http.StatusOK {
log.Logger.Error().Msgf("inner metrics request not success: %d", resp.StatusCode)
return
}

req, err = http.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", ds.host, NodeID, MonitoringID), resp.Body)
if err != nil {
log.Logger.Error().Msgf("error creating metrics request: %v", err)
return
}

ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err = ds.c.Do(req.WithContext(ctx))
// send node-exporter and nvidia-gpu metrics
go func() {
if !(conf.MetricsExport || conf.GpuMetricsExport) {
return
}

if err != nil {
log.Logger.Error().Msgf("error sending metrics request: %v", err)
return
}
var nodeMetrics, gpuMetrics bool
if conf.MetricsExport {
go ds.exportNodeMetrics()
nodeMetrics = true // by default
}

if resp.StatusCode != http.StatusOK {
log.Logger.Error().Msgf("metrics request not success: %d", resp.StatusCode)
if conf.GpuMetricsExport {
err := ds.exportGpuMetrics()
if err != nil {
log.Logger.Error().Msgf("error exporting gpu metrics: %v", err)
} else {
gpuMetrics = true
}
}

// log response body
rb, err := io.ReadAll(resp.Body)
if err != nil {
log.Logger.Error().Msgf("error reading metrics response body: %v", err)
}
log.Logger.Error().Msgf("metrics response body: %s", string(rb))
t := time.NewTicker(time.Duration(conf.MetricsExportInterval) * time.Second)
for {
select {
case <-ds.ctx.Done():
return
case <-t.C:
payloads := []io.Reader{}
if nodeMetrics {
nodeMetrics, err := ds.scrapeNodeMetrics()
if err != nil {
log.Logger.Error().Msgf("error scraping node metrics: %v", err)
} else {
log.Logger.Debug().Msg("node-metrics scraped successfully")
payloads = append(payloads, nodeMetrics)
}
}
if gpuMetrics {
gpuMetrics, err := ds.scrapeGpuMetrics()
if err != nil {
log.Logger.Error().Msgf("error scraping gpu metrics: %v", err)
} else {
log.Logger.Debug().Msg("gpu-metrics scraped successfully")
payloads = append(payloads, gpuMetrics)
}
}

return
} else {
log.Logger.Debug().Msg("metrics sent successfully")
}
}()
if len(payloads) > 0 {
ds.sendMetricsToBackend(io.MultiReader(payloads...))
}
}
}()
}
}
}()

go func() {
<-ds.ctx.Done()
@@ -479,6 +463,39 @@ func convertTraceEventsToPayload(batch []*TraceInfo) TracePayload {
}
}

func (b *BackendDS) sendMetricsToBackend(r io.Reader) {
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", b.host, NodeID, MonitoringID), r)
if err != nil {
log.Logger.Error().Msgf("error creating metrics request: %v", err)
return
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

resp, err := b.c.Do(req.WithContext(ctx))

if err != nil {
log.Logger.Error().Msgf("error sending metrics request: %v", err)
return
}

if resp.StatusCode != http.StatusOK {
log.Logger.Error().Msgf("metrics request not success: %d", resp.StatusCode)

// log response body
rb, err := io.ReadAll(resp.Body)
if err != nil {
log.Logger.Error().Msgf("error reading metrics response body: %v", err)
}
log.Logger.Error().Msgf("metrics response body: %s", string(rb))

return
} else {
log.Logger.Debug().Msg("metrics sent successfully")
}
}
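
sendMetricsToBackend is the sink for the metrics ticker in NewBackendDS above: each successful scrape contributes an io.Reader, and io.MultiReader stitches them into a single request body so node-exporter and GPU metrics go out in one POST. A self-contained illustration of that composition, with payload contents invented for the example:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	// Stand-ins for the readers returned by scrapeNodeMetrics and scrapeGpuMetrics.
	nodeMetrics := strings.NewReader("node_cpu_seconds_total 123\n")
	gpuMetrics := strings.NewReader("gpu_utilization_percent 42\n")

	payloads := []io.Reader{nodeMetrics, gpuMetrics}

	// io.MultiReader drains each reader in order, so the combined request body
	// is simply the concatenation of the individual scrapes.
	body, err := io.ReadAll(io.MultiReader(payloads...))
	if err != nil {
		panic(err)
	}
	fmt.Print(string(body))
}
```

Because the Prometheus text exposition format is line-oriented, concatenating the two scrapes stays parseable as long as the exporters emit distinct metric names.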

func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint string) {
payloadBytes, err := json.Marshal(payload)
if err != nil {
@@ -845,6 +862,63 @@ func (b *BackendDS) SendHealthCheck(ebpf bool, metrics bool, dist bool, k8sVersi
}
}

func (b *BackendDS) scrapeNodeMetrics() (io.Reader, error) {
// get node metrics from node-exporter
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://localhost:%d/inner/metrics", innerMetricsPort), nil)
if err != nil {
return nil, fmt.Errorf("error creating inner metrics request: %v", err)
}

ctx, cancel := context.WithTimeout(b.ctx, 5*time.Second)
// defer cancel()
// do not defer cancel here, since we return the reader to the caller on success
// if deferred, there will be a race condition between the caller and the defer

// use the default client; the ds client reads the response body on success
// (to look for failed events), which would leave an empty body to forward here
resp, err := http.DefaultClient.Do(req.WithContext(ctx))

if err != nil {
cancel()
return nil, fmt.Errorf("error sending inner metrics request: %v", err)
}

if resp.StatusCode != http.StatusOK {
cancel()
return nil, fmt.Errorf("inner metrics request not success: %d", resp.StatusCode)
}

return resp.Body, nil
}

func (b *BackendDS) scrapeGpuMetrics() (io.Reader, error) {
// get gpu metrics from nvml
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("http://localhost:%d/inner/gpu-metrics", innerGpuMetricsPort), nil)
if err != nil {
return nil, fmt.Errorf("error creating inner gpu metrics request: %v", err)
}

ctx, cancel := context.WithTimeout(b.ctx, 5*time.Second)
// defer cancel()
// do not defer cancel here, since we return the reader to the caller on success
// if deferred, there will be a race condition between the caller and the defer

// use the default client; the ds client reads the response body on success
// (to look for failed events), which would leave an empty body to forward here
resp, err := http.DefaultClient.Do(req.WithContext(ctx))

if err != nil {
cancel()
return nil, fmt.Errorf("error sending gpu inner metrics request: %v", err)
}

if resp.StatusCode != http.StatusOK {
cancel()
return nil, fmt.Errorf("gpu inner metrics request not success: %d", resp.StatusCode)
}
return resp.Body, nil
}
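
Both scrape helpers deliberately avoid defer cancel() because the response body is returned to the caller and cancelling the context early would race the caller's read; the trade-off is that the context and body are left to be cleaned up by the timeout. One alternative, not part of this commit, is to tie cancellation to Close and make the caller close the body when it is done:

```go
package main

import (
	"context"
	"io"
	"strings"
	"time"
)

// bodyWithCancel cancels the request context when the response body is closed,
// so a scrape helper could return it instead of leaving the context to expire
// on its own. Hypothetical helper, not in this diff.
type bodyWithCancel struct {
	io.ReadCloser
	cancel context.CancelFunc
}

func (b bodyWithCancel) Close() error {
	err := b.ReadCloser.Close()
	b.cancel()
	return err
}

// Inside scrapeNodeMetrics / scrapeGpuMetrics the success path would become:
//
//	return bodyWithCancel{ReadCloser: resp.Body, cancel: cancel}, nil
//
// with the caller responsible for Close after the forwarding request finishes.

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	_ = ctx // in the real helpers this context would be attached to the outbound request

	// Wrap an arbitrary ReadCloser; in the scrape helpers this would be resp.Body.
	rc := bodyWithCancel{ReadCloser: io.NopCloser(strings.NewReader("metrics\n")), cancel: cancel}
	defer rc.Close() // closing the body also cancels the context
}
```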

func (b *BackendDS) exportNodeMetrics() {
kingpin.Version(version.Print("alaz_node_exporter"))
kingpin.CommandLine.UsageWriter(os.Stdout)
@@ -857,6 +931,24 @@ func (b *BackendDS) exportNodeMetrics() {
http.ListenAndServe(fmt.Sprintf(":%d", innerMetricsPort), nil)
}

func (b *BackendDS) exportGpuMetrics() error {
gpuMetricsPath := "/inner/gpu-metrics"
gpuCollector, err := gpu.NewGpuCollector()
if err != nil {
log.Logger.Error().Msgf("error creating gpu collector: %v", err)
return err
}

r := prometheus.NewRegistry()
r.MustRegister(gpuCollector)
r.MustRegister(version.NewCollector("alaz_nvidia_gpu_exporter"))

http.Handle(gpuMetricsPath, promhttp.HandlerFor(r, promhttp.HandlerOpts{}))
log.Logger.Info().Msgf("exporting gpu metrics at %s, on port %d", gpuMetricsPath, innerGpuMetricsPort)
go http.ListenAndServe(fmt.Sprintf(":%d", innerGpuMetricsPort), nil)
return nil
}
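
exportGpuMetrics serves the new registry on innerGpuMetricsPort, but the gpu package itself is not part of this excerpt. As a rough sketch of what an NVML-backed prometheus.Collector generally looks like, assuming the github.com/NVIDIA/go-nvml bindings (metric names and fields here are illustrative, not Alaz's actual implementation):

```go
package gpu

import (
	"fmt"

	"github.com/NVIDIA/go-nvml/pkg/nvml"
	"github.com/prometheus/client_golang/prometheus"
)

// GpuCollector exposes per-device utilization and memory usage gathered
// through NVML on every Prometheus scrape.
type GpuCollector struct {
	utilDesc *prometheus.Desc
	memDesc  *prometheus.Desc
}

func NewGpuCollector() (*GpuCollector, error) {
	if ret := nvml.Init(); ret != nvml.SUCCESS {
		return nil, fmt.Errorf("nvml init failed: %v", ret)
	}
	return &GpuCollector{
		utilDesc: prometheus.NewDesc("gpu_utilization_percent",
			"GPU utilization reported by NVML", []string{"index", "name"}, nil),
		memDesc: prometheus.NewDesc("gpu_memory_used_bytes",
			"GPU memory in use reported by NVML", []string{"index", "name"}, nil),
	}, nil
}

func (c *GpuCollector) Describe(ch chan<- *prometheus.Desc) {
	ch <- c.utilDesc
	ch <- c.memDesc
}

func (c *GpuCollector) Collect(ch chan<- prometheus.Metric) {
	count, ret := nvml.DeviceGetCount()
	if ret != nvml.SUCCESS {
		return
	}
	for i := 0; i < count; i++ {
		dev, ret := nvml.DeviceGetHandleByIndex(i)
		if ret != nvml.SUCCESS {
			continue
		}
		name, _ := dev.GetName()
		idx := fmt.Sprintf("%d", i)
		if util, ret := dev.GetUtilizationRates(); ret == nvml.SUCCESS {
			ch <- prometheus.MustNewConstMetric(c.utilDesc,
				prometheus.GaugeValue, float64(util.Gpu), idx, name)
		}
		if mem, ret := dev.GetMemoryInfo(); ret == nvml.SUCCESS {
			ch <- prometheus.MustNewConstMetric(c.memDesc,
				prometheus.GaugeValue, float64(mem.Used), idx, name)
		}
	}
}
```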

type nodeExporterHandler struct {
inner http.Handler
logger nodeExportLogger
ebpf/collector.go (3 changes: 3 additions & 0 deletions)
@@ -143,6 +143,8 @@ func (e *EbpfCollector) close() {
e.probesMu.Lock()
defer e.probesMu.Unlock()

log.Logger.Info().Msg("closing ebpf uprobes")

for pid := range e.sslWriteUprobes {
e.sslWriteUprobes[pid].Close()
}
@@ -163,6 +165,7 @@ func (e *EbpfCollector) close() {
l.Close()
}
}
log.Logger.Info().Msg("ebpf collector closed")
}

// in order to prevent the memory peak at the beginning
ebpf/l7_req/amqp.c (1 change: 1 addition & 0 deletions)
@@ -1,3 +1,4 @@
//go:build ignore
// AMQP is a binary protocol. Information is organised into "frames", of various types. Frames carry
// protocol methods and other information. All frames have the same general format:
// frame header, payload and frame end. The frame payload format depends on the frame type.
ebpf/l7_req/http.c (1 change: 1 addition & 0 deletions)
@@ -1,3 +1,4 @@
//go:build ignore
#define METHOD_UNKNOWN 0
#define METHOD_GET 1
#define METHOD_POST 2
ebpf/l7_req/http2.c (1 change: 1 addition & 0 deletions)
@@ -1,3 +1,4 @@
//go:build ignore
#define CLIENT_FRAME 1
#define SERVER_FRAME 2
