Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a mode to the receive buffer to handle external buffers #4758

Draft
wants to merge 22 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api/StreamOpen.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Value | Meaning
**QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL**<br>1 | Opens a unidirectional stream.
**QUIC_STREAM_OPEN_FLAG_0_RTT**<br>2 | Indicates that the stream may be sent in 0-RTT.
**QUIC_STREAM_OPEN_FLAG_DELAY_ID_FC_UPDATES**<br>4 | Indicates stream ID flow control limit updates for the connection should be delayed to StreamClose.
**QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS**<br>5 | No buffer will be allocated for the stream, external buffers must be provided using StreamProvideReceiveBuffers.

`Handler`

Expand Down
151 changes: 151 additions & 0 deletions src/core/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,15 @@
goto Error;
}

if (!!(Flags & QUIC_STREAM_OPEN_FLAG_EXTERNAL_BUFFERS) &&
Connection->Settings.StreamMultiReceiveEnabled) {
Comment on lines +661 to +662
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if they want to use multi-receive for streams that don't use external buffers? I think we can't block that.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would allow an "app buffer" stream when multi-receive is enabled?
The problem is that this stream would not support the multi-receive behavior (at least for now, we could make it happen), which could be confusing for the app.

And if app takes dependency on the fact that "app buffer" stream ignore multi-receive, then we can't add support without an API break or new flags.

//
// External buffers are not supported with multi-receive.
//
Status = QUIC_STATUS_INVALID_PARAMETER;
goto Error;

Check warning on line 667 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L666-L667

Added lines #L666 - L667 were not covered by tests
}

Status = QuicStreamInitialize(Connection, FALSE, Flags, (QUIC_STREAM**)NewStream);
if (QUIC_FAILED(Status)) {
goto Error;
Expand Down Expand Up @@ -1351,6 +1360,148 @@
"[ api] Exit");
}

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamProvideReceiveBuffers(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ uint32_t BufferCount,
_In_reads_(BufferCount) const QUIC_BUFFER* Buffers
)
{
QUIC_STATUS Status;
QUIC_OPERATION* Oper;
CXPLAT_LIST_ENTRY ChunkList;
CxPlatListInitializeHead(&ChunkList);

QuicTraceEvent(
ApiEnter,
"[ api] Enter %u (%p).",
QUIC_TRACE_API_STREAM_PROVIDE_RECEIVE_BUFFERS,
Handle);

if (!IS_STREAM_HANDLE(Handle) || Buffers == NULL || BufferCount == 0) {
Status = QUIC_STATUS_INVALID_PARAMETER;
goto Error;

Check warning on line 1385 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1384-L1385

Added lines #L1384 - L1385 were not covered by tests
}

for (uint32_t i = 0; i < BufferCount; ++i) {
if (Buffers[i].Length == 0) {
Status = QUIC_STATUS_INVALID_PARAMETER;
goto Error;

Check warning on line 1391 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1390-L1391

Added lines #L1390 - L1391 were not covered by tests
}
}

#pragma prefast(suppress: __WARNING_25024, "Pointer cast already validated.")
QUIC_STREAM* Stream = (QUIC_STREAM*)Handle;

CXPLAT_TEL_ASSERT(!Stream->Flags.HandleClosed);
CXPLAT_TEL_ASSERT(!Stream->Flags.Freed);

QUIC_CONNECTION* Connection = Stream->Connection;
QUIC_CONN_VERIFY(Connection, !Connection->State.Freed);

//
// Execute this API call inline if called on the worker thread.
//
BOOLEAN IsWorkerThread = Connection->WorkerThreadID == CxPlatCurThreadID();
BOOLEAN IsAlreadyInline = Connection->State.InlineApiExecution;

if (!Stream->Flags.UseExternalRecvBuffers) {
if (Stream->Flags.PeerStreamStartEventActive) {
CXPLAT_DBG_ASSERT(IsWorkerThread);
//
// We are inline from the callback indicating a peer opened a stream.
// No data was received yet so we can setup external buffers.
//
Connection->State.InlineApiExecution = TRUE;
QuicStreamSwitchToExternalBuffers(Stream);
Connection->State.InlineApiExecution = IsAlreadyInline;
} else {
//
// External buffers can't be provided after the stream has been
// started using internal buffers.
//
Status = QUIC_STATUS_INVALID_STATE;
goto Error;

Check warning on line 1426 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1425-L1426

Added lines #L1425 - L1426 were not covered by tests
}
}

//
// Allocate a chunk for each buffer, linking them together.
// The allocation is done here to make the worker thread task failure free.
//
for (uint32_t i = 0; i < BufferCount; ++i) {
QUIC_RECV_CHUNK* Chunk = CXPLAT_ALLOC_NONPAGED(sizeof(QUIC_RECV_CHUNK), QUIC_POOL_RECVBUF);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use a pool allocator (to be stored in the worker, like the others) since this is (a) fixed size and (b) on the datapath.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means each chunk will need a flag to track whether they are allocated from the pool or not then. The size of the chunk seems to be rather optimized (AllocLength restricted to 31bits to avoid a few extra bytes), is it ok to simply add a "flag" byte?

Which makes me realize I need to check the size of the app provided buffers won't override that bit.

if (Chunk == NULL) {
QuicTraceEvent(

Check warning on line 1437 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1437

Added line #L1437 was not covered by tests
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"provided_chunk",
0);
Status = QUIC_STATUS_OUT_OF_MEMORY;
goto Error;

Check warning on line 1443 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1442-L1443

Added lines #L1442 - L1443 were not covered by tests
}
QuicRecvChunkInitialize(Chunk, Buffers[i].Length, Buffers[i].Buffer);
CxPlatListInsertTail(&ChunkList, &Chunk->Link);
}

if (IsWorkerThread) {
//
// Execute this API call inline if called on the worker thread.
//
Connection->State.InlineApiExecution = TRUE;
Status = QuicStreamProvideRecvBuffers(Stream, &ChunkList);
Connection->State.InlineApiExecution = IsAlreadyInline;
} else {
//
// Queue the operation to insert the chunks in the recv buffer, without waiting for the result.
//
Oper = QuicOperationAlloc(Connection->Worker, QUIC_OPER_TYPE_API_CALL);
if (Oper == NULL) {
Status = QUIC_STATUS_OUT_OF_MEMORY;
QuicTraceEvent(

Check warning on line 1463 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1462-L1463

Added lines #L1462 - L1463 were not covered by tests
AllocFailure,
"Allocation of '%s' failed. (%llu bytes)",
"STRM_PROVIDE_RECV_BUFFERS, operation",
0);
goto Error;

Check warning on line 1468 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1468

Added line #L1468 was not covered by tests
}
Oper->API_CALL.Context->Type = QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS;
Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Stream = Stream;
CxPlatListInitializeHead(&Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Chunks);
CxPlatListMoveItems(&ChunkList, &Oper->API_CALL.Context->STRM_PROVIDE_RECV_BUFFERS.Chunks);

//
// Async stream operations need to hold a ref on the stream so that the
// stream isn't freed before the operation can be processed. The ref is
// released after the operation is processed.
//
QuicStreamAddRef(Stream, QUIC_STREAM_REF_OPERATION);

//
// Queue the operation but don't wait for the completion.
//
QuicConnQueueOper(Connection, Oper);
}

Status = QUIC_STATUS_SUCCESS;

Error:
// Cleanup allocated chunks if the operation failed.
guhetier marked this conversation as resolved.
Show resolved Hide resolved
while (!CxPlatListIsEmpty(&ChunkList)) {
QUIC_RECV_CHUNK* Chunk = CXPLAT_CONTAINING_RECORD(CxPlatListRemoveHead(&ChunkList), QUIC_RECV_CHUNK, Link);
CXPLAT_FREE(Chunk, QUIC_POOL_RECVBUF);
guhetier marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 1495 in src/core/api.c

View check run for this annotation

Codecov / codecov/patch

src/core/api.c#L1493-L1495

Added lines #L1493 - L1495 were not covered by tests

QuicTraceEvent(
ApiExitStatus,
"[ api] Exit %u",
Status);

return Status;
}

_IRQL_requires_max_(PASSIVE_LEVEL)
QUIC_STATUS
QUIC_API
Expand Down
9 changes: 9 additions & 0 deletions src/core/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,15 @@ MsQuicStreamReceiveComplete(
_In_ uint64_t BufferLength
);

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
MsQuicStreamProvideReceiveBuffers(
_In_ _Pre_defensive_ HQUIC Handle,
_In_ uint32_t BufferCount,
_In_reads_(BufferCount) const QUIC_BUFFER *Buffers
);

_IRQL_requires_max_(DISPATCH_LEVEL)
QUIC_STATUS
QUIC_API
Expand Down
19 changes: 19 additions & 0 deletions src/core/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -7508,6 +7508,25 @@
ApiCtx->STRM_RECV_SET_ENABLED.IsEnabled);
break;

case QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS:
Status =
QuicStreamProvideRecvBuffers(
ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream,
&ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Chunks);

if (Status != QUIC_STATUS_SUCCESS) {
//
// If we cannot accept the app provided buffers at this point, we need to abort
// the connection: otherwise, we break the contract with the app about writting
// data to the provided buffers in order.
//
QuicConnFatalError(

Check warning on line 7523 in src/core/connection.c

View check run for this annotation

Codecov / codecov/patch

src/core/connection.c#L7523

Added line #L7523 was not covered by tests
ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream->Connection,
Status,
"Failed to accept app provided receive buffers");
}
break;

case QUIC_API_TYPE_SET_PARAM:
Status =
QuicLibrarySetParam(
Expand Down
1 change: 1 addition & 0 deletions src/core/library.c
Original file line number Diff line number Diff line change
Expand Up @@ -1798,6 +1798,7 @@ MsQuicOpenVersion(
Api->StreamSend = MsQuicStreamSend;
Api->StreamReceiveComplete = MsQuicStreamReceiveComplete;
Api->StreamReceiveSetEnabled = MsQuicStreamReceiveSetEnabled;
Api->StreamProvideReceiveBuffers = MsQuicStreamProvideReceiveBuffers;

Api->DatagramSend = MsQuicDatagramSend;

Expand Down
2 changes: 2 additions & 0 deletions src/core/operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ QuicOperationFree(
}
} else if (ApiCtx->Type == QUIC_API_TYPE_STRM_RECV_SET_ENABLED) {
QuicStreamRelease(ApiCtx->STRM_RECV_SET_ENABLED.Stream, QUIC_STREAM_REF_OPERATION);
} else if (ApiCtx->Type == QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS) {
QuicStreamRelease(ApiCtx->STRM_PROVIDE_RECV_BUFFERS.Stream, QUIC_STREAM_REF_OPERATION);
guhetier marked this conversation as resolved.
Show resolved Hide resolved
}
CxPlatPoolFree(&Worker->ApiContextPool, ApiCtx);
} else if (Oper->Type == QUIC_OPER_TYPE_FLUSH_STREAM_RECV) {
Expand Down
5 changes: 5 additions & 0 deletions src/core/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ typedef enum QUIC_API_TYPE {
QUIC_API_TYPE_STRM_SEND,
QUIC_API_TYPE_STRM_RECV_COMPLETE,
QUIC_API_TYPE_STRM_RECV_SET_ENABLED,
QUIC_API_TYPE_STRM_PROVIDE_RECV_BUFFERS,

QUIC_API_TYPE_SET_PARAM,
QUIC_API_TYPE_GET_PARAM,
Expand Down Expand Up @@ -154,6 +155,10 @@ typedef struct QUIC_API_CONTEXT {
QUIC_STREAM* Stream;
BOOLEAN IsEnabled;
} STRM_RECV_SET_ENABLED;
struct {
QUIC_STREAM* Stream;
CXPLAT_LIST_ENTRY /* QUIC_RECV_CHUNK */ Chunks;
} STRM_PROVIDE_RECV_BUFFERS;

struct {
HQUIC Handle;
Expand Down
Loading
Loading