Skip to content

Commit

Permalink
telemetry: Handle receiving acks over websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
anacrolix committed Jun 30, 2024
1 parent cd912c6 commit 0545cb7
Showing 1 changed file with 50 additions and 19 deletions.
69 changes: 50 additions & 19 deletions telemetry/telemetry-writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,63 @@ func (me *Writer) websocket() (wait bool) {
return true
}
defer func() {
err := me.ctx.Err()
err := context.Cause(me.ctx)
reason := me.closeReason
if err != nil {
reason = err.Error()
}
conn.Close(websocket.StatusNormalClosure, reason)
}()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithCancel(me.ctx)
go func() {
defer wg.Done()
me.payloadWriter(func(b []byte) error {
err := conn.Write(me.ctx, websocket.MessageText, b)
me.Logger.Levelf(log.Debug, "wrote %q: %v", b, err)
return err
})
err := me.payloadWriter(
ctx,
func(b []byte) error {
err := conn.Write(ctx, websocket.MessageText, b)
me.Logger.Levelf(log.Debug, "wrote %q: %v", b, err)
return err
},
)
if err != nil {
me.Logger.Levelf(log.Error, "payload writer failed: %v", err)
}
// Notify that we're not sending anymore.
err = conn.Write(ctx, websocket.MessageBinary, nil)
if err != nil {
me.Logger.Levelf(log.Error, "writing end of stream: %v", err)
}
}()
wg.Wait()
err = me.websocketReader(me.ctx, conn)
// Since we can't receive acks anymore, stop sending immediately.
cancel()
me.Logger.Levelf(log.Error, "reading from websocket: %v", err)
return false
}

func (me *Writer) websocketReader(ctx context.Context, conn *websocket.Conn) error {
for {
_, data, err := conn.Read(ctx)
if err != nil {
return err
}
me.Logger.Levelf(log.Debug, "read from telemetry websocket: %q", string(data))
}
}

func (me *Writer) streamPost() {
ctx, cancel := context.WithCancel(me.ctx)
defer cancel()
r, w := io.Pipe()
go me.payloadWriter(func(b []byte) error {
_, err := w.Write(b)
return err
})
go func() {
defer w.Close()
err := me.payloadWriter(ctx, func(b []byte) error {
_, err := w.Write(b)
return err
})
if err != nil {
me.Logger.Levelf(log.Error, "http post payload writer failed: %v", err)
}
}()
me.Logger.Levelf(log.Debug, "starting post")
// What's the content type for newline/ND/packed JSON streams?
resp, err := http.Post(me.Url.String(), "application/jsonl", r)
Expand All @@ -133,22 +163,23 @@ func (me *Writer) streamPost() {
resp.Body.Close()
}

func (me *Writer) payloadWriter(w func(b []byte) error) {
func (me *Writer) payloadWriter(ctx context.Context, w func(b []byte) error) error {
for {
select {
case b, ok := <-me.buf:
if !ok {
return
me.Logger.Levelf(log.Debug, "buf closed")
return nil
}
me.Logger.Levelf(log.Debug, "writing %v byte payload", len(b))
err := w(b)
if err != nil {
me.Logger.Levelf(log.Debug, "error writing payload: %s", err)
me.retry = append(me.retry, b)
return
return err
}
case <-me.ctx.Done():
return
case <-ctx.Done():
return context.Cause(me.ctx)
}
}
}
Expand Down

0 comments on commit 0545cb7

Please sign in to comment.