Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly Handle FIN after Stream Reset #2049

Merged
merged 4 commits into from
Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ MsQuicStreamShutdown(
}

if (Flags & QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL &&
Flags != QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL) {
Flags & (QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE)) {
//
// Not allowed to use the graceful shutdown flag with any other flag.
//
Expand Down Expand Up @@ -953,6 +953,27 @@ MsQuicStreamShutdown(
QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);
QUIC_CONN_VERIFY(Connection, !Connection->State.HandleClosed);

if (Flags & QUIC_STREAM_SHUTDOWN_FLAG_INLINE &&
Connection->WorkerThreadID == CxPlatCurThreadID()) {

CXPLAT_PASSIVE_CODE();

//
// Execute this blocking API call inline if called on the worker thread.
//
BOOLEAN AlreadyInline = Connection->State.InlineApiExecution;
if (!AlreadyInline) {
Connection->State.InlineApiExecution = TRUE;
}
QuicStreamShutdown(Stream, Flags, ErrorCode);
if (!AlreadyInline) {
Connection->State.InlineApiExecution = FALSE;
}

Status = QUIC_STATUS_SUCCESS;
goto Error;
}

Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
if (Oper == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
Expand Down
4 changes: 2 additions & 2 deletions src/core/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ QuicStreamShutdown(
{
CXPLAT_DBG_ASSERT(Flags != 0 && Flags != QUIC_STREAM_SHUTDOWN_SILENT);
CXPLAT_DBG_ASSERT(
Flags == QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL ||
!(Flags & QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL));
!(Flags & QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL) ||
!(Flags & (QUIC_STREAM_SHUTDOWN_FLAG_ABORT | QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE)));
CXPLAT_DBG_ASSERT(
!(Flags & QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE) ||
Flags == (QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE |
Expand Down
32 changes: 26 additions & 6 deletions src/core/stream_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ QuicStreamProcessResetFrame(
"Tried to reset at earlier final size!");
QuicConnTransportError(Stream->Connection, QUIC_ERROR_FINAL_SIZE_ERROR);
return;

}

if (TotalRecvLength < FinalSize) {
Expand Down Expand Up @@ -311,12 +310,10 @@ QuicStreamProcessStreamFrame(
goto Error;
}

if (Stream->Flags.RemoteCloseFin ||
Stream->Flags.RemoteCloseReset ||
Stream->Flags.SentStopSending) {
if (Stream->Flags.RemoteCloseFin || Stream->Flags.RemoteCloseReset) {
//
// Ignore the data if we are already closed remotely. Likely means we received
// a copy of already processed data that was resent.
// Ignore the data if we are already closed remotely. Likely means we
// received a copy of already processed data that was resent.
//
QuicTraceLogStreamVerbose(
IgnoreRecvAfterClose,
Expand All @@ -326,6 +323,29 @@ QuicStreamProcessStreamFrame(
goto Error;
}

if (Stream->Flags.SentStopSending) {
//
// The app has already aborting the receive path, but the peer might end
// up sending a FIN instead of a reset. Ignore the data but treat any
// FIN as a reset.
//
if (Frame->Fin) {
QuicTraceLogStreamInfo(
TreatFinAsReset,
Stream,
"Treating FIN after receive abort as reset");
QuicStreamProcessResetFrame(Stream, Frame->Offset + Frame->Length, 0);

} else {
QuicTraceLogStreamVerbose(
IgnoreRecvAfterAbort,
Stream,
"Ignoring received frame after receive abort");
}
Status = QUIC_STATUS_SUCCESS;
goto Error;
}

if (Frame->Fin && Stream->RecvMaxLength != UINT64_MAX &&
EndOffset != Stream->RecvMaxLength) {
//
Expand Down
42 changes: 42 additions & 0 deletions src/generated/linux/stream_recv.c.clog.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,27 @@ tracepoint(CLOG_STREAM_RECV_C, LocalCloseStopSending , arg1);\



#ifndef _clog_3_ARGS_TRACE_TreatFinAsReset



/*----------------------------------------------------------
// Decoder Ring for TreatFinAsReset
// [strm][%p] Treating FIN after receive abort as reset
// QuicTraceLogStreamInfo(
TreatFinAsReset,
Stream,
"Treating FIN after receive abort as reset");
// arg1 = arg1 = Stream
----------------------------------------------------------*/
#define _clog_3_ARGS_TRACE_TreatFinAsReset(uniqueId, arg1, encoded_arg_string)\
tracepoint(CLOG_STREAM_RECV_C, TreatFinAsReset , arg1);\

#endif




#ifndef _clog_3_ARGS_TRACE_QueueRecvFlush


Expand Down Expand Up @@ -245,6 +266,27 @@ tracepoint(CLOG_STREAM_RECV_C, IgnoreRecvAfterClose , arg1);\



#ifndef _clog_3_ARGS_TRACE_IgnoreRecvAfterAbort



/*----------------------------------------------------------
// Decoder Ring for IgnoreRecvAfterAbort
// [strm][%p] Ignoring received frame after receive abort
// QuicTraceLogStreamVerbose(
IgnoreRecvAfterAbort,
Stream,
"Ignoring received frame after receive abort");
// arg1 = arg1 = Stream
----------------------------------------------------------*/
#define _clog_3_ARGS_TRACE_IgnoreRecvAfterAbort(uniqueId, arg1, encoded_arg_string)\
tracepoint(CLOG_STREAM_RECV_C, IgnoreRecvAfterAbort , arg1);\

#endif




#ifndef _clog_3_ARGS_TRACE_FlowControlExhausted


Expand Down
38 changes: 38 additions & 0 deletions src/generated/linux/stream_recv.c.clog.h.lttng.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, LocalCloseStopSending,



/*----------------------------------------------------------
// Decoder Ring for TreatFinAsReset
// [strm][%p] Treating FIN after receive abort as reset
// QuicTraceLogStreamInfo(
TreatFinAsReset,
Stream,
"Treating FIN after receive abort as reset");
// arg1 = arg1 = Stream
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, TreatFinAsReset,
TP_ARGS(
const void *, arg1),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg1, arg1)
)
)



/*----------------------------------------------------------
// Decoder Ring for QueueRecvFlush
// [strm][%p] Queuing recv flush
Expand Down Expand Up @@ -199,6 +218,25 @@ TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, IgnoreRecvAfterClose,



/*----------------------------------------------------------
// Decoder Ring for IgnoreRecvAfterAbort
// [strm][%p] Ignoring received frame after receive abort
// QuicTraceLogStreamVerbose(
IgnoreRecvAfterAbort,
Stream,
"Ignoring received frame after receive abort");
// arg1 = arg1 = Stream
----------------------------------------------------------*/
TRACEPOINT_EVENT(CLOG_STREAM_RECV_C, IgnoreRecvAfterAbort,
TP_ARGS(
const void *, arg1),
TP_FIELDS(
ctf_integer_hex(uint64_t, arg1, arg1)
)
)



/*----------------------------------------------------------
// Decoder Ring for FlowControlExhausted
// [strm][%p] Flow control window exhausted!
Expand Down
1 change: 1 addition & 0 deletions src/inc/msquic.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ typedef enum QUIC_STREAM_SHUTDOWN_FLAGS {
QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE = 0x0004, // Abruptly closes the receive path.
QUIC_STREAM_SHUTDOWN_FLAG_ABORT = 0x0006, // Abruptly closes both send and receive paths.
QUIC_STREAM_SHUTDOWN_FLAG_IMMEDIATE = 0x0008, // Immediately sends completion events to app.
QUIC_STREAM_SHUTDOWN_FLAG_INLINE = 0x0010, // Process the shutdown immediately inline. Only for calls on callbacks.
} QUIC_STREAM_SHUTDOWN_FLAGS;

DEFINE_ENUM_FLAG_OPERATORS(QUIC_STREAM_SHUTDOWN_FLAGS)
Expand Down
32 changes: 32 additions & 0 deletions src/manifest/clog.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -7084,6 +7084,18 @@
],
"macroName": "QuicTraceLogStreamInfo"
},
"TreatFinAsReset": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Treating FIN after receive abort as reset",
"UniqueId": "TreatFinAsReset",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
}
],
"macroName": "QuicTraceLogStreamInfo"
},
"QueueRecvFlush": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Queuing recv flush",
Expand Down Expand Up @@ -7140,6 +7152,18 @@
],
"macroName": "QuicTraceLogStreamVerbose"
},
"IgnoreRecvAfterAbort": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Ignoring received frame after receive abort",
"UniqueId": "IgnoreRecvAfterAbort",
"splitArgs": [
{
"DefinationEncoding": "p",
"MacroVariableName": "arg1"
}
],
"macroName": "QuicTraceLogStreamVerbose"
},
"FlowControlExhausted": {
"ModuleProperites": {},
"TraceString": "[strm][%p] Flow control window exhausted!",
Expand Down Expand Up @@ -12520,6 +12544,10 @@
"UniquenessHash": "3d930671-e549-968b-fcba-6e7691ee9678",
"TraceID": "LocalCloseStopSending"
},
{
"UniquenessHash": "40c67c17-d530-a2d2-272a-4730ad121b34",
"TraceID": "TreatFinAsReset"
},
{
"UniquenessHash": "80596bbb-20e9-07e5-07f3-ebc7078fa612",
"TraceID": "QueueRecvFlush"
Expand All @@ -12536,6 +12564,10 @@
"UniquenessHash": "59f615c0-9861-8dc7-db5a-224ebc6336a5",
"TraceID": "IgnoreRecvAfterClose"
},
{
"UniquenessHash": "61c9329f-42bd-40dd-43cc-5b5c3fa2830b",
"TraceID": "IgnoreRecvAfterAbort"
},
{
"UniquenessHash": "337f803e-406e-0817-5804-7273bf3a07ec",
"TraceID": "FlowControlExhausted"
Expand Down
9 changes: 8 additions & 1 deletion src/test/MsQuicTests.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,10 @@ void
QuicTestStreamDifferentAbortErrors(
);

void
QuicTestStreamAbortRecvFinRace(
);

//
// QuicDrill tests
//
Expand Down Expand Up @@ -933,4 +937,7 @@ typedef struct {
#define IOCTL_QUIC_RUN_CONNECT_INVALID_ADDRESS \
QUIC_CTL_CODE(77, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 77
#define IOCTL_QUIC_RUN_STREAM_ABORT_RECV_FIN_RACE \
QUIC_CTL_CODE(78, METHOD_BUFFERED, FILE_WRITE_DATA)

#define QUIC_MAX_IOCTL_FUNC_CODE 78
9 changes: 9 additions & 0 deletions src/test/bin/quic_gtest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,15 @@ TEST(Misc, StreamDifferentAbortErrors) {
}
}

TEST(Misc, StreamAbortRecvFinRace) {
TestLogger Logger("StreamAbortRecvFinRace");
if (TestingKernelMode) {
ASSERT_TRUE(DriverClient.Run(IOCTL_QUIC_RUN_STREAM_ABORT_RECV_FIN_RACE));
} else {
QuicTestStreamAbortRecvFinRace();
}
}

TEST(Drill, VarIntEncoder) {
TestLogger Logger("QuicDrillTestVarIntEncoder");
if (TestingKernelMode) {
Expand Down
5 changes: 5 additions & 0 deletions src/test/bin/winkernel/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ size_t QUIC_IOCTL_BUFFER_SIZES[] =
sizeof(UINT8),
sizeof(INT32),
0,
0,
};

CXPLAT_STATIC_ASSERT(
Expand Down Expand Up @@ -1115,6 +1116,10 @@ QuicTestCtlEvtIoDeviceControl(
QuicTestCtlRun(QuicTestConnectInvalidAddress());
break;

case IOCTL_QUIC_RUN_STREAM_ABORT_RECV_FIN_RACE:
QuicTestCtlRun(QuicTestStreamAbortRecvFinRace());
break;

default:
Status = STATUS_NOT_IMPLEMENTED;
break;
Expand Down
63 changes: 63 additions & 0 deletions src/test/lib/DataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2481,3 +2481,66 @@ QuicTestStreamDifferentAbortErrors(
TEST_TRUE(Context.PeerRecvAbortErrorCode == RecvShutdownErrorCode);
TEST_TRUE(Context.PeerSendAbortErrorCode == SendShutdownErrorCode);
}

struct StreamAbortRecvFinRace {
CxPlatEvent ClientStreamShutdownComplete;

static QUIC_STATUS ClientStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void* Context, _Inout_ QUIC_STREAM_EVENT* Event) {
auto TestContext = (StreamAbortRecvFinRace*)Context;
if (Event->Type == QUIC_STREAM_EVENT_SEND_SHUTDOWN_COMPLETE) {
Stream->Shutdown(0, QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE | QUIC_STREAM_SHUTDOWN_FLAG_INLINE);
} else if (Event->Type == QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE) {
TestContext->ClientStreamShutdownComplete.Set();
}
return QUIC_STATUS_SUCCESS;
}

static QUIC_STATUS ServerStreamCallback(_In_ MsQuicStream* Stream, _In_opt_ void*, _Inout_ QUIC_STREAM_EVENT* Event) {
if (Event->Type == QUIC_STREAM_EVENT_PEER_SEND_SHUTDOWN) {
Stream->Shutdown(0, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL | QUIC_STREAM_SHUTDOWN_FLAG_INLINE);
}
return QUIC_STATUS_SUCCESS;
}

static QUIC_STATUS ConnCallback(_In_ MsQuicConnection*, _In_opt_ void* Context, _Inout_ QUIC_CONNECTION_EVENT* Event) {
if (Event->Type == QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED) {
new(std::nothrow) MsQuicStream(Event->PEER_STREAM_STARTED.Stream, CleanUpAutoDelete, ServerStreamCallback, Context);
}
return QUIC_STATUS_SUCCESS;
}
};

void
QuicTestStreamAbortRecvFinRace(
)
{
MsQuicRegistration Registration(true);
TEST_QUIC_SUCCEEDED(Registration.GetInitStatus());

MsQuicConfiguration ServerConfiguration(Registration, "MsQuicTest", MsQuicSettings().SetPeerBidiStreamCount(1), ServerSelfSignedCredConfig);
TEST_QUIC_SUCCEEDED(ServerConfiguration.GetInitStatus());

MsQuicConfiguration ClientConfiguration(Registration, "MsQuicTest", MsQuicCredentialConfig());
TEST_QUIC_SUCCEEDED(ClientConfiguration.GetInitStatus());

StreamAbortRecvFinRace Context;
MsQuicAutoAcceptListener Listener(Registration, ServerConfiguration, StreamAbortRecvFinRace::ConnCallback, &Context);
TEST_QUIC_SUCCEEDED(Listener.GetInitStatus());
TEST_QUIC_SUCCEEDED(Listener.Start("MsQuicTest"));
QuicAddr ServerLocalAddr;
TEST_QUIC_SUCCEEDED(Listener.GetLocalAddr(ServerLocalAddr));

MsQuicConnection Connection(Registration);
TEST_QUIC_SUCCEEDED(Connection.GetInitStatus());

MsQuicStream Stream(Connection, QUIC_STREAM_OPEN_FLAG_NONE, CleanUpManual, StreamAbortRecvFinRace::ClientStreamCallback, &Context);
TEST_QUIC_SUCCEEDED(Stream.GetInitStatus());
TEST_QUIC_SUCCEEDED(Stream.Start());
TEST_QUIC_SUCCEEDED(Stream.Shutdown(0, QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL));

TEST_QUIC_SUCCEEDED(Connection.StartLocalhost(ClientConfiguration, ServerLocalAddr));
TEST_TRUE(Connection.HandshakeCompleteEvent.WaitTimeout(TestWaitTimeout));
TEST_TRUE(Connection.HandshakeComplete);

TEST_TRUE(Context.ClientStreamShutdownComplete.WaitTimeout(TestWaitTimeout));
}