From 0545cb78bdd14850b1f562d6b16981697ed382dc Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 30 Jun 2024 12:59:51 +1000 Subject: [PATCH] telemetry: Handle receiving acks over websocket --- telemetry/telemetry-writer.go | 69 +++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/telemetry/telemetry-writer.go b/telemetry/telemetry-writer.go index 039b94c..9617e48 100644 --- a/telemetry/telemetry-writer.go +++ b/telemetry/telemetry-writer.go @@ -91,33 +91,63 @@ func (me *Writer) websocket() (wait bool) { return true } defer func() { - err := me.ctx.Err() + err := context.Cause(me.ctx) reason := me.closeReason if err != nil { reason = err.Error() } conn.Close(websocket.StatusNormalClosure, reason) }() - var wg sync.WaitGroup - wg.Add(1) + ctx, cancel := context.WithCancel(me.ctx) go func() { - defer wg.Done() - me.payloadWriter(func(b []byte) error { - err := conn.Write(me.ctx, websocket.MessageText, b) - me.Logger.Levelf(log.Debug, "wrote %q: %v", b, err) - return err - }) + err := me.payloadWriter( + ctx, + func(b []byte) error { + err := conn.Write(ctx, websocket.MessageText, b) + me.Logger.Levelf(log.Debug, "wrote %q: %v", b, err) + return err + }, + ) + if err != nil { + me.Logger.Levelf(log.Error, "payload writer failed: %v", err) + } + // Notify that we're not sending anymore. + err = conn.Write(ctx, websocket.MessageBinary, nil) + if err != nil { + me.Logger.Levelf(log.Error, "writing end of stream: %v", err) + } }() - wg.Wait() + err = me.websocketReader(me.ctx, conn) + // Since we can't receive acks anymore, stop sending immediately. + cancel() + me.Logger.Levelf(log.Error, "reading from websocket: %v", err) return false } +func (me *Writer) websocketReader(ctx context.Context, conn *websocket.Conn) error { + for { + _, data, err := conn.Read(ctx) + if err != nil { + return err + } + me.Logger.Levelf(log.Debug, "read from telemetry websocket: %q", string(data)) + } +} + func (me *Writer) streamPost() { + ctx, cancel := context.WithCancel(me.ctx) + defer cancel() r, w := io.Pipe() - go me.payloadWriter(func(b []byte) error { - _, err := w.Write(b) - return err - }) + go func() { + defer w.Close() + err := me.payloadWriter(ctx, func(b []byte) error { + _, err := w.Write(b) + return err + }) + if err != nil { + me.Logger.Levelf(log.Error, "http post payload writer failed: %v", err) + } + }() me.Logger.Levelf(log.Debug, "starting post") // What's the content type for newline/ND/packed JSON streams? resp, err := http.Post(me.Url.String(), "application/jsonl", r) @@ -133,22 +163,23 @@ func (me *Writer) streamPost() { resp.Body.Close() } -func (me *Writer) payloadWriter(w func(b []byte) error) { +func (me *Writer) payloadWriter(ctx context.Context, w func(b []byte) error) error { for { select { case b, ok := <-me.buf: if !ok { - return + me.Logger.Levelf(log.Debug, "buf closed") + return nil } me.Logger.Levelf(log.Debug, "writing %v byte payload", len(b)) err := w(b) if err != nil { me.Logger.Levelf(log.Debug, "error writing payload: %s", err) me.retry = append(me.retry, b) - return + return err } - case <-me.ctx.Done(): - return + case <-ctx.Done(): + return context.Cause(me.ctx) } } }