diff --git a/Assets/Plugins/StreamChat/Core/LowLevelClient/ReconnectScheduler.cs b/Assets/Plugins/StreamChat/Core/LowLevelClient/ReconnectScheduler.cs index 9ad3d3b5..521d8814 100644 --- a/Assets/Plugins/StreamChat/Core/LowLevelClient/ReconnectScheduler.cs +++ b/Assets/Plugins/StreamChat/Core/LowLevelClient/ReconnectScheduler.cs @@ -1,5 +1,6 @@ using System; using StreamChat.Core.LowLevelClient.Models; +using StreamChat.Libs.Logs; using StreamChat.Libs.NetworkMonitors; using StreamChat.Libs.Time; @@ -39,12 +40,13 @@ private set } public ReconnectScheduler(ITimeService timeService, IStreamChatLowLevelClient lowLevelClient, - INetworkMonitor networkMonitor) + INetworkMonitor networkMonitor, ILogs logs) { _client = lowLevelClient ?? throw new ArgumentNullException(nameof(lowLevelClient)); _timeService = timeService ?? throw new ArgumentNullException(nameof(timeService)); _networkMonitor = networkMonitor ?? throw new ArgumentNullException(nameof(networkMonitor)); - + _logs = logs ?? throw new ArgumentNullException(nameof(logs)); + _networkMonitor.NetworkAvailabilityChanged += OnNetworkAvailabilityChanged; _client.Connected += OnConnected; @@ -106,6 +108,7 @@ public void Stop() private readonly IStreamChatLowLevelClient _client; private readonly ITimeService _timeService; private readonly INetworkMonitor _networkMonitor; + private readonly ILogs _logs; private int _reconnectAttempts; private bool _isStopped; @@ -178,6 +181,9 @@ private void OnConnectionStateChanged(ConnectionState previous, ConnectionState private void OnNetworkAvailabilityChanged(bool isNetworkAvailable) { +#if STREAM_DEBUG_ENABLED + _logs.Warning($"Network availability changed to: {isNetworkAvailable}"); +#endif if (!isNetworkAvailable) { return; diff --git a/Assets/Plugins/StreamChat/Core/LowLevelClient/Requests/ChannelMemberRequest.cs b/Assets/Plugins/StreamChat/Core/LowLevelClient/Requests/ChannelMemberRequest.cs index 0d5e665a..2b54cc21 100644 --- a/Assets/Plugins/StreamChat/Core/LowLevelClient/Requests/ChannelMemberRequest.cs +++ b/Assets/Plugins/StreamChat/Core/LowLevelClient/Requests/ChannelMemberRequest.cs @@ -81,7 +81,9 @@ ChannelMemberRequestInternalDTO ISavableTo.Save InviteRejectedAt = InviteRejectedAt, Invited = Invited, IsModerator = IsModerator, +#pragma warning disable CS0618 Role = Role, +#pragma warning restore CS0618 ShadowBanned = ShadowBanned, UpdatedAt = UpdatedAt, User = User.TrySaveToDto(), diff --git a/Assets/Plugins/StreamChat/Core/LowLevelClient/StreamChatLowLevelClient.cs b/Assets/Plugins/StreamChat/Core/LowLevelClient/StreamChatLowLevelClient.cs index a5422901..59e4c733 100644 --- a/Assets/Plugins/StreamChat/Core/LowLevelClient/StreamChatLowLevelClient.cs +++ b/Assets/Plugins/StreamChat/Core/LowLevelClient/StreamChatLowLevelClient.cs @@ -179,6 +179,11 @@ private set var previous = _connectionState; _connectionState = value; + +#if STREAM_DEBUG_ENABLED + _logs.Warning($"Connection state changed from: {previous} to: {value}"); +#endif + ConnectionStateChanged?.Invoke(previous, _connectionState); if (value == ConnectionState.Disconnected) @@ -302,7 +307,7 @@ public StreamChatLowLevelClient(AuthCredentials authCredentials, IWebsocketClien UserApi = new UserApi(InternalUserApi); DeviceApi = new DeviceApi(InternalDeviceApi); - _reconnectScheduler = new ReconnectScheduler(_timeService, this, _networkMonitor); + _reconnectScheduler = new ReconnectScheduler(_timeService, this, _networkMonitor, _logs); _reconnectScheduler.ReconnectionScheduled += OnReconnectionScheduled; RegisterEventHandlers(); @@ -483,6 +488,7 @@ internal async Task ConnectUserAsync(string apiKey, string u var connectionUri = _requestUriFactory.CreateConnectionUri(); + //StreamTodo: pass the cancellation token here cancellationToken await _websocketClient.ConnectAsync(connectionUri); var ownUserDto = await _connectUserTaskSource.Task; diff --git a/Assets/Plugins/StreamChat/Libs/Websockets/IWebsocketClient.cs b/Assets/Plugins/StreamChat/Libs/Websockets/IWebsocketClient.cs index e57d915a..8ee3130c 100644 --- a/Assets/Plugins/StreamChat/Libs/Websockets/IWebsocketClient.cs +++ b/Assets/Plugins/StreamChat/Libs/Websockets/IWebsocketClient.cs @@ -15,7 +15,7 @@ public interface IWebsocketClient : IDisposable bool TryDequeueMessage(out string message); - Task ConnectAsync(Uri serverUri); + Task ConnectAsync(Uri serverUri, int timeout = 3); void Update(); diff --git a/Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs b/Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs index 0433c4fc..fd37659c 100644 --- a/Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs +++ b/Assets/Plugins/StreamChat/Libs/Websockets/WebsocketClient.cs @@ -8,6 +8,9 @@ using System.Threading; using System.Threading.Tasks; using StreamChat.Libs.Logs; +#if STREAM_DEBUG_ENABLED +using System.Diagnostics; +#endif namespace StreamChat.Libs.Websockets { @@ -35,7 +38,7 @@ public WebsocketClient(ILogs logs, Encoding encoding = default, bool isDebugMode public bool TryDequeueMessage(out string message) => _receiveQueue.TryDequeue(out message); - public async Task ConnectAsync(Uri serverUri) + public async Task ConnectAsync(Uri serverUri, int timeout = 3) { if (IsConnected || IsConnecting) { @@ -52,11 +55,48 @@ await TryDisposeResourcesAsync(WebSocketCloseStatus.NormalClosure, _connectionCts = new CancellationTokenSource(); _internalClient = new ClientWebSocket(); - await _internalClient.ConnectAsync(_uri, _connectionCts.Token).ConfigureAwait(false); + +#if STREAM_DEBUG_ENABLED + var ws = new Stopwatch(); + ws.Start(); + _logs.Warning($"Internal WS ConnectAsync CALL"); +#endif + + var connectTask = _internalClient.ConnectAsync(_uri, _connectionCts.Token); + var timeoutTask = Task.Delay(TimeSpan.FromSeconds(timeout)); + + // We handle timeout this way because ConnectAsync was hanging after multiple attempts on Unity 2022.3.29 & Android 14 and cancellation via passed token didn't work + var finishedTask = await Task.WhenAny(connectTask, timeoutTask); + + if (finishedTask == timeoutTask) + { +#if STREAM_DEBUG_ENABLED + _logs.Warning("Internal WS Connection attempt timed out."); +#endif + throw new TimeoutException($"Connection attempt timed out after {timeout} seconds."); + } + + if (_connectionCts == null || _connectionCts.Token.IsCancellationRequested) + { +#if STREAM_DEBUG_ENABLED + _logs.Warning("Internal WS Connection attempt cancelled."); +#endif + throw new OperationCanceledException(); + } + + await connectTask; + +#if STREAM_DEBUG_ENABLED + ws.Stop(); + _logs.Warning($"Internal WS ConnectAsync COMPLETED in {ws.ElapsedMilliseconds} ms."); +#endif + } catch (OperationCanceledException e) { LogExceptionIfDebugMode(e); + OnConnectionFailed(); + return; } catch (WebSocketException e) { @@ -93,6 +133,15 @@ public void Send(string message) public void Update() { +#if STREAM_DEBUG_ENABLED + + if(_internalClient != null && _internalClient.State != _lastState) + { + _logs.Warning($"Internal WS state -> changed from {_lastState} to " + _internalClient.State); + _lastState = _internalClient.State; + } +#endif + var disconnect = false; while (_threadWebsocketExceptionsLog.TryDequeue(out var webSocketException)) { @@ -124,9 +173,13 @@ public async Task DisconnectAsync(WebSocketCloseStatus closeStatus, string close public void Dispose() { - LogInfoIfDebugMode("Dispose"); - DisconnectAsync(WebSocketCloseStatus.NormalClosure, "WebSocket client is disposed") - .ContinueWith(_ => LogExceptionIfDebugMode(_.Exception), TaskContinuationOptions.OnlyOnFaulted); + LogInfoIfDebugMode("Dispose " + Thread.CurrentThread.ManagedThreadId); + + if(_internalClient != null && !_clientClosedStates.Contains(_internalClient.State)) + { + DisconnectAsync(WebSocketCloseStatus.NormalClosure, "WebSocket client is disposed") + .ContinueWith(t => LogExceptionIfDebugMode(t.Exception), TaskContinuationOptions.OnlyOnFaulted); + } } private const int UpdatesPerSecond = 20; @@ -163,6 +216,8 @@ public void Dispose() private ClientWebSocket _internalClient; private CancellationTokenSource _connectionCts; + private WebSocketState _lastState; + private async void SendMessagesCallback(object state) { if (!IsConnected || _connectionCts == null || _connectionCts.IsCancellationRequested) @@ -250,6 +305,14 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st { _backgroundReceiveTimer?.Dispose(); _backgroundReceiveTimer = null; + } + catch (Exception e) + { + LogExceptionIfDebugMode(e); + } + + try + { _backgroundSendTimer?.Dispose(); _backgroundSendTimer = null; } @@ -281,6 +344,9 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st { if (!_clientClosedStates.Contains(_internalClient.State)) { +#if STREAM_DEBUG_ENABLED + _logs.Warning("Internal WS - Close in state: " + _internalClient.State); +#endif await _internalClient.CloseOutputAsync(closeStatus, closeMessage, CancellationToken.None); } } @@ -290,6 +356,9 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st } finally { +#if STREAM_DEBUG_ENABLED + _logs.Warning("Internal WS - Dispose in state: " + _internalClient.State); +#endif _internalClient.Dispose(); _internalClient = null; } @@ -300,7 +369,7 @@ private async Task TryDisposeResourcesAsync(WebSocketCloseStatus closeStatus, st // Called from a background thread private void OnReceivedCloseMessage() => DisconnectAsync(WebSocketCloseStatus.InternalServerError, "Server closed the connection") - .ContinueWith(_ => LogThreadExceptionIfDebugMode(_.Exception), TaskContinuationOptions.OnlyOnFaulted); + .ContinueWith(t => LogThreadExceptionIfDebugMode(t.Exception), TaskContinuationOptions.OnlyOnFaulted); private async Task TryReceiveSingleMessageAsync() { @@ -344,7 +413,7 @@ private async Task TryReceiveSingleMessageAsync() throw new Exception("Unhandled WebSocket message type: " + WebSocketMessageType.Binary); } - return ""; + return string.Empty; } } diff --git a/Assets/Plugins/StreamChat/SampleProject/Scripts/StreamChatClientBehaviour.cs b/Assets/Plugins/StreamChat/SampleProject/Scripts/StreamChatClientBehaviour.cs index 462cf905..66b15c19 100644 --- a/Assets/Plugins/StreamChat/SampleProject/Scripts/StreamChatClientBehaviour.cs +++ b/Assets/Plugins/StreamChat/SampleProject/Scripts/StreamChatClientBehaviour.cs @@ -66,8 +66,6 @@ protected void Awake() } }); - _missingCredentials = true; - #if UNITY_EDITOR StartCoroutine(BlinkProjectAsset(_authCredentialsAsset, popup)); @@ -81,7 +79,6 @@ protected void Awake() } private IStreamChatClient _client; - private bool _missingCredentials; [SerializeField] private RootView _rootView; diff --git a/Assets/Plugins/StreamChat/Tests/LowLevelClient/Integration/ChannelApiIntegrationTests.cs b/Assets/Plugins/StreamChat/Tests/LowLevelClient/Integration/ChannelApiIntegrationTests.cs index 00729415..6903ff63 100644 --- a/Assets/Plugins/StreamChat/Tests/LowLevelClient/Integration/ChannelApiIntegrationTests.cs +++ b/Assets/Plugins/StreamChat/Tests/LowLevelClient/Integration/ChannelApiIntegrationTests.cs @@ -8,6 +8,7 @@ using StreamChat.Core.Exceptions; using StreamChat.Core.LowLevelClient.Models; using StreamChat.Core.LowLevelClient.Requests; +using UnityEngine; using UnityEngine.TestTools; namespace StreamChat.Tests.LowLevelClient.Integration @@ -59,8 +60,14 @@ public IEnumerator Create_channel_with_custom_data() ChannelState channelState = null; yield return CreateTempUniqueChannel("messaging", requestBody, state => channelState = state); - Assert.AreEqual(3, channelState.Channel.AdditionalProperties.Count); - Assert.AreEqual(3, channelState.Channel.AdditionalProperties.Count); + Assert.IsTrue(channelState.Channel.AdditionalProperties.ContainsKey("MyNumber")); + Assert.AreEqual(3, channelState.Channel.AdditionalProperties["MyNumber"]); + Assert.IsTrue(channelState.Channel.AdditionalProperties.ContainsKey("MyString")); + Assert.AreEqual("Hey Joe!", channelState.Channel.AdditionalProperties["MyString"]); + Assert.IsTrue(channelState.Channel.AdditionalProperties.ContainsKey("MyIntArray")); + + //StreamTodo: fix returned array here to not be JArray https://stream-io.atlassian.net/browse/PBE-4851 + //Assert.AreEqual(new int[] { 5, 8, 9 }, (int[])channelState.Channel.AdditionalProperties["MyIntArray"]); } [UnityTest] diff --git a/Assets/Plugins/StreamChat/Tests/LowLevelClient/LowLevelClientConnectionTests.cs b/Assets/Plugins/StreamChat/Tests/LowLevelClient/LowLevelClientConnectionTests.cs new file mode 100644 index 00000000..fdb1c189 --- /dev/null +++ b/Assets/Plugins/StreamChat/Tests/LowLevelClient/LowLevelClientConnectionTests.cs @@ -0,0 +1,128 @@ +#if STREAM_TESTS_ENABLED +using System; +using System.Collections; +using System.Threading.Tasks; +using NUnit.Framework; +using StreamChat.Core; +using StreamChat.Core.Configs; +using StreamChat.Core.LowLevelClient; +using StreamChat.Core.LowLevelClient.Models; +using UnityEngine.TestTools; + +namespace StreamChat.Tests.LowLevelClient +{ + internal class LowLevelClientConnectionTests + { + [SetUp] + public void Up() + { + _lowLevelClient = StreamChatLowLevelClient.CreateDefaultClient(StreamTestClients.Instance.LowLevelClientCredentials, new StreamClientConfig + { + LogLevel = StreamLogLevel.Debug + }); + } + + [UnityTearDown] + public IEnumerator TearDown() + { + yield return TearDownAsync().RunAsIEnumerator(); + } + + private async Task TearDownAsync() + { + if (_lowLevelClient.ConnectionState == ConnectionState.Connected) + { + await _lowLevelClient.DisconnectAsync(); + } + + _lowLevelClient.Dispose(); + _lowLevelClient = null; + } + + [UnityTest] + public IEnumerator When_client_connects_and_disconnects_multiple_times_expect_client_to_have_a_correct_connection_state() + { + yield return When_client_connects_and_disconnects_multiple_times_expect_client_to_have_a_correct_connection_state_Async() + .RunAsIEnumerator(_lowLevelClient); + } + + private async Task When_client_connects_and_disconnects_multiple_times_expect_client_to_have_a_correct_connection_state_Async() + { + var credentials = StreamTestClients.Instance.LowLevelClientCredentials; + + async Task ConnectAsync() + { + var timeout = Task.Delay(TimeSpan.FromSeconds(10)); + var taskCompletion = new TaskCompletionSource(); + + void OnUserConnected(OwnUser ownUser) + { + _lowLevelClient.Connected -= OnUserConnected; + taskCompletion.SetResult(true); + } + + _lowLevelClient.Connected -= OnUserConnected; + _lowLevelClient.Connected += OnUserConnected; + + _lowLevelClient.ConnectUser(credentials); + + if (await Task.WhenAny(timeout, taskCompletion.Task) == timeout) + { + throw new TimeoutException("Reached timeout when waiting for client to connect"); + } + + await taskCompletion.Task; + } + + await ConnectAsync(); + Assert.AreEqual(ConnectionState.Connected, _lowLevelClient.ConnectionState); + await _lowLevelClient.DisconnectAsync(permanent: true); + Assert.AreEqual(ConnectionState.Disconnected, _lowLevelClient.ConnectionState); + + await ConnectAsync(); + Assert.AreEqual(ConnectionState.Connected, _lowLevelClient.ConnectionState); + await _lowLevelClient.DisconnectAsync(permanent: true); + Assert.AreEqual(ConnectionState.Disconnected, _lowLevelClient.ConnectionState); + + await ConnectAsync(); + Assert.AreEqual(ConnectionState.Connected, _lowLevelClient.ConnectionState); + await _lowLevelClient.DisconnectAsync(permanent: true); + Assert.AreEqual(ConnectionState.Disconnected, _lowLevelClient.ConnectionState); + + await ConnectAsync(); + Assert.AreEqual(ConnectionState.Connected, _lowLevelClient.ConnectionState); + await _lowLevelClient.DisconnectAsync(permanent: true); + Assert.AreEqual(ConnectionState.Disconnected, _lowLevelClient.ConnectionState); + + await ConnectAsync(); + Assert.AreEqual(ConnectionState.Connected, _lowLevelClient.ConnectionState); + await _lowLevelClient.DisconnectAsync(permanent: true); + Assert.AreEqual(ConnectionState.Disconnected, _lowLevelClient.ConnectionState); + } + + //StreamTodo: Debug why this test is causing NullRef Exception. Check if this is happening outside of test + // [UnityTest] + // public IEnumerator When_consumer_cancels_connection_attempt_expect_client_to_terminate_connecting_state() + // { + // yield return When_consumer_cancels_connection_attempt_expect_client_to_terminate_connecting_state_Async() + // .RunAsIEnumerator(_lowLevelClient); + // } + // + // private async Task When_consumer_cancels_connection_attempt_expect_client_to_terminate_connecting_state_Async() + // { + // var credentials = StreamTestClients.Instance.LowLevelClientCredentials; + // + // _lowLevelClient.ConnectUser(credentials); + // + // //await Task.Delay(500); // With this delay the Null ref will not occur + // + // await _lowLevelClient.DisconnectAsync(permanent: true); + // Assert.AreEqual(ConnectionState.Disconnected, _lowLevelClient.ConnectionState); + // } + + //StreamTodo: assert that the connection will timeout + + private IStreamChatLowLevelClient _lowLevelClient; + } +} +#endif \ No newline at end of file diff --git a/Assets/Plugins/StreamChat/Tests/LowLevelClient/LowLevelClientConnectionTests.cs.meta b/Assets/Plugins/StreamChat/Tests/LowLevelClient/LowLevelClientConnectionTests.cs.meta new file mode 100644 index 00000000..7bc5ee04 --- /dev/null +++ b/Assets/Plugins/StreamChat/Tests/LowLevelClient/LowLevelClientConnectionTests.cs.meta @@ -0,0 +1,3 @@ +fileFormatVersion: 2 +guid: 2f319e32e8874e04bb381f8cf663a6e1 +timeCreated: 1719327223 \ No newline at end of file diff --git a/Assets/Plugins/StreamChat/Tests/LowLevelClient/ReconnectSchedulerTests.cs b/Assets/Plugins/StreamChat/Tests/LowLevelClient/ReconnectSchedulerTests.cs index dd355bb0..204661b6 100644 --- a/Assets/Plugins/StreamChat/Tests/LowLevelClient/ReconnectSchedulerTests.cs +++ b/Assets/Plugins/StreamChat/Tests/LowLevelClient/ReconnectSchedulerTests.cs @@ -3,6 +3,7 @@ using NUnit.Framework; using StreamChat.Core; using StreamChat.Core.LowLevelClient; +using StreamChat.Libs.Logs; using StreamChat.Libs.NetworkMonitors; using StreamChat.Libs.Time; @@ -16,9 +17,10 @@ public void Up() { _mockTimeService = Substitute.For(); _mockNetworkMonitor = Substitute.For(); + var mockLogs = Substitute.For(); _clientMock = Substitute.For(); - _timeScheduler = new ReconnectScheduler(_mockTimeService, _clientMock, _mockNetworkMonitor); + _timeScheduler = new ReconnectScheduler(_mockTimeService, _clientMock, _mockNetworkMonitor, mockLogs); } [TearDown] diff --git a/Assets/Plugins/StreamChat/Tests/StreamTestClients.cs b/Assets/Plugins/StreamChat/Tests/StreamTestClients.cs index fd5936ac..d82b1153 100644 --- a/Assets/Plugins/StreamChat/Tests/StreamTestClients.cs +++ b/Assets/Plugins/StreamChat/Tests/StreamTestClients.cs @@ -85,6 +85,7 @@ public StreamChatClient OtherStateClient public string OtherUserId => _otherUserCredentials.UserId; public IEnumerable OtherUserCredentials => _otherUsersCredentials; + public AuthCredentials LowLevelClientCredentials { get; } public IEnumerator ReconnectLowLevelClientClient() { @@ -103,7 +104,6 @@ public Task ConnectOtherStateClientAsync() private readonly HashSet _locks = new HashSet(); - private readonly AuthCredentials _lowLevelClientCredentials; private readonly AuthCredentials _stateClientCredentials; private readonly AuthCredentials _otherUserCredentials; private readonly List _otherUsersCredentials; @@ -126,7 +126,7 @@ private StreamTestClients() var adminData = testAuthDataSet.TestAdminData.OrderBy(_ => Random.value).ToList(); - _lowLevelClientCredentials = adminData[0]; + LowLevelClientCredentials = adminData[0]; _stateClientCredentials = adminData[1]; _otherUserCredentials = adminData[2]; _otherUsersCredentials = adminData.Skip(3).ToList(); @@ -218,7 +218,7 @@ private async Task DisposeStateClientsAsync() private void InitLowLevelClient() { - _lowLevelClient = StreamChatLowLevelClient.CreateDefaultClient(_lowLevelClientCredentials); + _lowLevelClient = StreamChatLowLevelClient.CreateDefaultClient(LowLevelClientCredentials); _lowLevelClient.Connected += OnClientConnected; _lowLevelClient.Connect(); }