Skip to content

Commit

Permalink
Sync Latest Internal Changes (#33)
Browse files Browse the repository at this point in the history
Syncs the latest internal Windows OS changes.
  • Loading branch information
nibanks authored Jan 9, 2020
1 parent 5319831 commit c489fe2
Show file tree
Hide file tree
Showing 18 changed files with 393 additions and 150 deletions.
15 changes: 10 additions & 5 deletions core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -996,7 +996,7 @@ QuicConnTimerExpired(
// We don't want to actually call the flush immediate above as it can
// cause a new timer to be inserted, messing up timer loop.
//
(void)QuicSendProcessFlushSendOperation(&Connection->Send, TRUE);
(void)QuicSendFlush(&Connection->Send);
}
}

Expand Down Expand Up @@ -5080,10 +5080,15 @@ QuicConnDrainOperations(
break;

case QUIC_OPER_TYPE_FLUSH_SEND:
if (QuicSendProcessFlushSendOperation(&Connection->Send, FALSE)) {
if (QuicSendFlush(&Connection->Send)) {
//
// Still have more packets to send. Put the operation back
// on the queue.
// We have no more data to send out so clear the pending flag.
//
Connection->Send.FlushOperationPending = FALSE;
} else {
//
// Still have more data to send. Put the operation back on the
// queue.
//
FreeOper = FALSE;
(void)QuicOperationEnqueue(&Connection->OperQ, Oper);
Expand Down Expand Up @@ -5124,7 +5129,7 @@ QuicConnDrainOperations(
// immediate ACK. So as to not introduce additional queuing delay do one
// immediate flush now.
//
QuicSendProcessFlushSendOperation(&Connection->Send, TRUE);
(void)QuicSendFlush(&Connection->Send);
}

if (Connection->State.SendShutdownCompleteNotif && !Connection->State.HandleClosed) {
Expand Down
73 changes: 20 additions & 53 deletions core/send.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,7 @@ QuicSendQueueFlush(
QUIC_CONNECTION* Connection = QuicSendGetConnection(Send);
if ((Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_FLUSH_SEND)) != NULL) {
Send->FlushOperationPending = TRUE;
const char* ReasonStrings[] = {
"Flags",
"Stream",
"Probe",
"Loss",
"ACK",
"TP",
"CC",
"FC",
"NewKey",
"StreamFC",
"StreamID",
"AmpProtect"
};
QuicTraceLogConnVerbose(QueueSendFlush, Connection, "Queuing send flush (%s)", ReasonStrings[Reason]);
QuicTraceEvent(ConnQueueSendFlush, Connection, Reason);
QuicConnQueueOper(Connection, Oper);
}
}
Expand Down Expand Up @@ -862,27 +848,27 @@ typedef enum QUIC_SEND_RESULT {

} QUIC_SEND_RESULT;

//
// Sends items from the output queue.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_SEND_RESULT
BOOLEAN
QuicSendFlush(
_In_ QUIC_SEND* Send
)
{
QUIC_CONNECTION* Connection = QuicSendGetConnection(Send);

QUIC_DBG_ASSERT(!Connection->State.HandleClosed);

QuicConnTimerCancel(Connection, QUIC_CONN_TIMER_PACING);
QuicConnRemoveOutFlowBlockedReason(
Connection, QUIC_FLOW_BLOCKED_SCHEDULING);
Connection, QUIC_FLOW_BLOCKED_SCHEDULING | QUIC_FLOW_BLOCKED_PACING);

if (Send->SendFlags == 0 && QuicListIsEmpty(&Send->SendStreams)) {
return QUIC_SEND_COMPLETE;
return TRUE;
}

QUIC_PATH* Path = &Connection->Paths[0];
if (Path->DestCid == NULL) {
return QUIC_SEND_COMPLETE;
return TRUE;
}

QUIC_DBG_ASSERT(QuicSendCanSendFlagsNow(Send));
Expand All @@ -903,7 +889,7 @@ QuicSendFlush(
// uninitialized) state, so just ignore the send flush call. This can
// happen if a loss detection fires right after shutdown.
//
return QUIC_SEND_COMPLETE;
return TRUE;
}
_Analysis_assume_(Builder.Metadata != NULL);

Expand Down Expand Up @@ -934,6 +920,8 @@ QuicSendFlush(
// The current pacing chunk is finished. We need to schedule a
// new pacing send.
//
QuicConnAddOutFlowBlockedReason(
Connection, QUIC_FLOW_BLOCKED_PACING);
QuicTraceLogConnVerbose(SetPacingTimer, Connection, "Setting delayed send (PACING) timer for %u ms",
QUIC_SEND_PACING_INTERVAL);
QuicConnTimerSet(
Expand Down Expand Up @@ -1040,45 +1028,24 @@ QuicSendFlush(
} while (Builder.SendContext != NULL ||
Builder.TotalCountDatagrams < QUIC_MAX_DATAGRAMS_PER_SEND);

if (Result == QUIC_SEND_INCOMPLETE &&
Builder.TotalCountDatagrams >= QUIC_MAX_DATAGRAMS_PER_SEND) {
//
// The send is limited by the scheduling logic.
//
QuicConnAddOutFlowBlockedReason(
Connection, QUIC_FLOW_BLOCKED_SCHEDULING);
}

QuicPacketBuilderCleanup(&Builder);

QuicTraceLogConnVerbose(SendFlushComplete, Connection, "Flush complete flags=0x%x", Send->SendFlags);

return Result;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
QuicSendProcessFlushSendOperation(
_In_ QUIC_SEND* Send,
_In_ BOOLEAN Immediate
)
{
QUIC_CONNECTION* Connection = QuicSendGetConnection(Send);

QUIC_DBG_ASSERT(!Connection->State.HandleClosed);

QuicConnTimerCancel(Connection, QUIC_CONN_TIMER_PACING);
QUIC_SEND_RESULT SendResult = QuicSendFlush(Send);
if (Result == QUIC_SEND_INCOMPLETE) {
//
// The send is limited by the scheduling logic.
//
QuicConnAddOutFlowBlockedReason(Connection, QUIC_FLOW_BLOCKED_SCHEDULING);

if (!Immediate && SendResult != QUIC_SEND_INCOMPLETE) {
//
// We have no more data to immediately send out so clear the pending
// flag.
// We have more data to send so we need to make sure a flush send
// operation is queued to send the rest.
//
Send->FlushOperationPending = FALSE;
QuicSendQueueFlush(&Connection->Send, REASON_SCHEDULING);
}

return SendResult == QUIC_SEND_INCOMPLETE;
return Result != QUIC_SEND_INCOMPLETE;
}

_IRQL_requires_max_(DISPATCH_LEVEL)
Expand Down
12 changes: 6 additions & 6 deletions core/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ typedef enum QUIC_SEND_FLUSH_REASON {
REASON_NEW_KEY,
REASON_STREAM_FLOW_CONTROL,
REASON_STREAM_ID_FLOW_CONTROL,
REASON_AMP_PROTECTION
REASON_AMP_PROTECTION,
REASON_SCHEDULING
} QUIC_SEND_FLUSH_REASON;

//
Expand All @@ -295,14 +296,13 @@ QuicSendQueueFlush(
);

//
// Called in response to FLUSH_SEND operation. Drains all queued data that
// needs to be sent.
// Tries to drain all queued data that needs to be sent. Returns TRUE if all the
// data was drained.
//
_IRQL_requires_max_(PASSIVE_LEVEL)
BOOLEAN
QuicSendProcessFlushSendOperation(
_In_ QUIC_SEND* Send,
_In_ BOOLEAN Immediate
QuicSendFlush(
_In_ QUIC_SEND* Send
);

//
Expand Down
96 changes: 51 additions & 45 deletions core/stream_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,9 @@ QuicStreamWriteStreamFrames(
PacketMetadata->FrameCount < QUIC_MAX_FRAMES_PER_PACKET) {

//
// Find the bounds of this frame. Left is the offset of the
// first byte in the frame, and Right is the offset of the
// first byte AFTER the frame.
// Find the bounds of this frame. Left is the offset of the first byte
// in the frame, and Right is the offset of the first byte AFTER the
// frame.
//
uint64_t Left;
uint64_t Right;
Expand Down Expand Up @@ -822,7 +822,7 @@ QuicStreamWriteStreamFrames(
}
}

if (Sack) {
if (Sack != NULL) {
if (Right > Sack->Low) {
Right = Sack->Low;
}
Expand All @@ -835,41 +835,23 @@ QuicStreamWriteStreamFrames(
//
// Stream flow control
//
if (Right >= Stream->MaxAllowedSendOffset) {
if (Right > Stream->MaxAllowedSendOffset) {
Right = Stream->MaxAllowedSendOffset;
QUIC_DBG_ASSERT(Right >= Left);
if (Right == Left) {
if (QuicStreamAddOutFlowBlockedReason(
Stream, QUIC_FLOW_BLOCKED_STREAM_FLOW_CONTROL)) {
QuicSendSetStreamSendFlag(
&Stream->Connection->Send,
Stream, QUIC_STREAM_SEND_FLAG_DATA_BLOCKED);
}
QUIC_DBG_ASSERT(BytesWritten != 0);
break;
}
}

//
// Connection flow control
//
if (Right >= Send->PeerMaxData - Send->OrderedStreamBytesSent + Stream->MaxSentLength) {
Right = Send->PeerMaxData - Send->OrderedStreamBytesSent + Stream->MaxSentLength;
QUIC_DBG_ASSERT(Right >= Left);
if (Right == Left) {
if (QuicConnAddOutFlowBlockedReason(
Stream->Connection,
QUIC_FLOW_BLOCKED_CONN_FLOW_CONTROL)) {
QuicSendSetSendFlag(
&Stream->Connection->Send,
QUIC_CONN_SEND_FLAG_DATA_BLOCKED);
}
QUIC_DBG_ASSERT(BytesWritten != 0);
break;
}
const uint64_t MaxConnFlowControlOffset =
Stream->MaxSentLength + (Send->PeerMaxData - Send->OrderedStreamBytesSent);
if (Right > MaxConnFlowControlOffset) {
Right = MaxConnFlowControlOffset;
}

QUIC_DBG_ASSERT(Right <= Stream->MaxAllowedSendOffset);
//
// It's OK for Right and Left to be equal because there are cases where
// stream frames will be written with no payload (initial or FIN).
//
QUIC_DBG_ASSERT(Right >= Left);

uint16_t FrameBytes = *BufferLength - BytesWritten;
Expand All @@ -884,33 +866,56 @@ QuicStreamWriteStreamFrames(
Buffer + BytesWritten,
PacketMetadata);

BOOLEAN ExitLoop = FALSE;

//
// When FramePayloadBytes is returned as zero, an empty FIN frame
// may or may not have been written (i.e. we may have
// FramePayloadBytes == 0 but FrameBytes != 0).
// When FramePayloadBytes is returned as zero, an empty stream frame may
// still have been written (i.e. FramePayloadBytes might be 0 but
// FrameBytes is not).
//
BytesWritten += FrameBytes;
if (FramePayloadBytes == 0) {
break;
ExitLoop = TRUE;
}

//
// FramePayloadBytes may have been reduced.
// Recalculate Right since FramePayloadBytes may have been reduced.
//
Right = Left + FramePayloadBytes;

QUIC_DBG_ASSERT(Right <= Stream->QueuedSendOffset);
if (Right == Stream->QueuedSendOffset) {
QuicStreamAddOutFlowBlockedReason(
Stream, QUIC_FLOW_BLOCKED_APP);
QuicStreamAddOutFlowBlockedReason(Stream, QUIC_FLOW_BLOCKED_APP);
ExitLoop = TRUE;
}

QUIC_DBG_ASSERT(Right <= Stream->MaxAllowedSendOffset);
if (Right == Stream->MaxAllowedSendOffset) {
if (QuicStreamAddOutFlowBlockedReason(
Stream, QUIC_FLOW_BLOCKED_STREAM_FLOW_CONTROL)) {
QuicSendSetStreamSendFlag(
&Stream->Connection->Send,
Stream, QUIC_STREAM_SEND_FLAG_DATA_BLOCKED);
}
ExitLoop = TRUE;
}

QUIC_DBG_ASSERT(Right <= MaxConnFlowControlOffset);
if (Right == MaxConnFlowControlOffset) {
if (QuicConnAddOutFlowBlockedReason(
Stream->Connection, QUIC_FLOW_BLOCKED_CONN_FLOW_CONTROL)) {
QuicSendSetSendFlag(
&Stream->Connection->Send,
QUIC_CONN_SEND_FLAG_DATA_BLOCKED);
}
ExitLoop = TRUE;
}

//
// Move the "next" offset (RecoveryNextOffset if we are sending
// recovery bytes or NextSendOffset otherwise) forward by the
// number of bytes we've written. If we wrote up to the edge
// of a SACK, skip past the SACK.
// Move the "next" offset (RecoveryNextOffset if we are sending recovery
// bytes or NextSendOffset otherwise) forward by the number of bytes
// we've written. If we wrote up to the edge of a SACK, skip past the
// SACK.
//

if (Recovery) {
Expand All @@ -931,13 +936,14 @@ QuicStreamWriteStreamFrames(
if (Stream->MaxSentLength < Right) {
Send->OrderedStreamBytesSent += Right - Stream->MaxSentLength;
QUIC_DBG_ASSERT(Send->OrderedStreamBytesSent <= Send->PeerMaxData);
if (Send->OrderedStreamBytesSent == Send->PeerMaxData) {
QuicTraceLogConnVerbose(ConnFCBlocked, Stream->Connection, "Connection flow control limit reached");
}
Stream->MaxSentLength = Right;
}

QuicStreamValidateRecoveryState(Stream);

if (ExitLoop) {
break;
}
}

QuicStreamSendDumpState(Stream);
Expand Down
8 changes: 2 additions & 6 deletions core/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,10 @@ QuicWorkerProcessConnection(
//
// Process some operations.
//
BOOLEAN StillHasWorkToDo = QuicConnDrainOperations(Connection);
BOOLEAN StillHasWorkToDo =
QuicConnDrainOperations(Connection) | Connection->State.UpdateWorker;
Connection->WorkerThreadID = 0;

//
// If UpdateWorker is TRUE, then StillHasWorkToDo should be TRUE as well.
//
QUIC_DBG_ASSERT(!Connection->State.UpdateWorker || StillHasWorkToDo);

//
// Determine whether the connection needs to be requeued.
//
Expand Down
Loading

0 comments on commit c489fe2

Please sign in to comment.