Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont log errors when the transfer event queue is full #1291

Merged
merged 1 commit into from
Mar 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 30 additions & 26 deletions transport/httptransport/http_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI

cleanupFns := []func(){
cancel,
func() { close(t.eventCh) },
func() { t.closeEventChannel(tctx) },
}
cleanup := func() {
for _, fn := range cleanupFns {
Expand Down Expand Up @@ -178,12 +178,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI
if fileSize == dealInfo.DealSize {
defer cleanup()

if err := t.emitEvent(tctx, types.TransportEvent{
NBytesReceived: fileSize,
}, dealInfo.DealUuid); err != nil {
return nil, fmt.Errorf("failed to publish transfer completion event, id: %s, err: %w", t.dealInfo.DealUuid, err)
}

t.emitEvent(types.TransportEvent{NBytesReceived: fileSize})
h.dl.Infow(duuid, "file size is already equal to deal size, returning")
return t, nil
}
Expand All @@ -195,11 +190,7 @@ func (h *httpTransport) Execute(ctx context.Context, transportInfo []byte, dealI
defer cleanup()

if err := t.execute(tctx); err != nil {
if err := t.emitEvent(tctx, types.TransportEvent{
Error: err,
}, dealInfo.DealUuid); err != nil {
t.dl.LogError(duuid, "failed to publish transport error", err)
}
t.emitEvent(types.TransportEvent{Error: err})
}
}()

Expand All @@ -212,6 +203,7 @@ type transfer struct {
cancel context.CancelFunc

eventCh chan types.TransportEvent
lastEvt *types.TransportEvent

tInfo *types.HttpRequest
dealInfo *types.TransportDealInfo
Expand All @@ -226,15 +218,6 @@ type transfer struct {
dl *logs.DealLogger
}

func (t *transfer) emitEvent(ctx context.Context, evt types.TransportEvent, id uuid.UUID) error {
select {
case t.eventCh <- evt:
return nil
default:
return fmt.Errorf("dropping event %+v as channel is full for deal id %s", evt, id)
}
}

func (t *transfer) execute(ctx context.Context) error {
duuid := t.dealInfo.DealUuid
for {
Expand Down Expand Up @@ -387,11 +370,7 @@ func (t *transfer) doHttp(ctx context.Context, req *http.Request, dst io.Writer,
t.nBytesReceived = t.nBytesReceived + int64(nw)

// emit event updating the number of bytes received
if err := t.emitEvent(ctx, types.TransportEvent{
NBytesReceived: t.nBytesReceived,
}, t.dealInfo.DealUuid); err != nil {
t.dl.LogError(duid, "failed to publish transport event", err)
}
t.emitEvent(types.TransportEvent{NBytesReceived: t.nBytesReceived})
}
// the http stream we're reading from has sent us an EOF, nothing to do here.
if readErr == io.EOF {
Expand Down Expand Up @@ -420,3 +399,28 @@ func (t *transfer) Close() {
func (t *transfer) Sub() chan types.TransportEvent {
return t.eventCh
}

func (t *transfer) emitEvent(evt types.TransportEvent) {
t.lastEvt = nil
select {
case t.eventCh <- evt:
default:
// If it wasn't possible to send the event because the channel is full,
// save it so that we can ensure it gets sent before the channel is closed.
// A new event always supersedes an older event, so if there is another
// event after this one, it will simply over-write this one.
t.lastEvt = &evt
}
}

func (t *transfer) closeEventChannel(ctx context.Context) {
// If there was an event that wasn't sent because the channel was full,
// ensure that it gets sent before close
if t.lastEvt != nil {
select {
case <-ctx.Done():
case t.eventCh <- *t.lastEvt:
}
}
close(t.eventCh)
}