diff --git a/README.md b/README.md index 80d8e25..766c7e1 100644 --- a/README.md +++ b/README.md @@ -251,6 +251,8 @@ Below is the full list of supported environment variables you can set to customi | `SW_AGENT_COLLECTOR_HEARTBEAT_PERIOD` | Agent heartbeat report period. Unit, second | 20 | | `SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL` | Sniffer get agent dynamic config interval. Unit, second | 20 | | `SW_AGENT_COLLECTOR_MAX_SEND_QUEUE_SIZE` | Send span queue buffer length | 30000 | +| `SW_AGENT_PROCESS_STATUS_HOOK_ENABLE` | Enable the Process Status Hook feature | false | +| `SW_AGENT_PROCESS_LABELS` | The labels of the process; multiple labels are separated by "," | unset | ## CDS - Configuration Discovery Service @@ -264,5 +266,28 @@ Golang agent supports the following dynamic configurations. |:-----------------:|:----------------------------------------------------------------------------------------:|:--------------------:| | agent.sample_rate | The percentage of trace when sampling. It's `[0, 1]`, Same with `WithSampler` parameter. | 0.1 | +## Process Status Hook + +This feature is used in cooperation with the [skywalking-rover](https://github.com/apache/skywalking-rover) project. + +While go2sky keeps alive with the backend, it also writes a metadata file describing the current process to the local temporary directory. +The rover side scans all processes and finds the ones that have this metadata file. The rover can then collect data from, and profile, these processes. + +### Metadata File + +The metadata file stores the metadata of the current process. It is saved at: `{TMPDIR}/apache_skywalking/process/{pid}/metadata.properties`. + +Also, each time go2sky keeps alive with the backend, the access and modification times of the metadata file are updated.
+ +| Key | Type | Description | +|-----|------|------------| +|layer|string|the layer of this process.| +|service_name|string|the service name of this process.| +|instance_name|string|the instance name of this process.| +|process_name|string|the name of this process; it is the same as the instance name.| +|properties|json|the instance properties; the process labels are also included in the properties value.| +|labels|string|the process labels; multiple labels are separated by ",".| +|language|string|the language of the current process, which is `golang`.| + # License Apache License 2.0. See [LICENSE](LICENSE) file for details. \ No newline at end of file diff --git a/docs/GRPC-Reporter-Option.md b/docs/GRPC-Reporter-Option.md index fc7afbd..1edb915 100644 --- a/docs/GRPC-Reporter-Option.md +++ b/docs/GRPC-Reporter-Option.md @@ -13,3 +13,5 @@ | `reporter.WithCDS` | setup CDS service | | `reporter.WithLayer` | setup layer | | `reporter.WithFAASLayer` | setup layer to FAAS | +| `reporter.WithProcessLabels` | set up the labels bound to the process | +| `reporter.WithProcessStatusHook` | enable or disable the process status hook | diff --git a/reporter/grpc.go b/reporter/grpc.go index c950b0e..9b28b6b 100644 --- a/reporter/grpc.go +++ b/reporter/grpc.go @@ -116,6 +116,10 @@ type gRPCReporter struct { // Instance belong layer name which define in the backend layer string + + // The process labels and whether the process status hook is enabled + processLabels []string + processStatusHookEnable bool } func (r *gRPCReporter) Boot(service string, serviceInstance string, cdsWatchers []go2sky.AgentConfigChangeWatcher) { @@ -202,6 +206,7 @@ func (r *gRPCReporter) Close() { close(r.sendCh) } else { r.closeGRPCConn() + cleanupProcessDirectory(r) } } @@ -314,6 +319,11 @@ func (r *gRPCReporter) check() { break } + // report the process status + if r.processStatusHookEnable { + reportProcess(r) + } + if !instancePropertiesSubmitted { err := r.reportInstanceProperties() if err != nil { diff --git a/reporter/grpc_env.go b/reporter/grpc_env.go index 
c7ca873..b124657 100644
--- a/reporter/grpc_env.go
+++ b/reporter/grpc_env.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"os"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/pkg/errors"
@@ -32,6 +33,8 @@ const (
 	swAgentCollectorGetAgentDynamicConfigInterval = "SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL"
 	swAgentCollectorBackendServices               = "SW_AGENT_COLLECTOR_BACKEND_SERVICES"
 	swAgentCollectorMaxSendQueueSize              = "SW_AGENT_COLLECTOR_MAX_SEND_QUEUE_SIZE"
+	swAgentProcessStatusHookEnable                = "SW_AGENT_PROCESS_STATUS_HOOK_ENABLE"
+	swAgentProcessLabels                          = "SW_AGENT_PROCESS_LABELS"
 )
 
 // serverAddrFormEnv read the backend service address in the environment variable
@@ -75,5 +78,18 @@ func gRPCReporterOptionsFormEnv() (opts []GRPCReporterOption, err error) {
 		}
 		opts = append(opts, WithMaxSendQueueSize(int(size)))
 	}
+
+	if value := os.Getenv(swAgentProcessStatusHookEnable); value != "" {
+		enable, err1 := strconv.ParseBool(value)
+		if err1 != nil {
+			return nil, errors.Wrap(err1, fmt.Sprintf("%s=%s", swAgentProcessStatusHookEnable, value))
+		}
+		opts = append(opts, WithProcessStatusHook(enable))
+	}
+
+	if value := os.Getenv(swAgentProcessLabels); value != "" {
+		labels := strings.Split(value, ",")
+		opts = append(opts, WithProcessLabels(labels))
+	}
 	return
 }
diff --git a/reporter/grpc_opts.go b/reporter/grpc_opts.go
index 4b09d14..6ad879a 100644
--- a/reporter/grpc_opts.go
+++ b/reporter/grpc_opts.go
@@ -18,6 +18,7 @@ package reporter
 
 import (
 	"log"
+	"strings"
 	"time"
 
 	"github.com/SkyAPM/go2sky/logger"
@@ -99,4 +100,27 @@ func WithFAASLayer() GRPCReporterOption {
 	return func(r *gRPCReporter) {
 		r.layer = "FAAS"
 	}
-}
\ No newline at end of file
+}
+
+// WithProcessLabels binds the given labels to the current process. The labels
+// are kept for the process metadata file and are also stored in the instance
+// properties under ProcessLabelKey, joined by ",".
+func WithProcessLabels(labels []string) GRPCReporterOption {
+	return func(r *gRPCReporter) {
+		// Keep a private copy so that later changes to the caller's slice cannot
+		// make processLabels disagree with the joined instance property.
+		r.processLabels = append([]string(nil), labels...)
+		if r.instanceProps == nil {
+			r.instanceProps = make(map[string]string)
+		}
+		r.instanceProps[ProcessLabelKey] = strings.Join(r.processLabels, ",")
+	}
+}
+
+// WithProcessStatusHook enables or disables the process status hook, which
+// reports the process metadata to a local file.
+func WithProcessStatusHook(enable bool) GRPCReporterOption {
+	return func(r *gRPCReporter) {
+		r.processStatusHookEnable = enable
+	}
+}
diff --git a/reporter/process.go b/reporter/process.go
new file mode 100644
index 0000000..e1b8cff
--- /dev/null
+++ b/reporter/process.go
@@ -0,0 +1,200 @@
+//
+// Copyright 2022 SkyAPM org
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+package reporter
+
+import (
+	"encoding/json"
+	"errors"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+// ProcessReportStatus describes the lifecycle state of the process metadata file.
+type ProcessReportStatus int8
+
+// ProcessLabelKey is the instance property key under which the process labels are stored.
+const ProcessLabelKey = "processLabels"
+
+// The status constants live in their own block so that iota starts at zero and
+// the zero value of ProcessReportStatus is NotInit.
+const (
+	// NotInit means the metadata file has not been created yet.
+	NotInit ProcessReportStatus = iota
+	// Reported means the metadata file has been written and is being kept alive.
+	Reported
+	// Closed means the process directory has been removed.
+	Closed
+)
+
+// process holds the metadata file state of the current process; it is created
+// on the first call to reportProcess.
+var process *processStat
+
+// processStat tracks where the metadata file lives and which state it is in.
+type processStat struct {
+	basePath     string
+	metadataFile string
+	status       ProcessReportStatus
+	shutdownOnce sync.Once
+}
+
+// initProcessStat computes the metadata locations of the current process,
+// {TMPDIR}/apache_skywalking/process/{pid}/metadata.properties, without
+// creating anything on disk. The reporter argument is unused.
+func initProcessStat(_ *gRPCReporter) *processStat {
+	// filepath (not path) so that the separators match the ones in os.TempDir()
+	basePath := filepath.Join(os.TempDir(), "apache_skywalking", "process", strconv.Itoa(os.Getpid()))
+
+	return &processStat{
+		basePath:     basePath,
+		metadataFile: filepath.Join(basePath, "metadata.properties"),
+		status:       NotInit,
+	}
+}
+
+// reportProcess reports the current process metadata to a local file, which is
+// used to work with the eBPF agent. The first successful call creates the
+// file; later calls only refresh its access and modification times. A failure
+// is logged and the same step is attempted again on the next call.
+func reportProcess(r *gRPCReporter) {
+	if process == nil {
+		process = initProcessStat(r)
+	}
+
+	switch process.status {
+	case NotInit:
+		// create metadata file
+		if p, err := process.initMetadataFile(r); err != nil {
+			r.logger.Warnf("process status file init failure: %s, %v", p, err)
+			return
+		}
+		process.status = Reported
+	case Reported:
+		// keep the metadata file alive(update modify time)
+		now := time.Now()
+		if err := os.Chtimes(process.metadataFile, now, now); err != nil {
+			r.logger.Warnf("keep the process metadata alive failure: %v", err)
+		}
+	}
+}
+
+// initMetadataFile recreates the process directory and writes the metadata
+// file into it. On failure it returns the path that could not be handled
+// together with the error; on success the returned path is empty.
+func (p *processStat) initMetadataFile(r *gRPCReporter) (string, error) {
+	// build the content first, so that nothing on disk is touched if it fails
+	content, err := p.buildMetadataContent(r)
+	if err != nil {
+		return p.metadataFile, err
+	}
+
+	// recreate the base directory, dropping whatever it contained before
+	if err = os.RemoveAll(p.basePath); err != nil && !errors.Is(err, os.ErrNotExist) {
+		return p.basePath, err
+	}
+	if err = os.MkdirAll(p.basePath, 0o700); err != nil {
+		return p.basePath, err
+	}
+
+	// create and write metadata file
+	metaFile, err := os.Create(p.metadataFile)
+	if err != nil {
+		return p.metadataFile, err
+	}
+	if _, err = metaFile.WriteString(content); err != nil {
+		// the write error is the one worth reporting, so a close error is dropped
+		_ = metaFile.Close()
+		return p.metadataFile, err
+	}
+	// close explicitly: the handle is not used again and must not be leaked
+	if err = metaFile.Close(); err != nil {
+		return p.metadataFile, err
+	}
+	return "", nil
+}
+
+// buildMetadataContent renders the process metadata as "key=value" lines, one
+// entry per line. An empty layer is reported as "GENERAL".
+func (p *processStat) buildMetadataContent(r *gRPCReporter) (string, error) {
+	layer := r.layer
+	if layer == "" {
+		layer = "GENERAL"
+	}
+
+	propertiesJSON, err := p.buildPropertiesJson(r)
+	if err != nil {
+		return "", err
+	}
+
+	// a slice, not a map, so that the lines are written in the same order every time
+	metadata := []struct{ key, value string }{
+		{"layer", layer},
+		{"service_name", r.service},
+		{"instance_name", r.serviceInstance},
+		{"process_name", r.serviceInstance}, // process name is same with instance name
+		{"properties", propertiesJSON},
+		{"labels", strings.Join(r.processLabels, ",")},
+		{"language", "golang"},
+	}
+
+	var content strings.Builder
+	for _, m := range metadata {
+		content.WriteString(m.key)
+		content.WriteByte('=')
+		content.WriteString(m.value)
+		content.WriteByte('\n')
+	}
+	return content.String(), nil
+}
+
+// buildPropertiesJson encodes the OS information merged with the instance
+// properties as a JSON object. An instance property replaces the OS
+// information stored under the same key.
+func (p *processStat) buildPropertiesJson(r *gRPCReporter) (string, error) {
+	properties := make(map[string]string)
+	for _, kv := range buildOSInfo() {
+		properties[kv.Key] = kv.Value
+	}
+	for k, v := range r.instanceProps {
+		properties[k] = v
+	}
+
+	data, err := json.Marshal(properties)
+	if err != nil {
+		return "", err
+	}
+	return string(data), nil
+}
+
+// cleanupProcessDirectory removes the process directory, at most once. It does
+// nothing if reportProcess has never been called. r is only used to log a
+// removal failure and may be nil.
+func cleanupProcessDirectory(r *gRPCReporter) {
+	if process == nil {
+		return
+	}
+	process.shutdownOnce.Do(func() {
+		if err := os.RemoveAll(process.basePath); err != nil && r != nil {
+			r.logger.Warnf("could not delete process directory: %s, %v", process.basePath, err)
+		}
+		process.status = Closed
+	})
+}