Skip to content

Commit 5643b6c

Browse files
committed
Big refactor, fixes both wasm and podman
Now the scripts are fetched when the message is received and each scripts gets called with their respective executor. This simplifies greatly the executors as they no longer have to fetch the scripts themselves. They retain the store if they need to fetch more things (like libraries for the lua executor).
1 parent 1ec5f68 commit 5643b6c

File tree

10 files changed

+741
-546
lines changed

10 files changed

+741
-546
lines changed

.dir-locals.el

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
((go-mode . ((eglot-workspace-configuration .
2+
(:gopls (:buildFlags ["-tags=podman,wasmtime"]))))))

cmd/server/http.go

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewHttpNatsProxy(port int, natsURL string) (*httpNatsProxy, error) {
5050
func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
5151
// Extract context from incoming request headers
5252
ctx := otel.GetTextMapPropagator().Extract(r.Context(), propagation.HeaderCarrier(r.Header))
53-
53+
5454
// Start a new span for the HTTP request
5555
ctx, span := tracer.Start(ctx, "http.request",
5656
trace.WithSpanKind(trace.SpanKindServer),
@@ -112,8 +112,9 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
112112
ctx, cancel := context.WithTimeout(ctx, timeout)
113113
defer cancel()
114114

115+
// Change the url passed to the fuction to remove the subject
115116
url := strings.ReplaceAll(r.URL.String(), "/"+subject, "")
116-
log.Debug(url)
117+
log.Debugf("URL: %s", url)
117118
body, err := json.Marshal(&executor.Message{
118119
Payload: payload,
119120
Method: r.Method,
@@ -141,13 +142,16 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
141142
msg.Data = body
142143
otel.GetTextMapPropagator().Inject(ctx, natsHeaderCarrier(msg.Header))
143144

145+
// Send the message and wait for the response
144146
response, err := p.nc.RequestMsgWithContext(ctx, msg)
145147
if err != nil {
146148
natsSpan.RecordError(err)
147149
natsSpan.SetStatus(codes.Error, "NATS request failed")
148150
natsSpan.End()
151+
149152
span.SetStatus(codes.Error, "Service unavailable")
150153
span.SetAttributes(attribute.Int("http.status_code", http.StatusServiceUnavailable))
154+
151155
w.WriteHeader(http.StatusServiceUnavailable)
152156
w.Write([]byte(err.Error()))
153157
return
@@ -156,7 +160,7 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
156160
natsSpan.SetStatus(codes.Ok, "")
157161
natsSpan.End()
158162

159-
rep := new(executor.Reply)
163+
rep := new(Reply)
160164
err = json.Unmarshal(response.Data, rep)
161165
if err != nil {
162166
span.RecordError(err)
@@ -188,38 +192,41 @@ func (p *httpNatsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
188192
}
189193

190194
// Go through all the scripts to see if one is HTML
191-
if t, sr := hasHTMLResult(rep.AllResults); t {
192-
span.SetAttributes(attribute.Bool("response.is_html", true))
193-
var hasContentType bool
194-
for k, v := range sr.Headers {
195-
if k == "Content-Type" {
196-
hasContentType = true
195+
for _, scrRes := range rep.Results {
196+
if scrRes.IsHTML {
197+
198+
span.SetAttributes(attribute.Bool("response.is_html", true))
199+
var hasContentType bool
200+
for k, v := range scrRes.Headers {
201+
if k == "Content-Type" {
202+
hasContentType = true
203+
}
204+
w.Header().Add(k, v)
205+
}
206+
if !hasContentType {
207+
w.Header().Add("Content-Type", "text/html")
208+
}
209+
span.SetAttributes(
210+
attribute.Int("http.status_code", scrRes.Code),
211+
attribute.Int("http.response.body_size", len(scrRes.Payload)),
212+
)
213+
w.WriteHeader(scrRes.Code)
214+
215+
_, err = w.Write(scrRes.Payload)
216+
if err != nil {
217+
span.RecordError(err)
218+
log.WithFields(fields).Errorf("failed to write reply back to HTTP response: %v", err)
197219
}
198-
w.Header().Add(k, v)
199-
}
200-
if !hasContentType {
201-
w.Header().Add("Content-Type", "text/html")
202-
}
203-
span.SetAttributes(
204-
attribute.Int("http.status_code", sr.Code),
205-
attribute.Int("http.response.body_size", len(sr.Payload)),
206-
)
207-
w.WriteHeader(sr.Code)
208220

209-
_, err = w.Write(sr.Payload)
210-
if err != nil {
211-
span.RecordError(err)
212-
log.WithFields(fields).Errorf("failed to write reply back to HTTP response: %v", err)
221+
span.SetStatus(codes.Ok, "")
222+
// Since only the HTML page reply can "win" we ignore the rest
223+
return
213224
}
214-
215-
span.SetStatus(codes.Ok, "")
216-
// Since only the HTML page reply can "win" we ignore the rest
217-
return
218225
}
219226

220227
// Convert the results to bytes
221228
span.SetAttributes(attribute.Bool("response.is_html", false))
222-
rr, err := json.Marshal(rep.AllResults)
229+
rr, err := json.Marshal(rep.Results)
223230
if err != nil {
224231
span.RecordError(err)
225232
log.WithFields(fields).Errorf("failed to serialize all results to JSON: %v", err)

cmd/server/main.go

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import (
44
"encoding/json"
55
"errors"
66
"flag"
7+
"fmt"
78
"os"
89
"os/signal"
910
"strings"
11+
"sync"
1012
"time"
1113

1214
natsserver "github.com/nats-io/nats-server/v2/server"
@@ -22,6 +24,7 @@ import (
2224
"github.com/numkem/msgscript"
2325
"github.com/numkem/msgscript/executor"
2426
msgplugin "github.com/numkem/msgscript/plugins"
27+
"github.com/numkem/msgscript/script"
2528
msgstore "github.com/numkem/msgscript/store"
2629
)
2730

@@ -171,7 +174,6 @@ func main() {
171174
span.SetAttributes(
172175
attribute.Bool("message.raw", m.Raw),
173176
attribute.Bool("message.async", m.Async),
174-
attribute.String("message.executor", m.Executor),
175177
)
176178

177179
// The above unmarshalling only applies to the structure of the JSON.
@@ -184,33 +186,90 @@ func main() {
184186
}
185187
}
186188

187-
replier := &replier{nc: nc}
188-
var messageReply executor.ReplyFunc
189189
if m.Async {
190190
span.SetAttributes(attribute.String("reply.mode", "async"))
191-
messageReply = replier.AsyncReply(m, msg)
192191
err = nc.Publish(msg.Reply, []byte("{}"))
193192
if err != nil {
194193
span.RecordError(err)
195194
span.SetStatus(codes.Error, "Failed to publish async reply")
196195
log.WithFields(fields).Errorf("failed to reply to message: %v", err)
196+
197+
replyWithError(nc, fmt.Errorf("failed to reply to message: %v", err), msg.Reply)
197198
return
198199
}
199200
} else {
200201
span.SetAttributes(attribute.String("reply.mode", "sync"))
201-
messageReply = replier.SyncReply(m, msg)
202202
}
203203

204-
exec, err := executor.ExecutorByName(m.Executor, executors)
204+
cctx, getScriptsSpan := mainTracer.Start(ctx, "nats.handle_message.get_scripts", trace.WithAttributes(
205+
attribute.String("script.Name", m.Subject),
206+
attribute.String("script.URL", m.URL),
207+
))
208+
209+
scripts, err := scriptStore.GetScripts(cctx, m.Subject)
205210
if err != nil {
211+
log.WithError(err).WithField("subject", m.Subject).Error("failed to get scripts for subject")
206212
span.RecordError(err)
207-
span.SetStatus(codes.Error, "Failed to get executor")
208-
log.WithError(err).Error("failed to get executor for message")
213+
span.SetStatus(codes.Error, "Failed to get scripts")
214+
215+
replyWithError(nc, fmt.Errorf("failed to get scripts for subject"), msg.Reply)
216+
return
217+
}
218+
getScriptsSpan.SetStatus(codes.Ok, fmt.Sprintf("found %d scripts", len(scripts)))
219+
getScriptsSpan.End()
220+
221+
_, executeScriptsSpan := mainTracer.Start(ctx, "nats.handle_message.run_scripts")
222+
defer executeScriptsSpan.End()
223+
224+
var wg sync.WaitGroup
225+
allResults := make(chan *executor.ScriptResult, len(scripts))
226+
for _, scr := range scripts {
227+
wg.Add(1)
228+
go func(ctx context.Context, msg *executor.Message, script *script.Script) {
229+
defer wg.Done()
230+
231+
// Pass the context with trace info to the executor
232+
exec, err := executor.ExecutorByName(scr.Executor, executors)
233+
if err != nil {
234+
executeScriptsSpan.RecordError(err)
235+
executeScriptsSpan.SetStatus(codes.Error, "Failed to get executor")
236+
log.WithError(err).Error("failed to get executor for script")
237+
238+
allResults <- &executor.ScriptResult{Error: fmt.Sprintf("failed to get executor for script: %v", err)}
239+
return
240+
}
241+
242+
rep := exec.HandleMessage(ctx, m, scr)
243+
allResults <- rep
244+
}(ctx, m, scr)
245+
}
246+
wg.Wait()
247+
248+
close(allResults)
249+
250+
_, parseReplySpan := mainTracer.Start(ctx, "nats.handle_message.parse_replies")
251+
msgRep := new(Reply)
252+
for res := range allResults {
253+
if res.IsHTML {
254+
msgRep.HTML = true
255+
}
256+
257+
msgRep.Results = append(msgRep.Results, res)
258+
}
259+
parseReplySpan.SetStatus(codes.Ok, "responses parsed")
260+
parseReplySpan.End()
261+
262+
_, natsReplaySpan := mainTracer.Start(ctx, "nats.handle_message.send_reply")
263+
err = replyMessage(nc, m, msg.Reply, msgRep)
264+
if err != nil {
265+
natsReplaySpan.RecordError(err)
266+
natsReplaySpan.SetStatus(codes.Error, "Failed to send reply through NATS")
267+
268+
log.WithError(err).Errorf("failed to send reply through NATS")
209269
return
210270
}
211271

212-
// Pass the context with trace info to the executor
213-
exec.HandleMessage(ctx, m, messageReply)
272+
log.WithField("subject", msg.Subject).Debugf("finished running %d scripts", len(scripts))
214273
span.SetStatus(codes.Ok, "Message handled")
215274
})
216275
if err != nil {

cmd/server/reply.go

Lines changed: 38 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,58 @@
11
package main
22

33
import (
4+
"encoding/json"
5+
"fmt"
6+
47
"github.com/nats-io/nats.go"
58
log "github.com/sirupsen/logrus"
69

710
"github.com/numkem/msgscript/executor"
811
)
912

10-
type replier struct {
11-
nc *nats.Conn
13+
type Reply struct {
14+
Results []*executor.ScriptResult `json:"script_result"`
15+
HTML bool `json:"is_html"`
16+
Error string `json:"error,omitempty"`
1217
}
1318

14-
func (rep *replier) SyncReply(m *executor.Message, msg *nats.Msg) executor.ReplyFunc {
15-
return func(r *executor.Reply) {
16-
fields := log.Fields{
17-
"Subject": m.Subject,
18-
"URL": m.URL,
19-
"Method": m.Method,
20-
}
21-
22-
if r.Error != "" {
23-
log.WithFields(fields).Errorf("error while running script: %s", r.Error)
24-
}
19+
func replyMessage(nc *nats.Conn, msg *executor.Message, replySubject string, rep *Reply) error {
20+
fields := log.Fields{
21+
"Subject": msg.Subject,
22+
"URL": msg.URL,
23+
"Method": msg.Method,
24+
}
2525

26-
// Send a reply if the message has a reply subject
27-
if msg.Reply == "" {
28-
return
29-
}
26+
// Send a reply if the message has a reply subject
27+
if replySubject == "" {
28+
return nil
29+
}
3030

31-
var reply []byte
32-
var err error
33-
if !m.Raw {
34-
reply, err = r.JSON()
35-
if err != nil {
36-
log.WithFields(fields).Errorf("failed to serialize script reply to JSON: %v", err)
37-
}
38-
} else {
39-
if r.Error != "" {
40-
reply = []byte(r.Error)
41-
} else {
42-
reply = r.Bytes()
43-
}
44-
}
31+
var payload []byte
32+
var err error
33+
payload, err = json.Marshal(rep)
34+
if err != nil {
35+
log.WithFields(fields).Errorf("failed to serialize script reply to JSON: %v", err)
36+
return fmt.Errorf("failed to serialize script reply to JSON: %v", err)
37+
}
4538

46-
log.WithFields(fields).Debugf("sent reply: %s", reply)
47-
err = rep.nc.Publish(msg.Reply, reply)
48-
if err != nil {
49-
log.WithFields(fields).Errorf("failed to publish reply after running script: %v", err)
50-
}
39+
log.WithFields(fields).Debugf("sent reply: %s", string(payload))
40+
err = nc.Publish(replySubject, payload)
41+
if err != nil {
42+
log.WithFields(fields).Errorf("failed to publish reply after running script: %v", err)
5143
}
52-
}
5344

54-
func (re *replier) AsyncReply(m *executor.Message, msg *nats.Msg) executor.ReplyFunc {
55-
return func(r *executor.Reply) {
56-
fields := log.Fields{
57-
"Subject": m.Subject,
58-
"URL": m.URL,
59-
"Method": m.Method,
60-
}
45+
return nil
46+
}
6147

62-
if r.Error != "" {
63-
log.WithFields(fields).Errorf("error while running script: %s", r.Error)
64-
}
48+
func replyWithError(nc *nats.Conn, resErr error, replySubject string) {
49+
payload, err := json.Marshal(&Reply{Error: resErr.Error()})
50+
if err != nil {
51+
log.Errorf("failed to serialize script reply to JSON: %v", err)
52+
}
6553

66-
log.WithFields(fields).Debug("async reply received")
54+
err = nc.Publish(replySubject, payload)
55+
if err != nil {
56+
log.Errorf("failed to reply with error: %v", err)
6757
}
6858
}

0 commit comments

Comments
 (0)