diff --git a/chromadb/telemetry/opentelemetry/grpc.py b/chromadb/telemetry/opentelemetry/grpc.py index 0fdcd71c56c..563d3c87236 100644 --- a/chromadb/telemetry/opentelemetry/grpc.py +++ b/chromadb/telemetry/opentelemetry/grpc.py @@ -4,8 +4,6 @@ import grpc from opentelemetry.trace import StatusCode, SpanKind -from chromadb.telemetry.opentelemetry import tracer - class _ClientCallDetails( collections.namedtuple( @@ -36,6 +34,8 @@ class OtelInterceptor( grpc.StreamStreamClientInterceptor, ): def _intercept_call(self, continuation, client_call_details, request_or_iterator): + from chromadb.telemetry.opentelemetry import tracer + if tracer is None: return continuation(client_call_details, request_or_iterator) with tracer.start_as_current_span( diff --git a/go/cmd/logservice/main.go b/go/cmd/logservice/main.go index 70d77cd6ebc..3dd46d61667 100644 --- a/go/cmd/logservice/main.go +++ b/go/cmd/logservice/main.go @@ -8,6 +8,7 @@ import ( "github.com/chroma-core/chroma/go/pkg/proto/logservicepb" "github.com/chroma-core/chroma/go/pkg/utils" libs "github.com/chroma-core/chroma/go/shared/libs" + "github.com/chroma-core/chroma/go/shared/otel" "github.com/pingcap/log" "github.com/rs/zerolog" "go.uber.org/automaxprocs/maxprocs" @@ -18,6 +19,7 @@ import ( func main() { ctx := context.Background() + // Configure logger utils.LogLevel = zerolog.DebugLevel utils.ConfigureLogger() @@ -26,6 +28,13 @@ func main() { } log.Info("Starting log service") config := configuration.NewLogServiceConfiguration() + err := otel.InitTracing(ctx, &otel.TracingConfig{ + Service: "log-service", + Endpoint: config.OPTL_TRACING_ENDPOINT, + }) + if err != nil { + log.Fatal("failed to initialize tracing", zap.Error(err)) + } conn, err := libs.NewPgConnection(ctx, config) if err != nil { log.Fatal("failed to connect to postgres", zap.Error(err)) @@ -37,7 +46,7 @@ func main() { if err != nil { log.Fatal("failed to listen", zap.Error(err)) } - s := grpc.NewServer() + s := grpc.NewServer(grpc.UnaryInterceptor(otel.ServerGrpcInterceptor)) logservicepb.RegisterLogServiceServer(s, server) log.Info("log service started", zap.String("address", listener.Addr().String())) if err := s.Serve(listener); err != nil { diff --git a/go/pkg/grpcutils/service.go b/go/pkg/grpcutils/service.go index 885726e34c5..5d2e5ce11c5 100644 --- a/go/pkg/grpcutils/service.go +++ b/go/pkg/grpcutils/service.go @@ -1,6 +1,7 @@ package grpcutils import ( + "context" "crypto/tls" "crypto/x509" "github.com/chroma-core/chroma/go/shared/otel" @@ -72,6 +73,14 @@ func newDefaultGrpcProvider(name string, grpcConfig *GrpcConfig, registerFunc fu opts = append(opts, grpc.Creds(credentials.NewTLS(tlsConfig))) } opts = append(opts, grpc.UnaryInterceptor(otel.ServerGrpcInterceptor)) + OPTL_TRACING_ENDPOINT := os.Getenv("OPTL_TRACING_ENDPOINT") + if OPTL_TRACING_ENDPOINT == "" { + OPTL_TRACING_ENDPOINT = "jaeger:4317" + } + otel.InitTracing(context.Background(), &otel.TracingConfig{ + Service: "sysdb-service", + Endpoint: OPTL_TRACING_ENDPOINT, + }) c := &defaultGrpcServer{ server: grpc.NewServer(opts...), diff --git a/go/pkg/log/configuration/config.go b/go/pkg/log/configuration/config.go index da75d918ced..e929047fedd 100644 --- a/go/pkg/log/configuration/config.go +++ b/go/pkg/log/configuration/config.go @@ -3,8 +3,9 @@ package configuration import "os" type LogServiceConfiguration struct { - PORT string - DATABASE_URL string + PORT string + DATABASE_URL string + OPTL_TRACING_ENDPOINT string } func getEnvWithDefault(key, defaultValue string) string { @@ -17,7 +18,8 @@ func getEnvWithDefault(key, defaultValue string) string { func NewLogServiceConfiguration() *LogServiceConfiguration { return &LogServiceConfiguration{ - PORT: getEnvWithDefault("PORT", "50051"), - DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:chroma@postgres.chroma.svc.cluster.local:5432/log"), + PORT: getEnvWithDefault("PORT", "50051"), + DATABASE_URL: getEnvWithDefault("CHROMA_DATABASE_URL", "postgresql://chroma:chroma@postgres.chroma.svc.cluster.local:5432/log"), + OPTL_TRACING_ENDPOINT: getEnvWithDefault("OPTL_TRACING_ENDPOINT", "jaeger:4317"), } } diff --git a/go/shared/otel/main.go b/go/shared/otel/main.go index 677ef344192..6933df63be4 100644 --- a/go/shared/otel/main.go +++ b/go/shared/otel/main.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "fmt" + "github.com/pingcap/log" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" otelCode "go.opentelemetry.io/otel/codes" @@ -13,6 +14,7 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -71,25 +73,26 @@ func ServerGrpcInterceptor(ctx context.Context, req interface{}, info *grpc.Unar var span trace.Span ctx, span = tracer.Start(ctx, "Request "+info.FullMethod) defer span.End() + span.SetAttributes(attribute.String("rpc.method", info.FullMethod)) // Calls the handler h, err := handler(ctx, req) if err != nil { // Handle and log the error. - handleError(span, err) + handleError(span, info, err) return nil, err } // Set the status to OK upon success. span.SetStatus(otelCode.Ok, "ok") span.SetAttributes(attribute.String("rpc.status_code", "ok")) - span.SetAttributes(attribute.String("rpc.method", info.FullMethod)) + log.Info("RPC call", zap.String("method", info.FullMethod), zap.String("status", "ok")) return h, nil } // handleError logs and annotates the span with details of the encountered error. -func handleError(span trace.Span, err error) { +func handleError(span trace.Span, info *grpc.UnaryServerInfo, err error) { st, _ := status.FromError(err) span.SetStatus(otelCode.Error, "error") span.SetAttributes( @@ -97,6 +100,8 @@ func handleError(span trace.Span, err error) { attribute.String("rpc.message", st.Message()), attribute.String("rpc.error", st.Err().Error()), ) + log.Error("RPC call", zap.String("method", info.FullMethod), zap.String("status", st.Code().String()), zap.String("error", st.Err().Error()), zap.String("message", st.Message())) + } // decodeMetadataValue safely extracts a value from metadata, allowing for missing keys. diff --git a/k8s/distributed-chroma/values.yaml b/k8s/distributed-chroma/values.yaml index c5f1f0d2023..9864c6bebad 100644 --- a/k8s/distributed-chroma/values.yaml +++ b/k8s/distributed-chroma/values.yaml @@ -27,8 +27,11 @@ frontendService: memberlistProviderImpl: 'value: "chromadb.segment.impl.distributed.segment_directory.MockMemberlistProvider"' logServiceHost: 'value: "logservice.chroma"' logServicePort: 'value: "50051"' - otherEnvConfig: '' - + otherEnvConfig: | + - name: CHROMA_OTEL_COLLECTION_ENDPOINT + value: "http://jaeger:4317" + - name: CHROMA_OTEL_GRANULARITY + value: all sysdb: image: repository: 'local'