Skip to content
This repository has been archived by the owner on Jun 14, 2023. It is now read-only.

Add process metadata file for work with eBPF agent #156

Merged
merged 7 commits into from
Apr 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ Below is the full list of supported environment variables you can set to customi
| `SW_AGENT_COLLECTOR_HEARTBEAT_PERIOD` | Agent heartbeat report period. Unit, second | 20 |
| `SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL` | Sniffer get agent dynamic config interval. Unit, second | 20 |
| `SW_AGENT_COLLECTOR_MAX_SEND_QUEUE_SIZE` | Send span queue buffer length | 30000 |
| `SW_AGENT_PROCESS_STATUS_HOOK_ENABLE` | Enable the Process Status Hook feature | false |
| `SW_AGENT_PROCESS_LABELS` | The labels of the process, multiple labels split by "," | unset |


## CDS - Configuration Discovery Service
Expand All @@ -264,5 +266,28 @@ Golang agent supports the following dynamic configurations.
|:-----------------:|:----------------------------------------------------------------------------------------:|:--------------------:|
| agent.sample_rate | The percentage of trace when sampling. It's `[0, 1]`, Same with `WithSampler` parameter. | 0.1 |

## Process Status Hook

This feature is used in cooperation with the [skywalking-rover](https://github.com/apache/skywalking-rover) project.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you missed one, I suggest this feature(temp files) should be optional and OFF on default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended to enable this feature through environment variable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated and let the process label and process status hook enabled could update through the environment variable.


When go2sky keeps alive with the backend, it would write a metadata file to the local (temporary directory) at the same time, which describes the information of the current process.
The rover side scans all processes, find out which process contains this metadata file. Finally, the rover could collect, profiling, with this process.
Comment on lines +273 to +274
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
When go2sky keeps alive with the backend, it would write a metadata file to the local (temporary directory) at the same time, which describes the information of the current process.
The rover side scans all processes, find out which process contains this metadata file. Finally, the rover could collect, profiling, with this process.
When go2sky keeps alive with the backend, it would write a metadata file to the local (temporary directory) at the same time, which provides the information of the current process.
The rover would detect this file in order to identify this Golang service, and tags it `ebpf-profiling-able`.

As a doc, we don't need to describe how codes work, they are open sourced :) We need to tell why it works in this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ebpf-profiling-able? Do you mean to add this tag to the process labels?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean logically, this is what this is designed for. Nothing about label.


### Metadata File

The metadata file use to save metadata with current process, it save in: `{TMPDIR}/apache_skywalking/process/{pid}/metadata.properties`.

Also, when the go2sky keep alive with backend, modify and open time of the metadata file would be updated.

| Key | Type | Description |
|-----|------|------------|
|layer|string|this process layer.|
|service_name|string|this process service name.|
|instance_name|string|this process instance name.|
|process_name|string|this process process name, it's same with the instance name.|
|properties|json|the properties in instance, the process labels also in the properties value.|
|labels|string|the process labels, multiple labels split by ",".|
|language|string|current process language, which is `golang`.|

# License
Apache License 2.0. See [LICENSE](LICENSE) file for details.
2 changes: 2 additions & 0 deletions docs/GRPC-Reporter-Option.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
| `reporter.WithCDS` | setup CDS service |
| `reporter.WithLayer` | setup layer |
| `reporter.WithFAASLayer` | setup layer to FAAS |
| `reporter.WithProcessLabels` | setup labels bind to process |
| `reporter.WithProcessStatusHook` | setup is enabled the process status |
10 changes: 10 additions & 0 deletions reporter/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ type gRPCReporter struct {

// Instance belong layer name which define in the backend
layer string

// The process metadata and is enabled the process status hook
processLabels []string
processStatusHookEnable bool
}

func (r *gRPCReporter) Boot(service string, serviceInstance string, cdsWatchers []go2sky.AgentConfigChangeWatcher) {
Expand Down Expand Up @@ -202,6 +206,7 @@ func (r *gRPCReporter) Close() {
close(r.sendCh)
} else {
r.closeGRPCConn()
cleanupProcessDirectory(r)
}
}

Expand Down Expand Up @@ -314,6 +319,11 @@ func (r *gRPCReporter) check() {
break
}

// report the process status
if r.processStatusHookEnable {
reportProcess(r)
}

if !instancePropertiesSubmitted {
err := r.reportInstanceProperties()
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions reporter/grpc_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -32,6 +33,8 @@ const (
swAgentCollectorGetAgentDynamicConfigInterval = "SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL"
swAgentCollectorBackendServices = "SW_AGENT_COLLECTOR_BACKEND_SERVICES"
swAgentCollectorMaxSendQueueSize = "SW_AGENT_COLLECTOR_MAX_SEND_QUEUE_SIZE"
swAgentProcessStatusHookEnable = "SW_AGENT_PROCESS_STATUS_HOOK_ENABLE"
swAgentProcessLabels = "SW_AGENT_PROCESS_LABELS"
wu-sheng marked this conversation as resolved.
Show resolved Hide resolved
)

// serverAddrFormEnv read the backend service address in the environment variable
Expand Down Expand Up @@ -75,5 +78,18 @@ func gRPCReporterOptionsFormEnv() (opts []GRPCReporterOption, err error) {
}
opts = append(opts, WithMaxSendQueueSize(int(size)))
}

if value := os.Getenv(swAgentProcessStatusHookEnable); value != "" {
enable, err1 := strconv.ParseBool(value)
if err1 != nil {
return nil, err
}
opts = append(opts, WithProcessStatusHook(enable))
}

if value := os.Getenv(swAgentProcessLabels); value != "" {
labels := strings.Split(value, ",")
opts = append(opts, WithProcessLabels(labels))
}
return
}
21 changes: 20 additions & 1 deletion reporter/grpc_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reporter

import (
"log"
"strings"
"time"

"github.com/SkyAPM/go2sky/logger"
Expand Down Expand Up @@ -99,4 +100,22 @@ func WithFAASLayer() GRPCReporterOption {
return func(r *gRPCReporter) {
r.layer = "FAAS"
}
}
}

// WithProcessLabels setup labels bind to process
func WithProcessLabels(labels []string) GRPCReporterOption {
return func(t *gRPCReporter) {
t.processLabels = labels
if t.instanceProps == nil {
t.instanceProps = make(map[string]string)
}
t.instanceProps[ProcessLabelKey] = strings.Join(labels, ",")
}
}

// WithProcessStatusHook setup is enabled the process status
func WithProcessStatusHook(enable bool) GRPCReporterOption {
return func(r *gRPCReporter) {
r.processStatusHookEnable = enable
}
}
170 changes: 170 additions & 0 deletions reporter/process.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
//
// Copyright 2022 SkyAPM org
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package reporter

import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
"strconv"
"strings"
"sync"
"time"

commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
)

type ProcessReportStatus int8

const (
ProcessLabelKey = "processLabels"

NotInit ProcessReportStatus = iota
Reported
Closed
)

var process *processStat

type processStat struct {
basePath string
metadataFile string
status ProcessReportStatus
shutdownOnce sync.Once
}

func initProcessStat(r *gRPCReporter) *processStat {
basePath := path.Join(os.TempDir(), "apache_skywalking", "process", strconv.Itoa(os.Getpid()))
metaFilePath := path.Join(basePath, "metadata.properties")

return &processStat{
basePath: basePath,
metadataFile: metaFilePath,
status: NotInit,
}
}

// Report the current process metadata to local file
// using to work with eBPF agent
func reportProcess(r *gRPCReporter) {
if process == nil {
process = initProcessStat(r)
}

if process.status == NotInit {
// create metadata file
if p, err := process.initMetadataFile(r); err != nil {
r.logger.Warnf("process status file init failure: %s, %v", p, err)
} else {
process.status = Reported
}
} else if process.status == Reported {
// keep the metadata file alive(update modify time)
updateTime := time.Now()
if err := os.Chtimes(process.metadataFile, updateTime, updateTime); err != nil {
r.logger.Warnf("keep the process metadata alive failure: %v", err)
}
}
}

func (p *processStat) initMetadataFile(r *gRPCReporter) (string, error) {
// create base directory
basePath := process.basePath
if err := os.RemoveAll(basePath); err != nil && !errors.Is(err, os.ErrNotExist) {
return basePath, err
}
if err := os.MkdirAll(basePath, 0o700); err != nil {
return basePath, err
}

// create and write metadata file
metadataFile := process.metadataFile
if metaFile, err := os.Create(metadataFile); err != nil {
return metadataFile, err
} else {
if content, err := p.buildMetadataContent(r); err != nil {
return metadataFile, err
} else if _, err = metaFile.WriteString(content); err != nil {
return metadataFile, err
}
}
return "", nil
}

func (p *processStat) buildMetadataContent(g *gRPCReporter) (string, error) {
layer := g.layer
if layer == "" {
layer = "GENERAL"
}

propertiesJson, err := p.buildPropertiesJson(g)
if err != nil {
return "", err
}

metadata := map[string]string{
"layer": layer,
"service_name": g.service,
"instance_name": g.serviceInstance,
"process_name": g.serviceInstance, // process name is same with instance name
"properties": propertiesJson,
"labels": strings.Join(g.processLabels, ","),
"language": "golang",
}

result := ""
for k, v := range metadata {
result += fmt.Sprintf("%s=%s\n", k, v)
}
return result, nil
}

func (p *processStat) buildPropertiesJson(g *gRPCReporter) (string, error) {
props := buildOSInfo()
if g.instanceProps != nil {
for k, v := range g.instanceProps {
props = append(props, &commonv3.KeyStringValuePair{
Key: k,
Value: v,
})
}
}

properties := make(map[string]string)
for _, p := range props {
properties[p.Key] = p.Value
}
bytes, err := json.Marshal(properties)
if err != nil {
return "", err
}
return string(bytes), nil
}

func cleanupProcessDirectory(r *gRPCReporter) {
if process == nil {
return
}
process.shutdownOnce.Do(func() {
if err := os.RemoveAll(process.basePath); err != nil && r != nil {
r.logger.Warnf("could delete process: %s, %v", process.basePath, err)
}
process.status = Closed
})
}