Skip to content

Commit

Permalink
Re: #47 - Return a Task from Track and Untrack methods
Browse files Browse the repository at this point in the history
  • Loading branch information
acupofjose committed May 22, 2024
1 parent b48e4b7 commit 598643b
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 19 deletions.
10 changes: 9 additions & 1 deletion Realtime/Interfaces/IRealtimePresence.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Supabase.Realtime.Socket;
using System;
using System.Threading.Tasks;
using Supabase.Realtime.Models;
using static Supabase.Realtime.Constants;

Expand Down Expand Up @@ -39,7 +40,14 @@ public enum EventType
/// </summary>
/// <param name="payload"></param>
/// <param name="timeoutMs"></param>
void Track(object? payload, int timeoutMs = DefaultTimeout);
Task Track(object? payload, int timeoutMs = DefaultTimeout);

/// <summary>
/// Untracks a client
/// </summary>
/// <param name="payload"></param>

Check warning on line 48 in Realtime/Interfaces/IRealtimePresence.cs

View workflow job for this annotation

GitHub Actions / build-and-deploy

XML comment has a param tag for 'payload', but there is no parameter by that name

Check warning on line 48 in Realtime/Interfaces/IRealtimePresence.cs

View workflow job for this annotation

GitHub Actions / build-and-test

XML comment has a param tag for 'payload', but there is no parameter by that name

Check warning on line 48 in Realtime/Interfaces/IRealtimePresence.cs

View workflow job for this annotation

GitHub Actions / build-and-test

XML comment has a param tag for 'payload', but there is no parameter by that name
/// <param name="timeoutMs"></param>

Check warning on line 49 in Realtime/Interfaces/IRealtimePresence.cs

View workflow job for this annotation

GitHub Actions / build-and-deploy

XML comment has a param tag for 'timeoutMs', but there is no parameter by that name

Check warning on line 49 in Realtime/Interfaces/IRealtimePresence.cs

View workflow job for this annotation

GitHub Actions / build-and-test

XML comment has a param tag for 'timeoutMs', but there is no parameter by that name

Check warning on line 49 in Realtime/Interfaces/IRealtimePresence.cs

View workflow job for this annotation

GitHub Actions / build-and-test

XML comment has a param tag for 'timeoutMs', but there is no parameter by that name
Task Untrack();

/// <summary>
/// Add a presence event handler
Expand Down
18 changes: 9 additions & 9 deletions Realtime/RealtimeChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public class RealtimeChannel : IRealtimeChannel
/// </summary>
private readonly List<Push> _buffer = new();

private readonly IRealtimeSocket _socket;
internal readonly IRealtimeSocket Socket;
private IRealtimePresence? _presence;
private IRealtimeBroadcast? _broadcast;
private RealtimeException? _exception;
Expand All @@ -157,7 +157,7 @@ public class RealtimeChannel : IRealtimeChannel
private readonly Dictionary<ListenType, List<PostgresChangesHandler>> _postgresChangesHandlers =
new();

private bool CanPush => IsJoined && _socket.IsConnected;
private bool CanPush => IsJoined && Socket.IsConnected;
private bool _hasJoinedOnce;
private readonly Timer _rejoinTimer;
private bool _isRejoining;
Expand All @@ -171,8 +171,8 @@ public RealtimeChannel(IRealtimeSocket socket, string channelName, ChannelOption
Options = options;
Options.Parameters ??= new Dictionary<string, string>();

_socket = socket;
_socket.AddStateChangedHandler(HandleSocketStateChanged);
Socket = socket;
Socket.AddStateChangedHandler(HandleSocketStateChanged);

_rejoinTimer = new Timer(options.ClientOptions.Timeout.TotalMilliseconds);
_rejoinTimer.Elapsed += HandleRejoinTimerElapsed;
Expand Down Expand Up @@ -508,7 +508,7 @@ public IRealtimeChannel Unsubscribe()

NotifyStateChanged(ChannelState.Leaving);

var leavePush = new Push(_socket, this, ChannelEventLeave);
var leavePush = new Push(Socket, this, ChannelEventLeave);
leavePush.Send();

NotifyStateChanged(ChannelState.Closed, false);
Expand Down Expand Up @@ -536,7 +536,7 @@ public Push Push(string eventName, string? type = null, object? payload = null,
};
}

var push = new Push(_socket, this, eventName, type, payload, timeoutMs);
var push = new Push(Socket, this, eventName, type, payload, timeoutMs);
Enqueue(push);

return push;
Expand Down Expand Up @@ -582,7 +582,7 @@ public void Rejoin(int timeoutMs = DefaultTimeout)
/// Enqueues a message.
/// </summary>
/// <param name="push"></param>
private void Enqueue(Push push)
internal void Enqueue(Push push)
{
LastPush = push;

Expand All @@ -601,7 +601,7 @@ private void Enqueue(Push push)
/// Generates the Join Push message by merging broadcast, presence, and postgres_changes options.
/// </summary>
/// <returns></returns>
private Push GenerateJoinPush() => new(_socket, this, ChannelEventJoin,
private Push GenerateJoinPush() => new(Socket, this, ChannelEventJoin,
payload: new JoinPush(BroadcastOptions, PresenceOptions, PostgresChangesOptions));

/// <summary>
Expand All @@ -614,7 +614,7 @@ private void Enqueue(Push push)

if (!string.IsNullOrEmpty(accessToken))
{
return new Push(_socket, this, ChannelAccessToken, payload: new Dictionary<string, string>
return new Push(Socket, this, ChannelAccessToken, payload: new Dictionary<string, string>
{
{ "access_token", accessToken! }
});
Expand Down
50 changes: 45 additions & 5 deletions Realtime/RealtimePresence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
using Supabase.Realtime.Socket;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Supabase.Realtime.Channel;
using Supabase.Realtime.Exceptions;
using static Supabase.Realtime.Constants;

namespace Supabase.Realtime;
Expand Down Expand Up @@ -100,7 +103,7 @@ public void ClearPresenceEventHandlers(IRealtimePresence.EventType? eventType =
private void NotifyPresenceEventHandlers(IRealtimePresence.EventType eventType)
{
if (!_presenceEventListeners.ContainsKey(eventType)) return;

foreach (var handler in _presenceEventListeners[eventType].ToArray())
handler.Invoke(this, eventType);
}
Expand Down Expand Up @@ -149,20 +152,57 @@ public void TriggerDiff(SocketResponse response)
/// </summary>
/// <param name="payload"></param>
/// <param name="timeoutMs"></param>
public void Track(object? payload, int timeoutMs = DefaultTimeout)
public Task Track(object? payload, int timeoutMs = DefaultTimeout)
{
var eventName = Core.Helpers.GetMappedToAttr(ChannelEventName.Presence).Mapping;
_channel.Push(eventName, "track",
var push = new Push(_channel.Socket, _channel, eventName, "track",
new Dictionary<string, object?> { { "event", "track" }, { "payload", payload } }, timeoutMs);

var tcs = new TaskCompletionSource<Push>();

void Handler(IRealtimePush<RealtimeChannel, SocketResponse> chanel, SocketResponse response)
{
tcs.TrySetResult(push);
}

push.AddMessageReceivedHandler(Handler);

push.OnTimeout += (sender, args) =>
{
tcs.SetException(new RealtimeException(args.ToString()) { Reason = FailureHint.Reason.PushTimeout });
};

_channel.Enqueue(push);

return tcs.Task;
}

/// <summary>
/// Untracks an event.
/// </summary>
public void Untrack()
public Task Untrack()
{
var eventName = Core.Helpers.GetMappedToAttr(ChannelEventName.Presence).Mapping;
_channel.Push(eventName, "untrack");
var push = new Push(_channel.Socket, _channel, eventName, "untrack",
new Dictionary<string, object?> { { "event", "untrack" } });

var tcs = new TaskCompletionSource<Push>();

void Handler(IRealtimePush<RealtimeChannel, SocketResponse> chanel, SocketResponse response)
{
tcs.TrySetResult(push);
}

push.AddMessageReceivedHandler(Handler);

push.OnTimeout += (sender, args) =>
{
tcs.TrySetException(new RealtimeException((sender as Push)!.Ref)
{ Reason = FailureHint.Reason.PushTimeout });
};

_channel.Enqueue(push);
return tcs.Task;
}

/// <summary>
Expand Down
10 changes: 6 additions & 4 deletions RealtimeTests/ChannelPresenceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public async Task ClientCanCreatePresence()
{
var state = presence1.CurrentState;
if (state.ContainsKey(guid2) && state[guid2].First().Time != null)
tsc.SetResult(true);
tsc.TrySetResult(true);
});

var client2 = Helpers.SocketClient();
Expand All @@ -61,15 +61,17 @@ public async Task ClientCanCreatePresence()
{
var state = presence2.CurrentState;
if (state.ContainsKey(guid1) && state[guid1].First().Time != null)
tsc2.SetResult(true);
tsc2.TrySetResult(true);
});

await channel1.Subscribe();
await channel2.Subscribe();

presence1.Track(new PresenceExample { Time = DateTime.Now });
presence2.Track(new PresenceExample { Time = DateTime.Now });
await presence1.Track(new PresenceExample { Time = DateTime.Now });
await presence2.Track(new PresenceExample { Time = DateTime.Now });

await presence1.Untrack();

await Task.WhenAll(new[] { tsc.Task, tsc2.Task });
}
}

0 comments on commit 598643b

Please sign in to comment.