Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #3004

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft

wip #3004

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ crossbuild:
release --snapshot --clean --skip=publish --verbose

build:
go build -o parca-agent -buildvcs=false -ldflags="-extldflags=-static" -tags osusergo,netgo
go build -o parca-agent -buildvcs=false -ldflags="-extldflags=-Wl,-z,lazy" -tags osusergo,netgo

build-debug:
go build -o parca-agent-debug -buildvcs=false -ldflags="-extldflags=-static" -tags osusergo,netgo -gcflags "all=-N -l"
Expand Down
146 changes: 146 additions & 0 deletions arrowmetrics/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package arrowmetrics

import (
"context"
"fmt"
"time"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/ebpf-profiler/libpf"
)

type Producer interface {
Produce(pmetric.MetricSlice) error
}

type ProducerConfig struct {
Producer Producer
ScopeName string
}

type stream struct {
endpoint arrowpb.ArrowMetricsService_ArrowMetricsClient
arrowProducer *arrow_record.Producer
}

type Exporter struct {
client arrowpb.ArrowMetricsServiceClient
// NB: someday we might want to have several producer groups,
// each of which collects at different intervals.
// For now we are only collecting one scope (GPU metrics)
// so the global interval is fine.
interval time.Duration
producers []ProducerConfig
resourceAttrs map[string]any
stream *stream
}

func NewExporter(client arrowpb.ArrowMetricsServiceClient, interval time.Duration, resourceAttrs map[string]any) Exporter {
return Exporter{
client: client,
interval: interval,
resourceAttrs: resourceAttrs,
stream: nil,
}
}

func (e *Exporter) AddProducer(p ProducerConfig) {
e.producers = append(e.producers, p)
}

func (e *Exporter) makeStream(ctx context.Context) error {
log.Debugf("making new stream")
endpoint, err := e.client.ArrowMetrics(ctx)
if err != nil {
return err
}
p := arrow_record.NewProducer()
e.stream = &stream{
endpoint: endpoint,
arrowProducer: p,
}
return nil
}

func (e *Exporter) report(ctx context.Context) error {
m := pmetric.NewMetrics()
r := m.ResourceMetrics().AppendEmpty()
if err := r.Resource().Attributes().FromRaw(e.resourceAttrs); err != nil {
return err
}
for _, p := range e.producers {
log.Debugf("Running arrow metrics producer for scope %s", p.ScopeName)
s := r.ScopeMetrics().AppendEmpty()
s.Scope().SetName(p.ScopeName)
ms := s.Metrics()
if err := p.Producer.Produce(ms); err != nil {
log.Warnf("Producer for scope %s failed to produce metrics: %v", p.ScopeName, err)
}
}

dpc := m.DataPointCount()
log.Debugf("About to report arrow metrics with %d total data points", dpc)

retriesRemaining := 1
var err error
var arrow *arrowpb.BatchArrowRecords
for retriesRemaining >= 0 {
retriesRemaining -= 1
if e.stream == nil {
err = e.makeStream(ctx)
if err != nil {
// if we failed to create a new stream, don't retry.
// The point of the retry loop is to handle the stream
// legitimately going away e.g. due to the server
// having specified max_connection_age,
// not unexpected issues creating a new stream.
break
}
}
arrow, err = e.stream.arrowProducer.BatchArrowRecordsFromMetrics(m)
if err != nil {
log.Debugf("err on produce: %v", err)
e.stream = nil
continue
}
err = e.stream.endpoint.Send(arrow)
if err != nil {
log.Debugf("err on send: %v", err)
e.stream = nil
continue
} else {
log.Debugf("send of %d succeeded", dpc)
}
break
}

if err != nil {
return err
}

return nil
}

func (e *Exporter) Start(ctx context.Context) {
if len(e.producers) == 0 {
return;
}
go func() {
tick := time.NewTicker(e.interval)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case <-tick.C:
if err := e.report(ctx); err != nil {
fmt.Errorf("Failed to send arrow metrics: %v", err)
}
tick.Reset(libpf.AddJitter(e.interval, 0.2))
}
}
}()
}
119 changes: 119 additions & 0 deletions arrowmetrics/nvidia.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package arrowmetrics

import (
"bytes"
"encoding/binary"
"fmt"
"sort"

"github.com/NVIDIA/go-nvml/pkg/nvml"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type perDeviceState struct {
d nvml.Device
lastTimestamp uint64
}

type producer struct {
devices []perDeviceState
}

type byTs []nvml.Sample

func (a byTs) Len() int { return len(a) }
func (a byTs) Less(i, j int) bool { return a[i].TimeStamp < a[j].TimeStamp }
func (a byTs) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

func NewNvidiaProducer() (*producer, error) {
ret := nvml.Init()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("Failed to initialize NVML library: %v", nvml.ErrorString(ret))
}
count, ret := nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("Failed to get count of Nvidia devices: %v", nvml.ErrorString(ret))
}
devices := make([]perDeviceState, count)
for i := 0; i < count; i++ {
device, ret := nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
return nil, fmt.Errorf("Failed to get handle for Nvidia device %d: %v", i, nvml.ErrorString(ret))
}
devices[i] = perDeviceState{
d: device,
lastTimestamp: 0,
}
}
return &producer{
devices: devices,
}, nil
}

func (p *producer) Produce(ms pmetric.MetricSlice) error {
for i, pds := range p.devices {
uuid, ret := pds.d.GetUUID()
if ret != nvml.SUCCESS {
log.Errorf("Failed to get device UUID at index %d: %v", i, nvml.ErrorString(ret))
continue
}
log.Debugf("Collecting metrics for device %s at index %d", uuid, i)

m := ms.AppendEmpty()
g := m.SetEmptyGauge()

valueType, utilSamps, ret := pds.d.GetSamples(nvml.GPU_UTILIZATION_SAMPLES, pds.lastTimestamp)
if ret != nvml.SUCCESS {
log.Errorf("Failed to get GPU utilization for device %s at index %d", uuid, i)
continue
}
var setVal func(pmetric.NumberDataPoint, [8]byte)
switch valueType {
case nvml.VALUE_TYPE_DOUBLE:
setVal = func(dp pmetric.NumberDataPoint, val [8]byte) {
var value float64
// TODO - test this on a big-endian machine
err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
if err != nil {
// justification for panic: this can never happen unless we've made
// a programming error.
panic(err)
}
dp.SetDoubleValue(value)
}
case nvml.VALUE_TYPE_UNSIGNED_INT, nvml.VALUE_TYPE_UNSIGNED_LONG, nvml.VALUE_TYPE_UNSIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_LONG_LONG, nvml.VALUE_TYPE_SIGNED_INT, nvml.VALUE_TYPE_COUNT:
setVal = func(dp pmetric.NumberDataPoint, val [8]byte) {
var value int64
// TODO - test this on a big-endian machine
err := binary.Read(bytes.NewReader(val[:]), binary.NativeEndian, &value)
if err != nil {
// justification for panic: this can never happen unless we've made
// a programming error.
panic(err)
}
dp.SetIntValue(value)
}
default:
log.Errorf("Unknown data type in GPU metrics: %d", valueType)
continue
}

sort.Sort(byTs(utilSamps))

for _, samp := range utilSamps {
pds.lastTimestamp = max(pds.lastTimestamp, samp.TimeStamp)

dp := g.DataPoints().AppendEmpty()
setVal(dp, samp.SampleValue)

// samp.TimeStamp is micros since epoch; pcommon.Timestamp expects
// nanos since epoch
dp.SetTimestamp(pcommon.Timestamp(samp.TimeStamp * 1000))
dp.Attributes().PutStr("UUID", uuid)
dp.Attributes().PutInt("index", int64(i))
}
}
return nil
}
59 changes: 59 additions & 0 deletions arrowmetrics/nvidia_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package arrowmetrics

import (
"math/rand/v2"
"time"

log "github.com/sirupsen/logrus"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/google/uuid"
)

type MockProducer struct {
deviceUuids []string
lastTime time.Time
}

// NewNvidiaMockProducer creates a Producer that generates random data to send.
func NewNvidiaMockProducer(nDevices int, samplesFromTime time.Time) *MockProducer {
deviceUuids := make([]string, 0, nDevices)
for range nDevices {
deviceUuids = append(deviceUuids, uuid.New().String())
}

return &MockProducer{
deviceUuids: deviceUuids,
lastTime: samplesFromTime,
}
}

const PERIOD = time.Second / 6

func (p *MockProducer) Produce(ms pmetric.MetricSlice) error {
for i, uuid := range p.deviceUuids {
log.Debugf("Collecting metrics for device %s at index %d", uuid, i)

m := ms.AppendEmpty()
g := m.SetEmptyGauge()

now := time.Now()
m.SetName("gpu_utilization_percent")

for i, uuid := range p.deviceUuids {
lastTimeRounded := p.lastTime.Truncate(PERIOD).Add(PERIOD)

for lastTimeRounded.Before(now) {
dp := g.DataPoints().AppendEmpty()
dp.SetIntValue(int64(rand.IntN(100)))
dp.SetTimestamp(pcommon.NewTimestampFromTime(lastTimeRounded))
lastTimeRounded = lastTimeRounded.Add(PERIOD)
dp.Attributes().PutStr("UUID", uuid)
dp.Attributes().PutInt("index", int64(i))
}
}
p.lastTime = now
}
return nil
}
11 changes: 10 additions & 1 deletion flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"time"

"github.com/alecthomas/kong"
"go.opentelemetry.io/ebpf-profiler/tracer"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/ebpf-profiler/tracer"
_ "google.golang.org/grpc/encoding/proto"
)

Expand Down Expand Up @@ -113,6 +113,9 @@ type Flags struct {
RubyUnwindingDisable bool `default:"false" help:"[deprecated] Disable Ruby unwinder."`
JavaUnwindingDisable bool `default:"true" help:"[deprecated] Disable Java unwinder."`

// which metrics producers (e.g. nvidia) to enable
MetricsProducer FlagsMetricProducer `embed:"" prefix:"metrics-producer-"`

CollectCustomLabels bool `default:"false" help:"Attempt to collect custom labels (e.g. trace ID) from the process."`

AnalyticsOptOut bool `default:"false" help:"Opt out of sending anonymous usage statistics."`
Expand Down Expand Up @@ -311,6 +314,12 @@ type FlagsDWARFUnwinding struct {
Mixed bool `default:"true" help:"[deprecated] Unwind using .eh_frame information and frame pointers."`
}

// FlagsMetricProducer contains flags that configure arrow metrics production.
type FlagsMetricProducer struct {
NvidiaGpu bool `default:"false" help:"Collect metrics related to Nvidia GPUs."`
NvidiaGpuMock bool `default:"false" help:"Generate fake Nvidia GPU metrics." hidden:""`
}

type FlagsTelemetry struct {
DisablePanicReporting bool `default:"false"`
StderrBufferSizeKb int64 `default:"4096"`
Expand Down
Loading
Loading