Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using StreamChat.Core.LowLevelClient.Models;
using StreamChat.Libs.Logs;
using StreamChat.Libs.NetworkMonitors;
using StreamChat.Libs.Time;

Expand Down Expand Up @@ -39,12 +40,13 @@ private set
}

public ReconnectScheduler(ITimeService timeService, IStreamChatLowLevelClient lowLevelClient,
INetworkMonitor networkMonitor)
INetworkMonitor networkMonitor, ILogs logs)
{
_client = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient));
_timeService = timeService ?? throw new ArgumentNullException(nameof(timeService));
_networkMonitor = networkMonitor ?? throw new ArgumentNullException(nameof(networkMonitor));

_logs = logs ?? throw new ArgumentNullException(nameof(logs));

_networkMonitor.NetworkAvailabilityChanged += OnNetworkAvailabilityChanged;

_client.Connected += OnConnected;
Expand Down Expand Up @@ -106,6 +108,7 @@ public void Stop()
private readonly IStreamChatLowLevelClient _client;
private readonly ITimeService _timeService;
private readonly INetworkMonitor _networkMonitor;
private readonly ILogs _logs;

private int _reconnectAttempts;
private bool _isStopped;
Expand Down Expand Up @@ -178,6 +181,9 @@ private void OnConnectionStateChanged(ConnectionState previous, ConnectionState

private void OnNetworkAvailabilityChanged(bool isNetworkAvailable)
{
#if STREAM_DEBUG_ENABLED
_logs.Warning($"Network availability changed to: {isNetworkAvailable}");
#endif
if (!isNetworkAvailable)
{
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ ChannelMemberRequestInternalDTO ISavableTo<ChannelMemberRequestInternalDTO>.Save
InviteRejectedAt = InviteRejectedAt,
Invited = Invited,
IsModerator = IsModerator,
#pragma warning disable CS0618
Role = Role,
#pragma warning restore CS0618
ShadowBanned = ShadowBanned,
UpdatedAt = UpdatedAt,
User = User.TrySaveToDto(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ private set

var previous = _connectionState;
_connectionState = value;

#if STREAM_DEBUG_ENABLED
_logs.Warning($"Connection state changed from: {previous} to: {value}");
#endif

ConnectionStateChanged?.Invoke(previous, _connectionState);

if (value == ConnectionState.Disconnected)
Expand Down Expand Up @@ -302,7 +307,7 @@ public StreamChatLowLevelClient(AuthCredentials authCredentials, IWebsocketClien
UserApi = new UserApi(InternalUserApi);
DeviceApi = new DeviceApi(InternalDeviceApi);

_reconnectScheduler = new ReconnectScheduler(_timeService, this, _networkMonitor);
_reconnectScheduler = new ReconnectScheduler(_timeService, this, _networkMonitor, _logs);
_reconnectScheduler.ReconnectionScheduled += OnReconnectionScheduled;

RegisterEventHandlers();
Expand Down Expand Up @@ -483,6 +488,7 @@ internal async Task<OwnUserInternalDTO> ConnectUserAsync(string apiKey, string u

var connectionUri = _requestUriFactory.CreateConnectionUri();

//StreamTodo: pass the cancellation token here cancellationToken
await _websocketClient.ConnectAsync(connectionUri);

var ownUserDto = await _connectUserTaskSource.Task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface IWebsocketClient : IDisposable

bool TryDequeueMessage(out string message);

Task ConnectAsync(Uri serverUri);
Task ConnectAsync(Uri serverUri, int timeout = 3);

void Update();

Expand Down
83 changes: 76 additions & 7 deletions Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
using System.Threading;
using System.Threading.Tasks;
using StreamChat.Libs.Logs;
#if STREAM_DEBUG_ENABLED
using System.Diagnostics;
#endif

namespace StreamChat.Libs.Websockets
{
Expand Down Expand Up @@ -35,7 +38,7 @@ public WebsocketClient(ILogs logs, Encoding encoding = default, bool isDebugMode

public bool TryDequeueMessage(out string message) => _receiveQueue.TryDequeue(out message);

public async Task ConnectAsync(Uri serverUri)
public async Task ConnectAsync(Uri serverUri, int timeout = 3)
{
if (IsConnected || IsConnecting)
{
Expand All @@ -52,11 +55,48 @@ await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure,
_connectionCts = new CancellationTokenSource();

_internalClient = new ClientWebSocket();
await _internalClient.ConnectAsync(_uri, _connectionCts.Token).ConfigureAwait(false);

#if STREAM_DEBUG_ENABLED
var ws = new Stopwatch();
ws.Start();
_logs.Warning($"Internal WS ConnectAsync CALL");
#endif

var connectTask = _internalClient.ConnectAsync(_uri, _connectionCts.Token);
var timeoutTask = Task.Delay(TimeSpan.FromSeconds(timeout));

// We handle timeout this way because ConnectAsync was hanging after multiple attempts on Unity 2022.3.29 & Android 14 and cancellation via passed token didn't work
var finishedTask = await Task.WhenAny(connectTask, timeoutTask);

if (finishedTask == timeoutTask)
{
#if STREAM_DEBUG_ENABLED
_logs.Warning("Internal WS Connection attempt timed out.");
#endif
throw new TimeoutException($"Connection attempt timed out after {timeout} seconds.");
}

if (_connectionCts == null || _connectionCts.Token.IsCancellationRequested)
{
#if STREAM_DEBUG_ENABLED
_logs.Warning("Internal WS Connection attempt cancelled.");
#endif
throw new OperationCanceledException();
}

await connectTask;

#if STREAM_DEBUG_ENABLED
ws.Stop();
_logs.Warning($"Internal WS ConnectAsync COMPLETED in {ws.ElapsedMilliseconds} ms.");
#endif

}
catch (OperationCanceledException e)
{
LogExceptionIfDebugMode(e);
OnConnectionFailed();
return;
}
catch (WebSocketException e)
{
Expand Down Expand Up @@ -93,6 +133,15 @@ public void Send(string message)

public void Update()
{
#if STREAM_DEBUG_ENABLED

if(_internalClient != null && _internalClient.State != _lastState)
{
_logs.Warning($"Internal WS state -> changed from {_lastState} to " + _internalClient.State);
_lastState = _internalClient.State;
}
#endif

var disconnect = false;
while (_threadWebsocketExceptionsLog.TryDequeue(out var webSocketException))
{
Expand Down Expand Up @@ -124,9 +173,13 @@ public async Task DisconnectAsync(WebSocketCloseStatus closeStatus, string close

public void Dispose()
{
LogInfoIfDebugMode("Dispose");
DisconnectAsync(WebSocketCloseStatus.NormalClosure, "WebSocket client is disposed")
.ContinueWith(_ => LogExceptionIfDebugMode(_.Exception), TaskContinuationOptions.OnlyOnFaulted);
LogInfoIfDebugMode("Dispose " + Thread.CurrentThread.ManagedThreadId);

if(_internalClient != null && !_clientClosedStates.Contains(_internalClient.State))
{
DisconnectAsync(WebSocketCloseStatus.NormalClosure, "WebSocket client is disposed")
.ContinueWith(t => LogExceptionIfDebugMode(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
}
}

private const int UpdatesPerSecond = 20;
Expand Down Expand Up @@ -163,6 +216,8 @@ public void Dispose()
private ClientWebSocket _internalClient;
private CancellationTokenSource _connectionCts;

private WebSocketState _lastState;

private async void SendMessagesCallback(object state)
{
if (!IsConnected || _connectionCts == null || _connectionCts.IsCancellationRequested)
Expand Down Expand Up @@ -250,6 +305,14 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st
{
_backgroundReceiveTimer?.Dispose();
_backgroundReceiveTimer = null;
}
catch (Exception e)
{
LogExceptionIfDebugMode(e);
}

try
{
_backgroundSendTimer?.Dispose();
_backgroundSendTimer = null;
}
Expand Down Expand Up @@ -281,6 +344,9 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st
{
if (!_clientClosedStates.Contains(_internalClient.State))
{
#if STREAM_DEBUG_ENABLED
_logs.Warning("Internal WS - Close in state: " + _internalClient.State);
#endif
await _internalClient.CloseOutputAsync(closeStatus, closeMessage, CancellationToken.None);
}
}
Expand All @@ -290,6 +356,9 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st
}
finally
{
#if STREAM_DEBUG_ENABLED
_logs.Warning("Internal WS - Dispose in state: " + _internalClient.State);
#endif
_internalClient.Dispose();
_internalClient = null;
}
Expand All @@ -300,7 +369,7 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st
// Called from a background thread
private void OnReceivedCloseMessage()
=> DisconnectAsync(WebSocketCloseStatus.InternalServerError, "Server closed the connection")
.ContinueWith(_ => LogThreadExceptionIfDebugMode(_.Exception), TaskContinuationOptions.OnlyOnFaulted);
.ContinueWith(t => LogThreadExceptionIfDebugMode(t.Exception), TaskContinuationOptions.OnlyOnFaulted);

private async Task<string> TryReceiveSingleMessageAsync()
{
Expand Down Expand Up @@ -344,7 +413,7 @@ private async Task<string> TryReceiveSingleMessageAsync()
throw new Exception("Unhandled WebSocket message type: " + WebSocketMessageType.Binary);
}

return "";
return string.Empty;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ protected void Awake()
}
});

_missingCredentials = true;

#if UNITY_EDITOR

StartCoroutine(BlinkProjectAsset(_authCredentialsAsset, popup));
Expand All @@ -81,7 +79,6 @@ protected void Awake()
}

private IStreamChatClient _client;
private bool _missingCredentials;

[SerializeField]
private RootView _rootView;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using StreamChat.Core.Exceptions;
using StreamChat.Core.LowLevelClient.Models;
using StreamChat.Core.LowLevelClient.Requests;
using UnityEngine;
using UnityEngine.TestTools;

namespace StreamChat.Tests.LowLevelClient.Integration
Expand Down Expand Up @@ -59,8 +60,14 @@ public IEnumerator Create_channel_with_custom_data()
ChannelState channelState = null;
yield return CreateTempUniqueChannel("messaging", requestBody, state => channelState = state);

Assert.AreEqual(3, channelState.Channel.AdditionalProperties.Count);
Assert.AreEqual(3, channelState.Channel.AdditionalProperties.Count);
Assert.IsTrue(channelState.Channel.AdditionalProperties.ContainsKey("MyNumber"));
Assert.AreEqual(3, channelState.Channel.AdditionalProperties["MyNumber"]);
Assert.IsTrue(channelState.Channel.AdditionalProperties.ContainsKey("MyString"));
Assert.AreEqual("Hey Joe!", channelState.Channel.AdditionalProperties["MyString"]);
Assert.IsTrue(channelState.Channel.AdditionalProperties.ContainsKey("MyIntArray"));

//StreamTodo: fix returned array here to not be JArray https://stream-io.atlassian.net/browse/PBE-4851
//Assert.AreEqual(new int[] { 5, 8, 9 }, (int[])channelState.Channel.AdditionalProperties["MyIntArray"]);
}

[UnityTest]
Expand Down
Loading