Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions URocket/Connection/Connection.Write.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ public unsafe partial class Connection : IBufferWriter<byte>, IDisposable
private readonly UnmanagedMemoryManager _manager;

/// <summary>Pointer to the unmanaged write buffer.</summary>
public byte* WriteBuffer { get; private set; }
public byte* WriteBuffer { get; }

/// <summary>Logical start of valid data (currently unused; reserved for future streaming).</summary>
public int WriteHead { get; set; }

/// <summary>Logical end of written data in <see cref="WriteBuffer"/>.</summary>
public int WriteTail { get; set; }
public int WriteTail { get; private set; }

/// <summary> Number of bytes that were sent to kernel, we need this value to validate whether all flushed data was processed by the kernel. </summary>
internal int WriteInFlight { get; set; }

/// <summary>Indicates that the buffer may be flushed by the reactor.</summary>
public volatile bool CanFlush = true;
Expand All @@ -59,7 +62,7 @@ public Connection(int writeSlabSize = 1024 * 16) {
/// Copies unmanaged memory into the write slab.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Write(byte* ptr, int length)
internal void Write(byte* ptr, int length)
{
if ((uint)length > (uint)_writeSlabSize) // also rejects negative
throw new ArgumentOutOfRangeException(nameof(length));
Expand Down
10 changes: 8 additions & 2 deletions URocket/Engine/Engine.Reactor.HandleSubmitAndWaitSingleCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,14 @@ internal void HandleSubmitAndWaitSingleCall()
if (connection.WriteHead < connection.WriteTail)
{
Console.WriteLine("Oddness");
connection.CanFlush = false;
SubmitSend(io_uring_instance, connection.ClientFd, connection.WriteBuffer, (uint)connection.WriteHead, (uint)connection.WriteTail);
//connection.CanFlush = false; This is unnecessary?
SubmitSend(
io_uring_instance,
connection.ClientFd,
connection.WriteBuffer,
(uint)connection.WriteHead,
(uint)(connection.WriteTail - connection.WriteHead));

continue;
// queued SQE; flushed next loop
}
Expand Down
13 changes: 10 additions & 3 deletions URocket/Engine/Engine.Reactor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,16 @@ private void DrainWriteQ()
if (_engine.Connections[Id].TryGetValue(fd, out var connection))
{
connection.CanWrite = false; // Reset write flag for each drained connection

if(connection.CanFlush)
Send(connection.ClientFd, connection.WriteBuffer, (uint)connection.WriteHead, (uint)connection.WriteTail);

if (connection.CanFlush)
{
Send(connection.ClientFd, connection.WriteBuffer, (uint)connection.WriteHead,
(uint)connection.WriteTail);

// Data is sent to kernel, until we don't have confirmation that
// this data was fully processed by the kernel we don't send more data to it
connection.CanFlush = false;
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions URocket/URocket.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
<Authors>Diogo Martins</Authors>
<RepositoryUrl>https://github.com/MDA2AV/uRocket</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<AssemblyVersion>10.4.0</AssemblyVersion>
<FileVersion>10.4.0</FileVersion>
<Version>10.4.0</Version>
<AssemblyVersion>10.4.1</AssemblyVersion>
<FileVersion>10.4.1</FileVersion>
<Version>10.4.1</Version>
<PackageReadmeFile>README.md</PackageReadmeFile>
<PackageTags>io_uring</PackageTags>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
Expand Down