Skip to content

Commit

Permalink
Rename process group to process details (#2032)
Browse files Browse the repository at this point in the history
  • Loading branch information
RonFed authored Dec 20, 2024
1 parent 221efbd commit 9cda745
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 87 deletions.
74 changes: 37 additions & 37 deletions instrumentation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ var (
// The manager will apply the configuration to all instrumentations that match the config group.
type ConfigUpdate[configGroup ConfigGroup] map[configGroup]Config

type instrumentationDetails[processGroup ProcessGroup, configGroup ConfigGroup] struct {
type instrumentationDetails[processDetails ProcessDetails, configGroup ConfigGroup] struct {
// we want to track the instrumentation even if it failed to load, to be able to report the error
// and clean up the reporter resources once the process exits.
// hence, this might be nil if the instrumentation failed to load.
inst Instrumentation
pg processGroup
pd processDetails
cg configGroup
}

type ManagerOptions[processGroup ProcessGroup, configGroup ConfigGroup] struct {
type ManagerOptions[processDetails ProcessDetails, configGroup ConfigGroup] struct {
Logger logr.Logger

// Factories is a map of OTel distributions to their corresponding instrumentation factories.
Expand All @@ -46,7 +46,7 @@ type ManagerOptions[processGroup ProcessGroup, configGroup ConfigGroup] struct {
// based on the process event.
//
// The handler is also used to report the instrumentation lifecycle events.
Handler *Handler[processGroup, configGroup]
Handler *Handler[processDetails, configGroup]

// DetectorOptions is a list of options to configure the process detector.
//
Expand All @@ -69,27 +69,27 @@ type Manager interface {
Run(ctx context.Context) error
}

type manager[processGroup ProcessGroup, configGroup ConfigGroup] struct {
type manager[processDetails ProcessDetails, configGroup ConfigGroup] struct {
// channel for receiving process events,
// used to detect new processes and process exits, and handle their instrumentation accordingly.
procEvents <-chan detector.ProcessEvent
detector detector.Detector
handler *Handler[processGroup, configGroup]
handler *Handler[processDetails, configGroup]
factories map[OtelDistribution]Factory
logger logr.Logger

// all the created instrumentations by pid,
// this map is not concurrent safe, so it should be accessed only from the main event loop
detailsByPid map[int]*instrumentationDetails[processGroup, configGroup]
detailsByPid map[int]*instrumentationDetails[processDetails, configGroup]

// instrumentations by workload, and aggregated by pid
// this map is not concurrent safe, so it should be accessed only from the main event loop
detailsByWorkload map[configGroup]map[int]*instrumentationDetails[processGroup, configGroup]
detailsByWorkload map[configGroup]map[int]*instrumentationDetails[processDetails, configGroup]

configUpdates <-chan ConfigUpdate[configGroup]
}

func NewManager[processGroup ProcessGroup, configGroup ConfigGroup](options ManagerOptions[processGroup, configGroup]) (Manager, error) {
func NewManager[processDetails ProcessDetails, configGroup ConfigGroup](options ManagerOptions[processDetails, configGroup]) (Manager, error) {
handler := options.Handler
if handler == nil {
return nil, errors.New("handler is required for ebpf instrumentation manager")
Expand All @@ -99,7 +99,7 @@ func NewManager[processGroup ProcessGroup, configGroup ConfigGroup](options Mana
return nil, errors.New("reporter is required for ebpf instrumentation manager")
}

if handler.ProcessGroupResolver == nil {
if handler.ProcessDetailsResolver == nil {
return nil, errors.New("details resolver is required for ebpf instrumentation manager")
}

Expand All @@ -126,19 +126,19 @@ func NewManager[processGroup ProcessGroup, configGroup ConfigGroup](options Mana
return nil, fmt.Errorf("failed to create process detector: %w", err)
}

return &manager[processGroup, configGroup]{
return &manager[processDetails, configGroup]{
procEvents: procEvents,
detector: detector,
handler: handler,
factories: options.Factories,
logger: logger.WithName("ebpf-instrumentation-manager"),
detailsByPid: make(map[int]*instrumentationDetails[processGroup, configGroup]),
detailsByWorkload: map[configGroup]map[int]*instrumentationDetails[processGroup, configGroup]{},
detailsByPid: make(map[int]*instrumentationDetails[processDetails, configGroup]),
detailsByWorkload: map[configGroup]map[int]*instrumentationDetails[processDetails, configGroup]{},
configUpdates: options.ConfigUpdates,
}, nil
}

func (m *manager[ProcessGroup, ConfigGroup]) runEventLoop(ctx context.Context) {
func (m *manager[ProcessDetails, ConfigGroup]) runEventLoop(ctx context.Context) {
// main event loop for handling instrumentations
for {
select {
Expand Down Expand Up @@ -182,7 +182,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) runEventLoop(ctx context.Context) {
}
}

func (m *manager[ProcessGroup, ConfigGroup]) Run(ctx context.Context) error {
func (m *manager[ProcessDetails, ConfigGroup]) Run(ctx context.Context) error {
g, errCtx := errgroup.WithContext(ctx)

g.Go(func() error {
Expand All @@ -198,14 +198,14 @@ func (m *manager[ProcessGroup, ConfigGroup]) Run(ctx context.Context) error {
return err
}

func (m *manager[ProcessGroup, ConfigGroup]) cleanInstrumentation(ctx context.Context, pid int) {
func (m *manager[ProcessDetails, ConfigGroup]) cleanInstrumentation(ctx context.Context, pid int) {
details, found := m.detailsByPid[pid]
if !found {
m.logger.V(3).Info("no instrumentation found for exiting pid, nothing to clean", "pid", pid)
return
}

m.logger.Info("cleaning instrumentation resources", "pid", pid, "process group details", details.pg)
m.logger.Info("cleaning instrumentation resources", "pid", pid, "process group details", details.pd)

if details.inst != nil {
err := details.inst.Close(ctx)
Expand All @@ -214,15 +214,15 @@ func (m *manager[ProcessGroup, ConfigGroup]) cleanInstrumentation(ctx context.Co
}
}

err := m.handler.Reporter.OnExit(ctx, pid, details.pg)
err := m.handler.Reporter.OnExit(ctx, pid, details.pd)
if err != nil {
m.logger.Error(err, "failed to report instrumentation exit")
}

m.stopTrackInstrumentation(pid)
}

func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.Context, e detector.ProcessEvent) error {
func (m *manager[ProcessDetails, ConfigGroup]) handleProcessExecEvent(ctx context.Context, e detector.ProcessEvent) error {
if details, found := m.detailsByPid[e.PID]; found && details.inst != nil {
// this can happen if we have multiple exec events for the same pid (chain loading)
// TODO: better handle this?
Expand All @@ -232,17 +232,17 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
return nil
}

pg, err := m.handler.ProcessGroupResolver.Resolve(ctx, e)
pd, err := m.handler.ProcessDetailsResolver.Resolve(ctx, e)
if err != nil {
return errors.Join(err, errFailedToGetDetails)
}

otelDisto, err := m.handler.DistributionMatcher.Distribution(ctx, pg)
otelDisto, err := m.handler.DistributionMatcher.Distribution(ctx, pd)
if err != nil {
return errors.Join(err, errFailedToGetDistribution)
}

configGroup, err := m.handler.ConfigGroupResolver.Resolve(ctx, pg, otelDisto)
configGroup, err := m.handler.ConfigGroupResolver.Resolve(ctx, pd, otelDisto)
if err != nil {
return errors.Join(err, errFailedToGetConfigGroup)
}
Expand All @@ -253,7 +253,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
}

// Fetch initial settings for the instrumentation
settings, err := m.handler.SettingsGetter.Settings(ctx, pg, otelDisto)
settings, err := m.handler.SettingsGetter.Settings(ctx, pd, otelDisto)
if err != nil {
// for k8s instrumentation config CR will be queried to get the settings
// we should always have config for this event.
Expand All @@ -269,35 +269,35 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
inst, err := factory.CreateInstrumentation(ctx, e.PID, settings)
if err != nil {
m.logger.Error(err, "failed to initialize instrumentation", "language", otelDisto.Language, "sdk", otelDisto.OtelSdk)
err = m.handler.Reporter.OnInit(ctx, e.PID, err, pg)
err = m.handler.Reporter.OnInit(ctx, e.PID, err, pd)
// TODO: should we return here the initialize error? or the handler error? or both?
return err
}

loadErr := inst.Load(ctx)

reporterErr := m.handler.Reporter.OnLoad(ctx, e.PID, loadErr, pg)
reporterErr := m.handler.Reporter.OnLoad(ctx, e.PID, loadErr, pd)
if reporterErr != nil {
m.logger.Error(reporterErr, "failed to report instrumentation load", "loaded", loadErr == nil, "pid", e.PID, "process group details", pg)
m.logger.Error(reporterErr, "failed to report instrumentation load", "loaded", loadErr == nil, "pid", e.PID, "process group details", pd)
}
if loadErr != nil {
// we need to track the instrumentation even if the load failed.
// consider a reporter which writes a persistent record for a failed/successful load
// we need to notify the reporter once that PID exits to clean up the resources - hence we track it.
// saving the inst as nil marking the instrumentation failed to load, and is not valid to run/configure/close.
m.startTrackInstrumentation(e.PID, nil, pg, configGroup)
m.startTrackInstrumentation(e.PID, nil, pd, configGroup)
m.logger.Error(err, "failed to load instrumentation", "language", otelDisto.Language, "sdk", otelDisto.OtelSdk)
// TODO: should we return here the load error? or the instance write error? or both?
return err
}

m.startTrackInstrumentation(e.PID, inst, pg, configGroup)
m.logger.Info("instrumentation loaded", "pid", e.PID, "process group details", pg)
m.startTrackInstrumentation(e.PID, inst, pd, configGroup)
m.logger.Info("instrumentation loaded", "pid", e.PID, "process group details", pd)

go func() {
err := inst.Run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
reporterErr := m.handler.Reporter.OnRun(ctx, e.PID, err, pg)
reporterErr := m.handler.Reporter.OnRun(ctx, e.PID, err, pd)
if reporterErr != nil {
m.logger.Error(reporterErr, "failed to report instrumentation run")
}
Expand All @@ -308,23 +308,23 @@ func (m *manager[ProcessGroup, ConfigGroup]) handleProcessExecEvent(ctx context.
return nil
}

func (m *manager[ProcessGroup, ConfigGroup]) startTrackInstrumentation(pid int, inst Instrumentation, processGroup ProcessGroup, configGroup ConfigGroup) {
instDetails := &instrumentationDetails[ProcessGroup, ConfigGroup]{
func (m *manager[ProcessDetails, ConfigGroup]) startTrackInstrumentation(pid int, inst Instrumentation, processDetails ProcessDetails, configGroup ConfigGroup) {
instDetails := &instrumentationDetails[ProcessDetails, ConfigGroup]{
inst: inst,
pg: processGroup,
pd: processDetails,
cg: configGroup,
}
m.detailsByPid[pid] = instDetails

if _, found := m.detailsByWorkload[configGroup]; !found {
// first instrumentation for this workload
m.detailsByWorkload[configGroup] = map[int]*instrumentationDetails[ProcessGroup, ConfigGroup]{pid: instDetails}
m.detailsByWorkload[configGroup] = map[int]*instrumentationDetails[ProcessDetails, ConfigGroup]{pid: instDetails}
} else {
m.detailsByWorkload[configGroup][pid] = instDetails
}
}

func (m *manager[ProcessGroup, ConfigGroup]) stopTrackInstrumentation(pid int) {
func (m *manager[ProcessDetails, ConfigGroup]) stopTrackInstrumentation(pid int) {
details, ok := m.detailsByPid[pid]
if !ok {
return
Expand All @@ -339,7 +339,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) stopTrackInstrumentation(pid int) {
}
}

func (m *manager[ProcessGroup, ConfigGroup]) applyInstrumentationConfigurationForSDK(ctx context.Context, configGroup ConfigGroup, config Config) error {
func (m *manager[ProcessDetails, ConfigGroup]) applyInstrumentationConfigurationForSDK(ctx context.Context, configGroup ConfigGroup, config Config) error {
var err error

configGroupInstrumentations, ok := m.detailsByWorkload[configGroup]
Expand All @@ -351,7 +351,7 @@ func (m *manager[ProcessGroup, ConfigGroup]) applyInstrumentationConfigurationFo
if instDetails.inst == nil {
continue
}
m.logger.Info("applying configuration to instrumentation", "process group details", instDetails.pg, "configGroup", configGroup)
m.logger.Info("applying configuration to instrumentation", "process group details", instDetails.pd, "configGroup", configGroup)
applyErr := instDetails.inst.ApplyConfig(ctx, config)
err = errors.Join(err, applyErr)
}
Expand Down
49 changes: 27 additions & 22 deletions instrumentation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ type OtelDistribution struct {
OtelSdk common.OtelSdk
}

// ProcessGroup represents a group of processes that are managed together by the hosting platform.
// ProcessDetails is used to convert the common process details reported by the detector to details relevant to hosting platform.
//
// ProcessDetails can contain details that associates a process to a group of processes that are managed together by the hosting platform.
// It may include different information depending on the platform (Kubernetes, VM, etc).
//
// For example consider an app which is launched by a bash script, the script launches a python process.
// The process may create different child processes, and the bash script may launch multiple python processes.
// In this case, the process group may include the bash script, the python process, and the child processes.
type ProcessGroup interface {
//
// Another category of information that may be included relates to language and runtime information which can be used to
// determine the OTel distribution to use.
type ProcessDetails interface {
fmt.Stringer
}

Expand All @@ -34,60 +39,60 @@ type ConfigGroup interface {
comparable
}

// ProcessGroupResolver is used to resolve the process group of a process.
type ProcessGroupResolver[processGroup ProcessGroup] interface {
// ProcessDetailsResolver is used to resolve the process group of a process.
type ProcessDetailsResolver[processDetails ProcessDetails] interface {
// Resolve will classify the process into a process group.
// Those process group details may be used for future calls when reporting the status of the instrumentation.
// or for resolving the configuration group of the process.
Resolve(context.Context, detector.ProcessEvent) (processGroup, error)
Resolve(context.Context, detector.ProcessEvent) (processDetails, error)
}

// ConfigGroupResolver is used to resolve the configuration group of a process.
type ConfigGroupResolver[processGroup ProcessGroup, configGroup ConfigGroup] interface {
type ConfigGroupResolver[processDetails ProcessDetails, configGroup ConfigGroup] interface {
// Resolve will classify the process into a configuration group.
// The Otel Distribution is resolved in the time of calling this function, and may be used
// to determine the configuration group.
Resolve(context.Context, processGroup, OtelDistribution) (configGroup, error)
Resolve(context.Context, processDetails, OtelDistribution) (configGroup, error)
}

// Reporter is used to report the status of the instrumentation.
// It is called at different stages of the instrumentation lifecycle.
type Reporter[processGroup ProcessGroup] interface {
type Reporter[processDetails ProcessDetails] interface {
// OnInit is called when the instrumentation is initialized.
// The error parameter will be nil if the instrumentation was initialized successfully.
OnInit(ctx context.Context, pid int, err error, pg processGroup) error
OnInit(ctx context.Context, pid int, err error, pg processDetails) error

// OnLoad is called after an instrumentation is loaded successfully or failed to load.
// The error parameter will be nil if the instrumentation was loaded successfully.
OnLoad(ctx context.Context, pid int, err error, pg processGroup) error
OnLoad(ctx context.Context, pid int, err error, pg processDetails) error

// OnRun is called after the instrumentation stops running.
// An error may report a fatal error during the instrumentation run, or a closing error
// which happened during the closing of the instrumentation.
OnRun(ctx context.Context, pid int, err error, pg processGroup) error
OnRun(ctx context.Context, pid int, err error, pg processDetails) error

// OnExit is called when the instrumented process exits, and the instrumentation has already been stopped.
// For a reported which persists the instrumentation state, this is the time to clean up the state.
OnExit(ctx context.Context, pid int, pg processGroup) error
OnExit(ctx context.Context, pid int, pg processDetails) error
}

// DistributionMatcher is used to match a process to an Otel Distribution.
type DistributionMatcher[processGroup ProcessGroup] interface {
type DistributionMatcher[processDetails ProcessDetails] interface {
// Distribution will match a process to an Otel Distribution.
Distribution(context.Context, processGroup) (OtelDistribution, error)
Distribution(context.Context, processDetails) (OtelDistribution, error)
}

// SettingsGetter is used to fetch the initial settings of an instrumentation.
type SettingsGetter[processGroup ProcessGroup] interface {
type SettingsGetter[processDetails ProcessDetails] interface {
// GetSettings will fetch the initial settings of an instrumentation.
Settings(context.Context, processGroup, OtelDistribution) (Settings, error)
Settings(context.Context, processDetails, OtelDistribution) (Settings, error)
}

// Handler is used to classify, report and configure instrumentations.
type Handler[processGroup ProcessGroup, configGroup comparable] struct {
ProcessGroupResolver ProcessGroupResolver[processGroup]
ConfigGroupResolver ConfigGroupResolver[processGroup, configGroup]
Reporter Reporter[processGroup]
DistributionMatcher DistributionMatcher[processGroup]
SettingsGetter SettingsGetter[processGroup]
type Handler[processDetails ProcessDetails, configGroup comparable] struct {
ProcessDetailsResolver ProcessDetailsResolver[processDetails]
ConfigGroupResolver ConfigGroupResolver[processDetails, configGroup]
Reporter Reporter[processDetails]
DistributionMatcher DistributionMatcher[processDetails]
SettingsGetter SettingsGetter[processDetails]
}
Loading

0 comments on commit 9cda745

Please sign in to comment.