Skip to content

Commit

Permalink
Send enqueued chunks when in closing state
Browse files Browse the repository at this point in the history
Addresses a regression introduced by pion#245
  • Loading branch information
edaniels committed Dec 12, 2022
1 parent 6e962c6 commit aa10939
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 14 deletions.
21 changes: 10 additions & 11 deletions association.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,13 @@ func getAssociationStateString(a uint32) string {
//
// Tag :
// State : A state variable indicating what state the association
// : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
// : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
// : SHUTDOWN-ACK-SENT.
//
// Note: No "CLOSED" state is illustrated since if a
// association is "CLOSED" its TCB SHOULD be removed.
// : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
// : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
// : SHUTDOWN-ACK-SENT.
//
// Note: No "CLOSED" state is illustrated since if a
// association is "CLOSED" its TCB SHOULD be removed.
type Association struct {
bytesReceived uint64
bytesSent uint64
Expand Down Expand Up @@ -221,9 +222,8 @@ type Association struct {
delayedAckTriggered bool
immediateAckTriggered bool

name string
log logging.LeveledLogger
streamVersion uint32
name string
log logging.LeveledLogger
}

// Config collects the arguments to createAssociation construction into
Expand Down Expand Up @@ -1369,7 +1369,6 @@ func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream
streamIdentifier: streamIdentifier,
reassemblyQueue: newReassemblyQueue(streamIdentifier),
log: a.log,
version: atomic.AddUint32(&a.streamVersion, 1),
name: fmt.Sprintf("%d:%s", streamIdentifier, a.name),
}

Expand Down Expand Up @@ -2070,7 +2069,7 @@ func (a *Association) movePendingDataChunkToInflightQueue(c *chunkPayloadData) {
// The caller should hold the lock.
func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint16) {
chunks := []*chunkPayloadData{}
var sisToReset []uint16 // stream identifieres to reset
var sisToReset []uint16 // stream identifiers to reset

if a.pendingQueue.size() > 0 {
// RFC 4960 sec 6.1. Transmission of DATA Chunks
Expand All @@ -2096,7 +2095,7 @@ func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint1

s, ok := a.streams[c.streamIdentifier]

if !ok || s.State() > StreamStateOpen || s.version != c.streamVersion {
if !ok || s.State() == StreamStateClosed {
a.popPendingDataChunksToDrop(c)
continue
}
Expand Down
4 changes: 1 addition & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ type Stream struct {
state StreamState
log logging.LeveledLogger
name string
version uint32
}

// StreamIdentifier returns the Stream identifier associated to the stream.
Expand Down Expand Up @@ -297,7 +296,6 @@ func (s *Stream) packetize(raw []byte, ppi PayloadProtocolIdentifier) []*chunkPa
copy(userData, raw[i:i+fragmentSize])

chunk := &chunkPayloadData{
streamVersion: s.version,
streamIdentifier: s.streamIdentifier,
userData: userData,
unordered: unordered,
Expand Down Expand Up @@ -345,7 +343,7 @@ func (s *Stream) Close() error {

switch state {
case StreamStateOpen:
s.SetState(StreamStateClosed)
s.SetState(StreamStateClosing)
s.log.Debugf("[%s] state change: open => closed", s.name)
s.readErr = io.EOF
s.readNotifier.Broadcast()
Expand Down

0 comments on commit aa10939

Please sign in to comment.