diff --git a/URocket/Connection/Connection.Write.cs b/URocket/Connection/Connection.Write.cs index ac3b84d..4a8d3b1 100644 --- a/URocket/Connection/Connection.Write.cs +++ b/URocket/Connection/Connection.Write.cs @@ -30,13 +30,16 @@ public unsafe partial class Connection : IBufferWriter, IDisposable private readonly UnmanagedMemoryManager _manager; /// Pointer to the unmanaged write buffer. - public byte* WriteBuffer { get; private set; } + public byte* WriteBuffer { get; } /// Logical start of valid data (currently unused; reserved for future streaming). public int WriteHead { get; set; } /// Logical end of written data in . - public int WriteTail { get; set; } + public int WriteTail { get; private set; } + + /// Number of bytes that were sent to kernel, we need this value to validate whether all flushed data was processed by the kernel. + internal int WriteInFlight { get; set; } /// Indicates that the buffer may be flushed by the reactor. public volatile bool CanFlush = true; @@ -59,7 +62,7 @@ public Connection(int writeSlabSize = 1024 * 16) { /// Copies unmanaged memory into the write slab. /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public void Write(byte* ptr, int length) + internal void Write(byte* ptr, int length) { if ((uint)length > (uint)_writeSlabSize) // also rejects negative throw new ArgumentOutOfRangeException(nameof(length)); diff --git a/URocket/Engine/Engine.Reactor.HandleSubmitAndWaitSingleCall.cs b/URocket/Engine/Engine.Reactor.HandleSubmitAndWaitSingleCall.cs index 98823bc..745bf92 100644 --- a/URocket/Engine/Engine.Reactor.HandleSubmitAndWaitSingleCall.cs +++ b/URocket/Engine/Engine.Reactor.HandleSubmitAndWaitSingleCall.cs @@ -143,8 +143,14 @@ internal void HandleSubmitAndWaitSingleCall() if (connection.WriteHead < connection.WriteTail) { Console.WriteLine("Oddness"); - connection.CanFlush = false; - SubmitSend(io_uring_instance, connection.ClientFd, connection.WriteBuffer, (uint)connection.WriteHead, (uint)connection.WriteTail); + //connection.CanFlush = false; This is unnecessary? + SubmitSend( + io_uring_instance, + connection.ClientFd, + connection.WriteBuffer, + (uint)connection.WriteHead, + (uint)(connection.WriteTail - connection.WriteHead)); + continue; // queued SQE; flushed next loop } diff --git a/URocket/Engine/Engine.Reactor.cs b/URocket/Engine/Engine.Reactor.cs index ba90c72..8c3cf4b 100644 --- a/URocket/Engine/Engine.Reactor.cs +++ b/URocket/Engine/Engine.Reactor.cs @@ -225,9 +225,16 @@ private void DrainWriteQ() if (_engine.Connections[Id].TryGetValue(fd, out var connection)) { connection.CanWrite = false; // Reset write flag for each drained connection - - if(connection.CanFlush) - Send(connection.ClientFd, connection.WriteBuffer, (uint)connection.WriteHead, (uint)connection.WriteTail); + + if (connection.CanFlush) + { + Send(connection.ClientFd, connection.WriteBuffer, (uint)connection.WriteHead, + (uint)connection.WriteTail); + + // Data is sent to kernel, until we don't have confirmation that + // this data was fully processed by the kernel we don't send more data to it + connection.CanFlush = false; + } } } } diff --git a/URocket/URocket.csproj b/URocket/URocket.csproj index db3ace5..f961303 100644 --- a/URocket/URocket.csproj +++ b/URocket/URocket.csproj @@ -13,9 +13,9 @@ Diogo Martins https://github.com/MDA2AV/uRocket git - 10.4.0 - 10.4.0 - 10.4.0 + 10.4.1 + 10.4.1 + 10.4.1 README.md io_uring LICENSE