diff --git a/Cargo.lock b/Cargo.lock index c9609016a..4ee6e837e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1898,10 +1898,14 @@ dependencies = [ "nativelink-error", "nativelink-macro", "nativelink-metric", + "nativelink-metric-collector", "nativelink-proto", "nativelink-scheduler", "nativelink-store", "nativelink-util", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "parking_lot", "pretty_assertions", "prost", @@ -1912,6 +1916,7 @@ dependencies = [ "tonic", "tower 0.5.1", "tracing", + "tracing-subscriber", "uuid", ] @@ -2123,6 +2128,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry-otlp" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b925a602ffb916fb7421276b86756027b37ee708f9dce2dbdcc51739f07e727" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic", +] + [[package]] name = "opentelemetry-prometheus" version = "0.17.0" @@ -2136,6 +2159,18 @@ dependencies = [ "protobuf", ] +[[package]] +name = "opentelemetry-proto" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee9f20bff9c984511a02f082dc8ede839e4a9bf15cc2487c8d6fea5ad850d9" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic", +] + [[package]] name = "opentelemetry_sdk" version = "0.24.1" @@ -2149,7 +2184,12 @@ dependencies = [ "glob", "once_cell", "opentelemetry", + "percent-encoding", + "rand", + "serde_json", "thiserror", + "tokio", + "tokio-stream", ] [[package]] diff --git a/kubernetes/base/cas.yaml b/kubernetes/base/cas.yaml index d93f8481e..3483a746b 100644 --- a/kubernetes/base/cas.yaml +++ b/kubernetes/base/cas.yaml @@ -3,6 +3,9 @@ apiVersion: apps/v1 kind: Deployment metadata: name: nativelink-cas + labels: + app: nativelink-cas + app.kubernetes.io/part-of: nativelink spec: replicas: 1 selector: @@ -12,6 +15,7 @@ spec: metadata: labels: app: nativelink-cas + app.kubernetes.io/part-of: nativelink spec: containers: - name: nativelink-cas diff --git a/kubernetes/base/scheduler.yaml b/kubernetes/base/scheduler.yaml index 603d3d63e..7c188da1f 100644 --- a/kubernetes/base/scheduler.yaml +++ b/kubernetes/base/scheduler.yaml @@ -3,6 +3,9 @@ apiVersion: apps/v1 kind: Deployment metadata: name: nativelink-scheduler + labels: + app: nativelink-scheduler + app.kubernetes.io/part-of: nativelink spec: replicas: 1 selector: @@ -12,6 +15,7 @@ spec: metadata: labels: app: nativelink-scheduler + app.kubernetes.io/part-of: nativelink spec: containers: - name: nativelink-scheduler diff --git a/kubernetes/base/worker.yaml b/kubernetes/base/worker.yaml index dcf57bc2c..bb731b712 100644 --- a/kubernetes/base/worker.yaml +++ b/kubernetes/base/worker.yaml @@ -3,6 +3,9 @@ apiVersion: apps/v1 kind: Deployment metadata: name: nativelink-worker + labels: + app: nativelink-scheduler + app.kubernetes.io/part-of: nativelink spec: replicas: 3 selector: @@ -12,6 +15,7 @@ spec: metadata: labels: app: nativelink-worker + app.kubernetes.io/part-of: nativelink spec: initContainers: - name: nativelink-worker-init diff --git a/kubernetes/components/cert-manager/cert-manager.yaml b/kubernetes/components/cert-manager/cert-manager.yaml new file mode 100644 index 000000000..6e9a93cc4 --- /dev/null +++ b/kubernetes/components/cert-manager/cert-manager.yaml @@ -0,0 +1,35 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: cert-manager + labels: + toolkit.fluxcd.io/tenant: sre-team +--- +apiVersion: source.toolkit.fluxcd.io/v1 +kind: HelmRepository +metadata: + name: cert-manager + namespace: cert-manager +spec: + interval: 24h + url: https://charts.jetstack.io +--- +apiVersion: helm.toolkit.fluxcd.io/v2 +kind: HelmRelease +metadata: + name: cert-manager + namespace: cert-manager +spec: + interval: 30m + chart: + spec: + chart: cert-manager + version: "1.x" + sourceRef: + kind: HelmRepository + name: cert-manager + namespace: cert-manager + interval: 12h + values: + installCRDs: true diff --git a/kubernetes/components/cert-manager/kustomization.yaml b/kubernetes/components/cert-manager/kustomization.yaml new file mode 100644 index 000000000..03960c8e3 --- /dev/null +++ b/kubernetes/components/cert-manager/kustomization.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - cert-manager.yaml diff --git a/kubernetes/components/nats-kafka/kustomization.yaml b/kubernetes/components/nats-kafka/kustomization.yaml new file mode 100644 index 000000000..a33ece71e --- /dev/null +++ b/kubernetes/components/nats-kafka/kustomization.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - nats-kafka.yaml diff --git a/kubernetes/components/nats-kafka/nats-kafka.yaml b/kubernetes/components/nats-kafka/nats-kafka.yaml new file mode 100644 index 000000000..aaa68ff64 --- /dev/null +++ b/kubernetes/components/nats-kafka/nats-kafka.yaml @@ -0,0 +1,232 @@ +--- +apiVersion: v1 +kind: Service +metadata: + name: nats-kafka-bridge + namespace: nats-system +spec: + ports: + - name: kafka + port: 9092 + targetPort: 9092 + selector: + app: nats-kafka-bridge +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-otel-to-kafka-bridge + namespace: nats-system +spec: + podSelector: + matchLabels: + app: nats-kafka-bridge + ingress: + - from: + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: default + podSelector: + matchLabels: + app.kubernetes.io/part-of: nativelink + - from: + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: nats-system + podSelector: + matchLabels: + app.kubernetes.io/name: nats + ports: + - protocol: TCP + port: 9092 + egress: + - to: + - podSelector: + matchLabels: + app.kubernetes.io/name: nats + ports: + - protocol: TCP + port: 4222 + - to: + - podSelector: + matchLabels: + app: kafka + ports: + - protocol: TCP + port: 9092 + - to: + - namespaceSelector: {} + podSelector: + matchLabels: + k8s-app: kube-dns + ports: + - protocol: UDP + port: 53 + - protocol: TCP + port: 53 + policyTypes: [Ingress, Egress] +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: nats-kafka-bridge + namespace: nats-system + labels: + app: nats-kafka-bridge + app.kubernetes.io/part-of: nats +spec: + replicas: 1 + selector: + matchLabels: + app: nats-kafka-bridge + template: + metadata: + labels: + app: nats-kafka-bridge + app.kubernetes.io/part-of: nats + spec: + containers: + - name: bridge + image: natsio/nats-kafka:latest + command: ["/bin/sh", "-c"] + args: + - | + echo "Config file contents:" + cat /config/bridge.conf + echo "Starting bridge with debug output..." + /nats-kafka -c /config/bridge.conf -DV || echo "Exit code: $?" + ports: + - containerPort: 9092 + name: kafka + volumeMounts: + - name: config + mountPath: /config + volumes: + - name: config + configMap: + name: nats-kafka-bridge-config +--- +# TODO(aaronmondal): This configmap doesn't properly update via flux. +apiVersion: v1 +kind: ConfigMap +metadata: + name: nats-kafka-bridge-config + namespace: nats-system +data: + bridge.conf: | + nats: { + Servers: ["nats://nats.nats-system.svc.cluster.local:4222"] + trace: true, + verbose: true, + debug: true + } + + jetstream: { + maxwait: 5000, + publishasyncmaxpending: 256, + heartbeatinterval: 5, + reconnectwait: 2, + enableflowcontrol: true, + enableacksync: true + } + + connect: [ + { + type: "KafkaToJetStream" + id: "otel-bridge" + brokers: ["kafka.nats-system.svc.cluster.local:9092"] + topic: "otel-telemetry" + subject: "otel.telemetry.kafka" + stream: "TELEMETRY" + } + ] +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: nats-stream-init + namespace: nats-system +spec: + template: + spec: + containers: + - name: nats-cli + image: natsio/nats-box:latest + command: + - /bin/sh + - -c + - | + nats stream add TELEMETRY \ + --server=nats://nats.nats-system.svc.cluster.local:4222 \ + --subjects="otel.telemetry.>" \ + --storage=file \ + --retention=limits \ + --discard=old \ + --max-msgs=-1 \ + --max-bytes=-1 \ + --max-age=24h \ + --dupe-window=2m \ + --replicas=1 \ + --max-msgs-per-subject=-1 \ + --max-msg-size=-1 \ + --allow-direct \ + --no-deny-delete \ + --no-deny-purge \ + --no-allow-rollup + restartPolicy: OnFailure +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kafka + namespace: nats-system +spec: + replicas: 1 + selector: + matchLabels: + app: kafka + template: + metadata: + labels: + app: kafka + spec: + containers: + - name: kafka + image: bitnami/kafka:latest + ports: + - containerPort: 9092 + env: + - name: KAFKA_CFG_NODE_ID + value: "1" + - name: KAFKA_CFG_PROCESS_ROLES + value: "controller,broker" + - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS + value: "1@kafka.nats-system.svc.cluster.local:9093" + - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES + value: "CONTROLLER" + - name: KAFKA_CFG_LISTENERS + value: "CONTROLLER://:9093,PLAINTEXT://:9092" + - name: KAFKA_CFG_ADVERTISED_LISTENERS + value: "PLAINTEXT://kafka.nats-system.svc.cluster.local:9092" + - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + - name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE + value: "true" + - name: ALLOW_PLAINTEXT_LISTENER + value: "yes" +--- +apiVersion: v1 +kind: Service +metadata: + name: kafka + namespace: nats-system +spec: + selector: + app: kafka + ports: + - name: kafka + port: 9092 + targetPort: 9092 + - name: controller + port: 9093 + targetPort: 9093 diff --git a/kubernetes/components/nats/kustomization.yaml b/kubernetes/components/nats/kustomization.yaml new file mode 100644 index 000000000..66db6a63b --- /dev/null +++ b/kubernetes/components/nats/kustomization.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - nats.yaml diff --git a/kubernetes/components/nats/nats.yaml b/kubernetes/components/nats/nats.yaml new file mode 100644 index 000000000..6e9736b51 --- /dev/null +++ b/kubernetes/components/nats/nats.yaml @@ -0,0 +1,63 @@ +--- +apiVersion: v1 +kind: Namespace +metadata: + name: nats-system + labels: + toolkit.fluxcd.io/tenant: sre-team +--- +apiVersion: source.toolkit.fluxcd.io/v1 +kind: HelmRepository +metadata: + name: nats + namespace: nats-system +spec: + interval: 24h + url: https://nats-io.github.io/k8s/helm/charts/ +--- +apiVersion: helm.toolkit.fluxcd.io/v2 +kind: HelmRelease +metadata: + name: nats + namespace: nats-system +spec: + interval: 30m + chart: + spec: + chart: nats + version: "1.x" + sourceRef: + kind: HelmRepository + name: nats + namespace: nats-system + interval: 12h + values: + config: + jetstream: + enabled: true + fileStore: + pvc: + size: 10Gi + # Enable Prometheus metrics + merge: + server_name: "nats-jetstream" + debug: false + trace: false + jetstream: + max_memory_store: "<< 1GB >>" + max_file_store: "<< 10GB >>" + + promExporter: + enabled: true + + container: + env: + GOMEMLIMIT: 1GiB + merge: + resources: + requests: + cpu: "500m" + memory: 1Gi + limits: + cpu: "1" + memory: 1Gi diff --git a/kubernetes/components/opentelemetry-collector/collector.yaml b/kubernetes/components/opentelemetry-collector/collector.yaml new file mode 100644 index 000000000..be2459c73 --- /dev/null +++ b/kubernetes/components/opentelemetry-collector/collector.yaml @@ -0,0 +1,102 @@ +--- +apiVersion: networking.k8s.io/v1 +kind: NetworkPolicy +metadata: + name: allow-nativelink-to-nats + namespace: default +spec: + podSelector: + matchLabels: + app.kubernetes.io/part-of: nativelink + egress: + - to: # Allow access to Kafka + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: nats-system + podSelector: + matchLabels: + app: kafka + ports: + - protocol: TCP + port: 9092 + - to: # Allow access to NATS + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: nats-system + podSelector: + matchLabels: + app.kubernetes.io/name: nats + ports: + - protocol: TCP + port: 4222 + - to: # Allow DNS resolution + - namespaceSelector: {} + podSelector: + matchLabels: + k8s-app: kube-dns + ports: + - protocol: UDP + port: 53 + - protocol: TCP + port: 53 + policyTypes: [Egress] +--- +apiVersion: opentelemetry.io/v1beta1 +kind: OpenTelemetryCollector +metadata: + name: nativelink-opentelemetry-collector + labels: + app.kubernetes.io/part-of: nativelink +spec: + mode: sidecar + config: + receivers: + otlp: + protocols: + grpc: + endpoint: "0.0.0.0:4317" + processors: + batch: + send_batch_size: 10000 + timeout: 10s + memory_limiter: + check_interval: 1s + limit_percentage: 75 + spike_limit_percentage: 15 + exporters: + prometheus: + endpoint: "0.0.0.0:8889" + debug: + verbosity: detailed + kafka: + brokers: ["kafka.nats-system.svc.cluster.local:9092"] + protocol_version: "2.0.0" + encoding: "otlp_proto" + topic: otel-telemetry + metadata: + full: true + timeout: 5s + retry_on_failure: + enabled: true + initial_interval: 5s + max_interval: 30s + max_elapsed_time: 300s + service: + telemetry: + metrics: + address: "0.0.0.0:8888" + logs: + level: "debug" + pipelines: + traces: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [debug, kafka] + metrics: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [prometheus, debug, kafka] + logs: + receivers: [otlp] + processors: [memory_limiter, batch] + exporters: [debug, kafka] diff --git a/kubernetes/components/opentelemetry-collector/kustomization.yaml b/kubernetes/components/opentelemetry-collector/kustomization.yaml new file mode 100644 index 000000000..1ea273a98 --- /dev/null +++ b/kubernetes/components/opentelemetry-collector/kustomization.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - collector.yaml diff --git a/kubernetes/components/opentelemetry-operator/kustomization.yaml b/kubernetes/components/opentelemetry-operator/kustomization.yaml new file mode 100644 index 000000000..82caa5a3b --- /dev/null +++ b/kubernetes/components/opentelemetry-operator/kustomization.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kustomize.config.k8s.io/v1alpha1 +kind: Component +resources: + - https://github.com/open-telemetry/opentelemetry-operator/releases/latest/download/opentelemetry-operator.yaml diff --git a/kubernetes/components/operator/flux-config.yaml b/kubernetes/components/operator/flux-config.yaml index cd6305a9a..c60ebb25c 100644 --- a/kubernetes/components/operator/flux-config.yaml +++ b/kubernetes/components/operator/flux-config.yaml @@ -106,9 +106,24 @@ spec: substituteFrom: - kind: ConfigMap name: nativelink-image-tags + patches: + - patch: |- + apiVersion: apps/v1 + kind: Deployment + metadata: + name: not-used + spec: + template: + metadata: + annotations: + sidecar.opentelemetry.io/inject: "true" + target: + kind: Deployment + labelSelector: "app.kubernetes.io/part-of=nativelink" dependsOn: - name: nativelink-configmaps - name: nativelink-tekton-resources + - name: opentelemetry-collector --- apiVersion: kustomize.toolkit.fluxcd.io/v1 kind: Kustomization @@ -127,3 +142,99 @@ spec: kind: GitRepository name: nativelink namespace: default +--- +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: cert-manager + namespace: flux-system +spec: + interval: 2m + path: "./kubernetes/components/cert-manager" + prune: true + force: true + retryInterval: 20s + targetNamespace: cert-manager + wait: true + sourceRef: + kind: GitRepository + name: nativelink + namespace: default +--- +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: nats + namespace: flux-system +spec: + interval: 2m + path: "./kubernetes/components/nats" + prune: true + force: true + retryInterval: 20s + targetNamespace: nats-system + wait: true + sourceRef: + kind: GitRepository + name: nativelink + namespace: default +--- +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: nats-kafka + namespace: nats-system +spec: + interval: 10m + prune: true + sourceRef: + kind: GitRepository + name: nativelink + namespace: default + path: ./kubernetes/components/nats-kafka + dependsOn: + - name: nats + namespace: flux-system +--- +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: opentelemetry-operator + namespace: flux-system +spec: + interval: 2m + path: "./kubernetes/components/opentelemetry-operator" + prune: true + force: true + retryInterval: 20s + targetNamespace: opentelemetry-operator-system + wait: true + sourceRef: + kind: GitRepository + name: nativelink + namespace: default + dependsOn: + - name: cert-manager +--- +apiVersion: kustomize.toolkit.fluxcd.io/v1 +kind: Kustomization +metadata: + name: opentelemetry-collector + namespace: default +spec: + interval: 2m + path: "./kubernetes/components/opentelemetry-collector" + prune: true + force: true + retryInterval: 20s + targetNamespace: default + wait: true + sourceRef: + kind: GitRepository + name: nativelink + namespace: default + dependsOn: + - name: opentelemetry-operator + namespace: flux-system + - name: nats + namespace: flux-system diff --git a/kubernetes/configmaps/cas.json b/kubernetes/configmaps/cas.json index e26b9c1cb..52e19eac9 100644 --- a/kubernetes/configmaps/cas.json +++ b/kubernetes/configmaps/cas.json @@ -79,7 +79,9 @@ "services": { "experimental_prometheus": { "path": "/metrics" - } + }, + // TODO(aaronmondal): This is actually served on 4317. + "experimental_otlp": {} } }, { diff --git a/kubernetes/configmaps/scheduler.json b/kubernetes/configmaps/scheduler.json index e41060209..c1f482ee8 100644 --- a/kubernetes/configmaps/scheduler.json +++ b/kubernetes/configmaps/scheduler.json @@ -89,6 +89,8 @@ "scheduler": "MAIN_SCHEDULER", }, "health": {}, + // TODO(aaronmondal): This is actually served on 4317. + "experimental_otlp": {} } }] } diff --git a/kubernetes/configmaps/worker.json b/kubernetes/configmaps/worker.json index 2a3d2911d..97600fd7b 100644 --- a/kubernetes/configmaps/worker.json +++ b/kubernetes/configmaps/worker.json @@ -78,7 +78,17 @@ } } }], - "servers": [], + "servers": [{ + "listener": { + "http": { + "socket_address": "0.0.0.0:50061", + } + }, + "services": { + // TODO(aaronmondal): This is actually served on 4317. + "experimental_otlp": {}, + } + }], "global": { "max_open_files": 524288 } diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs index 3061c4f44..facfc66fa 100644 --- a/nativelink-config/src/cas_server.rs +++ b/nativelink-config/src/cas_server.rs @@ -179,6 +179,17 @@ pub struct PrometheusConfig { pub path: String, } +#[derive(Deserialize, Debug, Default)] +#[serde(deny_unknown_fields)] +pub struct OtlpConfig { + /// Grpc endpoint. This is a fully qualified URL as in "http + /// TODO(aaronmondal): Should default to https. + /// + /// Default: http://localhost:4317 + #[serde(default)] + pub endpoint: String, +} + #[derive(Deserialize, Debug, Default)] #[serde(deny_unknown_fields)] pub struct AdminConfig { @@ -257,6 +268,9 @@ pub struct ServicesConfig { /// as a singleton but may be served on multiple endpoints. pub experimental_prometheus: Option, + /// Experimental - Enable the OpenTelemetry Protocol endpoint. + pub experimental_otlp: Option, + /// This is the service for any administrative tasks. /// It provides a REST API endpoint for administrative purposes. pub admin: Option, diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index f18e55bc4..ee74a9437 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -17,12 +17,15 @@ rust_library( "src/execution_server.rs", "src/health_server.rs", "src/lib.rs", + "src/otlp_server.rs", "src/worker_api_server.rs", ], visibility = ["//visibility:public"], deps = [ "//nativelink-config", "//nativelink-error", + "//nativelink-metric", + "//nativelink-metric-collector", "//nativelink-proto", "//nativelink-scheduler", "//nativelink-store", @@ -33,6 +36,9 @@ rust_library( "@crates//:http-body", "@crates//:http-body-util", "@crates//:hyper-1.4.1", + "@crates//:opentelemetry", + "@crates//:opentelemetry-otlp", + "@crates//:opentelemetry_sdk", "@crates//:parking_lot", "@crates//:prost", "@crates//:serde_json5", @@ -40,6 +46,7 @@ rust_library( "@crates//:tonic", "@crates//:tower", "@crates//:tracing", + "@crates//:tracing-subscriber", "@crates//:uuid", ], ) diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 92b9e6b51..3a11338f0 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -10,6 +10,8 @@ nativelink-config = { path = "../nativelink-config" } nativelink-util = { path = "../nativelink-util" } nativelink-store = { path = "../nativelink-store" } nativelink-scheduler = { path = "../nativelink-scheduler" } +nativelink-metric = { path = "../nativelink-metric" } +nativelink-metric-collector = { path = "../nativelink-metric-collector" } axum = { version = "0.7.7", default-features = false } bytes = { version = "1.7.2", default-features = false } futures = { version = "0.3.30", default-features = false } @@ -24,8 +26,13 @@ tokio-stream = { version = "0.1.16", features = ["fs"], default-features = false tonic = { version = "0.12.3", features = ["transport", "tls"], default-features = false } tower = { version = "0.5.1", default-features = false } tracing = { version = "0.1.40", default-features = false } +tracing-subscriber = { version = "0.3.18", default-features = false } uuid = { version = "1.10.0", default-features = false, features = ["v4", "serde"] } +opentelemetry-otlp = "0.17.0" +opentelemetry_sdk = { version = "0.24.1", features = ["metrics", "rt-tokio"], default-features = false } +opentelemetry = { version = "0.24.0", default-features = false } + [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } nativelink-metric = { path = "../nativelink-metric" } diff --git a/nativelink-service/src/lib.rs b/nativelink-service/src/lib.rs index 534b55072..50d1f8879 100644 --- a/nativelink-service/src/lib.rs +++ b/nativelink-service/src/lib.rs @@ -19,4 +19,5 @@ pub mod capabilities_server; pub mod cas_server; pub mod execution_server; pub mod health_server; +pub mod otlp_server; pub mod worker_api_server; diff --git a/nativelink-service/src/otlp_server.rs b/nativelink-service/src/otlp_server.rs new file mode 100644 index 000000000..456b00d04 --- /dev/null +++ b/nativelink-service/src/otlp_server.rs @@ -0,0 +1,91 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use nativelink_error::{make_err, Code, Error}; +use nativelink_metric::{MetricFieldData, MetricKind, MetricsComponent, RootMetricsComponent}; +use nativelink_metric_collector::{otel_export, MetricsCollectorLayer}; +use nativelink_util::spawn_blocking; +use opentelemetry::metrics::MeterProvider; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::runtime; +use parking_lot::RwLock; +use tracing::error; +use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt; + +pub struct OtlpServer { + root_metrics: Arc>, + provider: SdkMeterProvider, +} + +impl OtlpServer { + pub fn new( + root_metrics: Arc>, + endpoint: &str, + ) -> Result { + let pipeline = opentelemetry_otlp::new_pipeline() + .metrics(runtime::Tokio) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(endpoint) + .with_timeout(Duration::from_secs(10)), + ); + + let provider = pipeline + .build() + .map_err(|e| make_err!(Code::Internal, "{e}"))?; + + Ok(Self { + root_metrics, + provider, + }) + } + + pub async fn collect_and_export(&self) -> Result<(), Error> { + let meter = self.provider.meter("nativelink"); + let metrics = self.root_metrics.clone(); + + spawn_blocking!("otlp_metrics", move || { + let (layer, output_metrics) = MetricsCollectorLayer::new(); + tracing::subscriber::with_default(tracing_subscriber::registry().with(layer), || { + let metrics_component = metrics.read(); + MetricsComponent::publish( + &*metrics_component, + MetricKind::Component, + MetricFieldData::default(), + ) + }) + .map_err(|e| make_err!(Code::Internal, "{e}"))?; + + otel_export("nativelink".to_string(), &meter, &output_metrics.lock()); + + Ok::<(), Error>(()) + }) + .await + .map_err(|e| make_err!(Code::Internal, "Join error: {}", e))? + } + + pub async fn run_export_loop(self) { + loop { + tokio::time::sleep(Duration::from_secs(15)).await; + if let Err(e) = self.collect_and_export().await { + error!("Failed to export OTLP metrics: {e}"); + } + } + } +} diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 7e2276a63..0c15a2ad7 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -43,6 +43,7 @@ use nativelink_service::capabilities_server::CapabilitiesServer; use nativelink_service::cas_server::CasServer; use nativelink_service::execution_server::ExecutionServer; use nativelink_service::health_server::HealthServer; +use nativelink_service::otlp_server::OtlpServer; use nativelink_service::worker_api_server::WorkerApiServer; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; @@ -85,6 +86,9 @@ static GLOBAL: MiMalloc = MiMalloc; /// Note: This must be kept in sync with the documentation in `PrometheusConfig::path`. const DEFAULT_PROMETHEUS_METRICS_PATH: &str = "/metrics"; +/// Note: This must be kept in sync with the documentation in `OtlpConfig::endpoint`. +const DEFAULT_OTLP_ENDPOINT: &str = "http://localhost:4317"; + /// Note: This must be kept in sync with the documentation in `AdminConfig::path`. const DEFAULT_ADMIN_API_PATH: &str = "/admin"; @@ -594,6 +598,22 @@ async fn inner_main( ); } + if let Some(otlp_cfg) = services.experimental_otlp { + let endpoint = if otlp_cfg.endpoint.is_empty() { + DEFAULT_OTLP_ENDPOINT + } else { + &otlp_cfg.endpoint + }; + + let server = OtlpServer::new(root_metrics.clone(), endpoint)?; + + background_spawn!( + name: "otlp_metrics_loop", + fut: server.run_export_loop(), + target: "nativelink::services", + ); + } + if let Some(admin_config) = services.admin { let path = if admin_config.path.is_empty() { DEFAULT_ADMIN_API_PATH