Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send Receive Streaming Helpers #39

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/NexNet/Invocation/IProxyInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,10 @@
/// <param name="pipe">Pipe to retrieve the Id of.</param>
/// <returns>Initial id of the pipe.</returns>
byte ProxyGetDuplexPipeInitialId(INexusDuplexPipe? pipe);

IRentedNexusDuplexPipe? GetDuplexPipe();

Check warning on line 78 in src/NexNet/Invocation/IProxyInvoker.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IProxyInvoker.GetDuplexPipe()'

Check warning on line 78 in src/NexNet/Invocation/IProxyInvoker.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IProxyInvoker.GetDuplexPipe()'

ValueTask WriteNexusEnumerableChannel<T>(

Check warning on line 80 in src/NexNet/Invocation/IProxyInvoker.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'IProxyInvoker.WriteNexusEnumerableChannel<T>(IRentedNexusDuplexPipe, NexusEnumerableChannel<T>)'
IRentedNexusDuplexPipe rentedNexusDuplexPipe,
NexusEnumerableChannel<T> pipe);
}
15 changes: 14 additions & 1 deletion src/NexNet/Invocation/ProxyInvocationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal CacheManager CacheManager

/// <inheritdoc />
INexusLogger? IProxyInvoker.Logger => _session?.Logger;

void IProxyInvoker.Configure(
INexusSession? session,
SessionManager? sessionManager,
Expand Down Expand Up @@ -325,6 +325,19 @@ async ValueTask<TReturn> IProxyInvoker.ProxyInvokeAndWaitForResultCore<TReturn>(
}
}

IRentedNexusDuplexPipe? IProxyInvoker.GetDuplexPipe()
{
return _session?.PipeManager.RentPipe();
}

ValueTask IProxyInvoker.WriteNexusEnumerableChannel<T>(
IRentedNexusDuplexPipe rentedNexusDuplexPipe,
NexusEnumerableChannel<T> enumeratorChannel)
{
enumeratorChannel.DuplexPipe = rentedNexusDuplexPipe;
return enumeratorChannel.WriteAndComplete();
}

/// <inheritdoc />
[MethodImpl(MethodImplOptions.AggressiveInlining)]
byte IProxyInvoker.ProxyGetDuplexPipeInitialId(INexusDuplexPipe? pipe)
Expand Down
118 changes: 118 additions & 0 deletions src/NexNet/Pipes/INexusDuplexChannel.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Reflection.PortableExecutable;
using System.Threading;
using System.Threading.Tasks;

namespace NexNet.Pipes;
Expand Down Expand Up @@ -40,3 +44,117 @@
/// <returns>A ValueTask that represents the asynchronous operation. The task result contains the <see cref="INexusChannelReader{T}"/> instance.</returns>
ValueTask<INexusChannelReader<T>> GetReaderAsync();
}

/// <summary>
/// Represents a stream of objects that can be asynchronously enumerated.
/// </summary>
/// <typeparam name="T">The type of objects in the stream.</typeparam>
public class NexusEnumerableChannel<T> :IAsyncEnumerable<T>
{
private readonly IEnumerable<T>? _writingEnumerable;
internal IRentedNexusDuplexPipe DuplexPipe;

/// <summary>
/// Initializes a new instance of the NexusEnumerableStream class.
/// </summary>
public NexusEnumerableChannel(IEnumerable<T> writingEnumerable)

Check warning on line 60 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field 'DuplexPipe' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.

Check warning on line 60 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable field 'DuplexPipe' must contain a non-null value when exiting constructor. Consider declaring the field as nullable.
{
_writingEnumerable = writingEnumerable;
}

internal NexusEnumerableChannel(IRentedNexusDuplexPipe duplexPipe)
{
_writingEnumerable = null;
DuplexPipe = duplexPipe;
}

internal async ValueTask WriteAndComplete()
{
var writer = await DuplexPipe.GetChannelWriter<T>();
await writer.WriteAndComplete(_writingEnumerable);

Check warning on line 74 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'enumerableData' in 'ValueTask NexusChannelExtensions.WriteAndComplete<T>(INexusChannelWriter<T> writer, IEnumerable<T> enumerableData, int chunkSize = 10, CancellationToken cancellationToken = default(CancellationToken))'.

Check warning on line 74 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'enumerableData' in 'ValueTask NexusChannelExtensions.WriteAndComplete<T>(INexusChannelWriter<T> writer, IEnumerable<T> enumerableData, int chunkSize = 10, CancellationToken cancellationToken = default(CancellationToken))'.
}

public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = new CancellationToken())

Check warning on line 77 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Missing XML comment for publicly visible type or member 'NexusEnumerableChannel<T>.GetAsyncEnumerator(CancellationToken)'
{
return new Enumerator(DuplexPipe);
}

private class Enumerator : IAsyncEnumerator<T>
{
private NexusChannelReader<T>? _reader;
private readonly INexusDuplexPipe _duplexPipe;
private List<T>? _list = null;
private int _readIndex = 0;

/// <summary>
/// Creates an instance of the Enumerator class with the specified IRentedNexusDuplexPipe object.
/// </summary>
/// <param name="duplexPipe">The IRentedNexusDuplexPipe object to use for reading data.</param>
public Enumerator(IRentedNexusDuplexPipe duplexPipe)

Check warning on line 93 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'Current' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 93 in src/NexNet/Pipes/INexusDuplexChannel.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'Current' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
{
_duplexPipe = duplexPipe;
_reader = new NexusChannelReader<T>(_duplexPipe.ReaderCore);
}

/// <summary>
/// Gets or sets the current value of the property.
/// </summary>
/// <value>The current value of the property.</value>
public T Current { get; set; }

public async ValueTask<bool> MoveNextAsync()
{
// If the reader is null, the channel has been read to completion.
if (_reader == null)
{
// If the read index is -1, then the pipe reading has completed.
if (_readIndex == -1)
return false;

if (_readIndex < _list!.Count)
{
Current = _list[_readIndex++];
return true;
}

// If we got here, we are at the end of the list and the channel has completed.
Current = default!;
_readIndex = -1;
_list.Clear();
_list.TrimExcess();
_list = null;

return false;
}

if (_list == null)
{
await _duplexPipe.ReadyTask;
_list = new List<T>();
}

if (_readIndex < _list.Count)
{
Current = _list[_readIndex++];
return true;
}

var result = await _reader.ReadAsync(_list, null);

// If false, then the reader has completed.
if (result == false)
{
_readIndex = -1;
return false;
}

_readIndex = 1;
Current = _list[_readIndex];
return true;
}
public ValueTask DisposeAsync()
{
return _duplexPipe.Input.CompleteAsync();
}
}
}
7 changes: 1 addition & 6 deletions src/NexNet/Pipes/RentedNexusDuplexPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ public RentedNexusDuplexPipe(byte localId, INexusSession session)

public ValueTask DisposeAsync()
{
var manager = Interlocked.Exchange(ref Manager, null);

if (manager == null)
return default;

return manager!.ReturnPipe(this);
return Interlocked.Exchange(ref Manager, null)?.ReturnPipe(this) ?? default;
}
}
Loading
Loading