From e8b11e5ff43c5ed5f11867c66f0917fea287a314 Mon Sep 17 00:00:00 2001 From: Varun Patil Date: Mon, 27 Jan 2025 19:47:03 +0000 Subject: [PATCH] std: make client state complete atomic --- std/object/client_consume.go | 10 +++++----- std/object/client_consume_seg.go | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/std/object/client_consume.go b/std/object/client_consume.go index 25a7b864..471d91a8 100644 --- a/std/object/client_consume.go +++ b/std/object/client_consume.go @@ -2,6 +2,7 @@ package object import ( "fmt" + "sync/atomic" "time" enc "github.com/named-data/ndnd/std/encoding" @@ -23,7 +24,7 @@ type ConsumeState struct { // raw data contents. content enc.Wire // fetching is completed - complete bool + complete atomic.Bool // fetched metadata meta *rdr.MetaData // versioned object name @@ -56,7 +57,7 @@ func (a *ConsumeState) Error() error { // returns true if the content has been completely fetched func (a *ConsumeState) IsComplete() bool { - return a.complete + return a.complete.Load() } // returns the currently available buffer in the content @@ -87,9 +88,8 @@ func (a *ConsumeState) ProgressMax() int { // send a fatal error to the callback func (a *ConsumeState) finalizeError(err error) { - if !a.complete { + if !a.complete.Swap(true) { a.err = err - a.complete = true a.args.Callback(a) } } @@ -110,7 +110,7 @@ func (c *Client) ConsumeExt(args ndn.ConsumeExtArgs) { args: args, err: nil, content: make(enc.Wire, 0), // just in case - complete: false, + complete: atomic.Bool{}, meta: nil, fetchName: args.Name, wnd: [3]int{0, 0}, diff --git a/std/object/client_consume_seg.go b/std/object/client_consume_seg.go index 919e8666..82f64408 100644 --- a/std/object/client_consume_seg.go +++ b/std/object/client_consume_seg.go @@ -88,7 +88,7 @@ func (s *rrSegFetcher) findWork() *ConsumeState { return nil // nothing to do here } - if check.complete { + if check.complete.Load() { // lazy remove completed streams s.remove(check) @@ -153,13 +153,13 @@ func (s *rrSegFetcher) check() { // handleData is called when a data packet is received. // It is necessary that this function be called only from one goroutine - the engine. +// The notable exception here is when there is a timeout, which has a separate goroutine. func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) { s.mutex.Lock() s.outstanding-- - isComplete := state.complete s.mutex.Unlock() - if isComplete { + if state.IsComplete() { return } @@ -239,8 +239,8 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state * if state.wnd[1] == state.segCnt { log.Debug(s, "Stream completed successfully", "name", state.fetchName) + state.complete.Store(true) s.mutex.Lock() - state.complete = true s.remove(state) s.mutex.Unlock() }