Skip to content

Commit

Permalink
std: make client state complete atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
pulsejet committed Jan 27, 2025
1 parent 9f993a8 commit e8b11e5
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
10 changes: 5 additions & 5 deletions std/object/client_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package object

import (
"fmt"
"sync/atomic"
"time"

enc "github.com/named-data/ndnd/std/encoding"
Expand All @@ -23,7 +24,7 @@ type ConsumeState struct {
// raw data contents.
content enc.Wire
// fetching is completed
complete bool
complete atomic.Bool
// fetched metadata
meta *rdr.MetaData
// versioned object name
Expand Down Expand Up @@ -56,7 +57,7 @@ func (a *ConsumeState) Error() error {

// returns true if the content has been completely fetched
func (a *ConsumeState) IsComplete() bool {
return a.complete
return a.complete.Load()
}

// returns the currently available buffer in the content
Expand Down Expand Up @@ -87,9 +88,8 @@ func (a *ConsumeState) ProgressMax() int {

// send a fatal error to the callback
func (a *ConsumeState) finalizeError(err error) {
if !a.complete {
if !a.complete.Swap(true) {
a.err = err
a.complete = true
a.args.Callback(a)
}
}
Expand All @@ -110,7 +110,7 @@ func (c *Client) ConsumeExt(args ndn.ConsumeExtArgs) {
args: args,
err: nil,
content: make(enc.Wire, 0), // just in case
complete: false,
complete: atomic.Bool{},
meta: nil,
fetchName: args.Name,
wnd: [3]int{0, 0},
Expand Down
8 changes: 4 additions & 4 deletions std/object/client_consume_seg.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *rrSegFetcher) findWork() *ConsumeState {
return nil // nothing to do here
}

if check.complete {
if check.complete.Load() {
// lazy remove completed streams
s.remove(check)

Expand Down Expand Up @@ -153,13 +153,13 @@ func (s *rrSegFetcher) check() {

// handleData is called when a data packet is received.
// It is necessary that this function be called only from one goroutine - the engine.
// The notable exception here is when there is a timeout, which has a separate goroutine.
func (s *rrSegFetcher) handleData(args ndn.ExpressCallbackArgs, state *ConsumeState) {
s.mutex.Lock()
s.outstanding--
isComplete := state.complete
s.mutex.Unlock()

if isComplete {
if state.IsComplete() {
return
}

Expand Down Expand Up @@ -239,8 +239,8 @@ func (s *rrSegFetcher) handleValidatedData(args ndn.ExpressCallbackArgs, state *
if state.wnd[1] == state.segCnt {
log.Debug(s, "Stream completed successfully", "name", state.fetchName)

state.complete.Store(true)
s.mutex.Lock()
state.complete = true
s.remove(state)
s.mutex.Unlock()
}
Expand Down

0 comments on commit e8b11e5

Please sign in to comment.