Skip to content

Commit ecf8260

Browse files
committed
add telemetry to nats,http and lua executor
Telemetry needs the TELEMETRY_TRACES env variable set to anything in order to enable the traces. By default it will output to stdout but traces can be sent to an OpenTelemetry server (like Grafana's Tempo). Added those options to the NixOS module
1 parent 9c79a1e commit ecf8260

File tree

8 files changed

+510
-58
lines changed

8 files changed

+510
-58
lines changed

cmd/server/http.go

Lines changed: 83 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,21 @@ import (
1212
log "github.com/sirupsen/logrus"
1313
"golang.org/x/net/context"
1414

15+
"go.opentelemetry.io/otel"
16+
"go.opentelemetry.io/otel/attribute"
17+
"go.opentelemetry.io/otel/codes"
18+
"go.opentelemetry.io/otel/propagation"
19+
"go.opentelemetry.io/otel/trace"
20+
1521
"github.com/numkem/msgscript"
1622
"github.com/numkem/msgscript/executor"
1723
)
1824

1925
const DEFAULT_HTTP_PORT = 7643
2026
const DEFAULT_HTTP_TIMEOUT = 5 * time.Second
2127

28+
var tracer = otel.Tracer("http-nats-proxy")
29+
2230
type httpNatsProxy struct {
2331
port string
2432
nc *nats.Conn
@@ -40,34 +48,56 @@ func NewHttpNatsProxy(port int, natsURL string) (*httpNatsProxy, error) {
4048
}
4149

4250
func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
51+
// Extract context from incoming request headers
52+
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
53+
54+
// Start a new span for the HTTP request
55+
ctx, span := tracer.Start(ctx, "http.request",
56+
trace.WithSpanKind(trace.SpanKindServer),
57+
trace.WithAttributes(
58+
attribute.String("http.method", r.Method),
59+
attribute.String("http.url", r.URL.String()),
60+
attribute.String("http.remote_addr", r.RemoteAddr),
61+
),
62+
)
63+
defer span.End()
64+
4365
defer r.Body.Close()
4466

4567
// URL should look like /funcs.foobar
4668
// Where funcs.foobar is the subject for NATS
4769
ss := strings.Split(r.URL.Path, "/")
4870
// Validate URL structure
4971
if len(ss) < 2 {
72+
span.SetStatus(codes.Error, "Invalid URL structure")
73+
span.SetAttributes(attribute.Int("http.status_code", http.StatusBadRequest))
5074
w.WriteHeader(http.StatusBadRequest)
5175
w.Write([]byte("URL should be in the pattern of /<subject>"))
5276
return
5377
}
5478
subject := ss[1]
79+
span.SetAttributes(attribute.String("nats.subject", subject))
5580

5681
fields := log.Fields{
5782
"subject": subject,
5883
"client": r.RemoteAddr,
5984
}
6085
log.WithFields(fields).Info("Received HTTP request")
6186

87+
// Read request body with tracing
6288
payload, err := io.ReadAll(r.Body)
6389
if err != nil {
90+
span.RecordError(err)
91+
span.SetStatus(codes.Error, "Failed to read request body")
92+
span.SetAttributes(attribute.Int("http.status_code", http.StatusInternalServerError))
6493
w.WriteHeader(http.StatusInternalServerError)
6594
_, err = fmt.Fprintf(w, "failed to read request body: %s", err)
6695
if err != nil {
6796
log.WithFields(fields).Errorf("failed to write payload: %v", err)
6897
}
6998
return
7099
}
100+
span.SetAttributes(attribute.Int("http.request.body_size", len(payload)))
71101

72102
// We can override the HTTP timeout by passing the `_timeout` query string
73103
timeout := DEFAULT_HTTP_TIMEOUT
@@ -77,8 +107,9 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
77107
timeout = DEFAULT_HTTP_TIMEOUT
78108
}
79109
}
110+
span.SetAttributes(attribute.String("http.timeout", timeout.String()))
80111

81-
ctx, cancel := context.WithTimeout(r.Context(), timeout)
112+
ctx, cancel := context.WithTimeout(ctx, timeout)
82113
defer cancel()
83114

84115
url := strings.ReplaceAll(r.URL.String(), "/"+subject, "")
@@ -90,29 +121,61 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
90121
URL: url,
91122
})
92123
if err != nil {
124+
span.RecordError(err)
125+
span.SetStatus(codes.Error, "Failed to encode message")
93126
log.WithFields(fields).Errorf("failed to encode message: %v", err)
94127
return
95128
}
96129

97-
msg, err := p.nc.RequestWithContext(ctx, subject, body)
130+
// Start a child span for the NATS request
131+
ctx, natsSpan := tracer.Start(ctx, "nats.request",
132+
trace.WithSpanKind(trace.SpanKindClient),
133+
trace.WithAttributes(
134+
attribute.String("nats.subject", subject),
135+
attribute.Int("nats.message_size", len(body)),
136+
),
137+
)
138+
139+
// Inject trace context into NATS message headers
140+
msg := nats.NewMsg(subject)
141+
msg.Data = body
142+
otel.GetTextMapPropagator().Inject(ctx, natsHeaderCarrier(msg.Header))
143+
144+
response, err := p.nc.RequestMsgWithContext(ctx, msg)
98145
if err != nil {
146+
natsSpan.RecordError(err)
147+
natsSpan.SetStatus(codes.Error, "NATS request failed")
148+
natsSpan.End()
149+
span.SetStatus(codes.Error, "Service unavailable")
150+
span.SetAttributes(attribute.Int("http.status_code", http.StatusServiceUnavailable))
99151
w.WriteHeader(http.StatusServiceUnavailable)
100152
w.Write([]byte(err.Error()))
101153
return
102154
}
155+
natsSpan.SetAttributes(attribute.Int("nats.response_size", len(msg.Data)))
156+
natsSpan.SetStatus(codes.Ok, "")
157+
natsSpan.End()
103158

104159
rep := new(executor.Reply)
105-
err = json.Unmarshal(msg.Data, rep)
160+
err = json.Unmarshal(response.Data, rep)
106161
if err != nil {
162+
span.RecordError(err)
163+
span.SetStatus(codes.Error, "Failed to unmarshal response")
164+
span.SetAttributes(attribute.Int("http.status_code", http.StatusFailedDependency))
107165
w.WriteHeader(http.StatusFailedDependency)
108166
fmt.Fprintf(w, "Error: %v", err)
109167
return
110168
}
111169

112170
if rep.Error != "" {
171+
span.SetAttributes(attribute.String("executor.error", rep.Error))
113172
if rep.Error == (&executor.NoScriptFoundError{}).Error() {
173+
span.SetStatus(codes.Error, "Script not found")
174+
span.SetAttributes(attribute.Int("http.status_code", http.StatusNotFound))
114175
w.WriteHeader(http.StatusNotFound)
115176
} else {
177+
span.SetStatus(codes.Error, rep.Error)
178+
span.SetAttributes(attribute.Int("http.status_code", http.StatusInternalServerError))
116179
w.WriteHeader(http.StatusInternalServerError)
117180
}
118181

@@ -126,6 +189,7 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
126189

127190
// Go through all the scripts to see if one is HTML
128191
if t, sr := hasHTMLResult(rep.AllResults); t {
192+
span.SetAttributes(attribute.Bool("response.is_html", true))
129193
var hasContentType bool
130194
for k, v := range sr.Headers {
131195
if k == "Content-Type" {
@@ -136,27 +200,43 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
136200
if !hasContentType {
137201
w.Header().Add("Content-Type", "text/html")
138202
}
203+
span.SetAttributes(
204+
attribute.Int("http.status_code", sr.Code),
205+
attribute.Int("http.response.body_size", len(sr.Payload)),
206+
)
139207
w.WriteHeader(sr.Code)
140208

141209
_, err = w.Write(sr.Payload)
142210
if err != nil {
211+
span.RecordError(err)
143212
log.WithFields(fields).Errorf("failed to write reply back to HTTP response: %v", err)
144213
}
145214

215+
span.SetStatus(codes.Ok, "")
146216
// Since only the HTML page reply can "win" we ignore the rest
147217
return
148218
}
149219

150220
// Convert the results to bytes
221+
span.SetAttributes(attribute.Bool("response.is_html", false))
151222
rr, err := json.Marshal(rep.AllResults)
152223
if err != nil {
224+
span.RecordError(err)
153225
log.WithFields(fields).Errorf("failed to serialize all results to JSON: %v", err)
154226
}
155227

228+
span.SetAttributes(
229+
attribute.Int("http.status_code", http.StatusOK),
230+
attribute.Int("http.response.body_size", len(rr)),
231+
)
232+
156233
_, err = w.Write(rr)
157234
if err != nil {
235+
span.RecordError(err)
158236
log.WithFields(fields).Errorf("failed to write reply back to HTTP response: %v", err)
159237
}
238+
239+
span.SetStatus(codes.Ok, "")
160240
}
161241

162242
func hasHTMLResult(results map[string]*executor.ScriptResult) (bool, *executor.ScriptResult) {

cmd/server/main.go

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,32 @@ package main
22

33
import (
44
"encoding/json"
5+
"errors"
56
"flag"
67
"os"
78
"os/signal"
89
"strings"
9-
"syscall"
1010
"time"
1111

1212
natsserver "github.com/nats-io/nats-server/v2/server"
1313
"github.com/nats-io/nats.go"
1414
log "github.com/sirupsen/logrus"
1515
"golang.org/x/net/context"
1616

17+
"go.opentelemetry.io/otel"
18+
"go.opentelemetry.io/otel/attribute"
19+
"go.opentelemetry.io/otel/codes"
20+
"go.opentelemetry.io/otel/trace"
21+
1722
"github.com/numkem/msgscript"
1823
"github.com/numkem/msgscript/executor"
1924
msgplugin "github.com/numkem/msgscript/plugins"
2025
msgstore "github.com/numkem/msgscript/store"
2126
)
2227

2328
var version = "dev"
29+
var mainTracer = otel.Tracer("msgscript.main")
30+
2431
func main() {
2532
// Parse command-line flags
2633
backendName := flag.String("backend", msgstore.BACKEND_FILE_NAME, "Storage backend to use (etcd, sqlite, flatfile)")
@@ -33,6 +40,9 @@ func main() {
3340
scriptDir := flag.String("script", ".", "Script directory")
3441
flag.Parse()
3542

43+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
44+
defer stop()
45+
3646
// Set up logging
3747
level, err := log.ParseLevel(*logLevel)
3848
if err != nil {
@@ -44,6 +54,20 @@ func main() {
4454
log.SetLevel(log.DebugLevel)
4555
}
4656

57+
if os.Getenv("TELEMETRY_TRACES") != "" {
58+
log.WithField("kind", "traces").Info("Starting telemetry")
59+
60+
// Init traces
61+
otelShutdown, err := setupOTelSDK(ctx)
62+
if err != nil {
63+
log.Errorf("failed to initialize opentelemetry traces: %v", err)
64+
os.Exit(1)
65+
}
66+
defer func() {
67+
err = errors.Join(err, otelShutdown(context.Background()))
68+
}()
69+
}
70+
4771
// Create the ScriptStore based on the selected backend
4872
scriptStore, err := msgstore.StoreByName(*backendName, *etcdURL, *scriptDir, *libraryDir)
4973
if err != nil {
@@ -104,15 +128,33 @@ func main() {
104128

105129
// Set up a message handler
106130
_, err = nc.Subscribe(">", func(msg *nats.Msg) {
131+
// Extract trace context from NATS message headers
132+
ctx := otel.GetTextMapPropagator().Extract(
133+
context.Background(),
134+
natsHeaderCarrier(msg.Header),
135+
)
136+
137+
// Start a span for the NATS message handling
138+
ctx, span := mainTracer.Start(ctx, "nats.handle_message",
139+
trace.WithSpanKind(trace.SpanKindServer),
140+
trace.WithAttributes(
141+
attribute.String("nats.subject", msg.Subject),
142+
attribute.Int("nats.message_size", len(msg.Data)),
143+
),
144+
)
145+
defer span.End()
146+
107147
log.Debugf("Received message on subject: %s", msg.Subject)
108148

109149
if strings.HasPrefix(msg.Subject, "_INBOX.") {
150+
span.SetAttributes(attribute.Bool("nats.is_inbox", true))
151+
span.SetStatus(codes.Ok, "Ignored inbox subject")
110152
log.Debugf("Ignoring reply subject %s", msg.Subject)
111153
return
112154
}
113155

114156
m := new(executor.Message)
115-
err = json.Unmarshal(msg.Data, m)
157+
err := json.Unmarshal(msg.Data, m)
116158
// if the payload isn't a JSON Message, take it as a whole
117159
if err != nil {
118160
m.Subject = msg.Subject
@@ -126,6 +168,12 @@ func main() {
126168
"async": m.Async,
127169
}
128170

171+
span.SetAttributes(
172+
attribute.Bool("message.raw", m.Raw),
173+
attribute.Bool("message.async", m.Async),
174+
attribute.String("message.executor", m.Executor),
175+
)
176+
129177
// The above unmarshalling only applies to the structure of the JSON.
130178
// Even if you feed it another JSON where none of the keys matches,
131179
// it will just end up being an empty struct
@@ -139,37 +187,41 @@ func main() {
139187
replier := &replier{nc: nc}
140188
var messageReply executor.ReplyFunc
141189
if m.Async {
190+
span.SetAttributes(attribute.String("reply.mode", "async"))
142191
messageReply = replier.AsyncReply(m, msg)
143192
err = nc.Publish(msg.Reply, []byte("{}"))
144193
if err != nil {
194+
span.RecordError(err)
195+
span.SetStatus(codes.Error, "Failed to publish async reply")
145196
log.WithFields(fields).Errorf("failed to reply to message: %v", err)
146197
return
147198
}
148199
} else {
200+
span.SetAttributes(attribute.String("reply.mode", "sync"))
149201
messageReply = replier.SyncReply(m, msg)
150202
}
151203

152204
exec, err := executor.ExecutorByName(m.Executor, executors)
153205
if err != nil {
206+
span.RecordError(err)
207+
span.SetStatus(codes.Error, "Failed to get executor")
154208
log.WithError(err).Error("failed to get executor for message")
155209
return
156210
}
157211

212+
// Pass the context with trace info to the executor
158213
exec.HandleMessage(ctx, m, messageReply)
214+
span.SetStatus(codes.Ok, "Message handled")
159215
})
160216
if err != nil {
161217
log.Fatalf("Failed to subscribe to NATS subjects: %v", err)
162218
}
163219

164-
// Start HTTP Server
165-
go runHTTP(*httpPort, *natsURL)
166-
167-
// Listen for system interrupts for graceful shutdown
168-
sigChan := make(chan os.Signal, 1)
169-
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
170-
<-sigChan
171-
cancel()
220+
defer func() {
221+
log.Info("Received shutdown signal, stopping server...")
222+
executor.StopAllExecutors(executors)
223+
}()
172224

173-
log.Info("Received shutdown signal, stopping server...")
174-
executor.StopAllExecutors(executors)
225+
// Start HTTP Server
226+
runHTTP(*httpPort, *natsURL)
175227
}

0 commit comments

Comments
 (0)