Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions sdks/csharp/src/SpacetimeDBClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public sealed class DbConnectionBuilder<DbConnection>
string? token;
Compression? compression;
bool light;
bool? confirmedReads;

public DbConnection Build()
{
Expand All @@ -32,7 +33,7 @@ public DbConnection Build()
{
throw new InvalidOperationException("Building DbConnection with a null nameOrAddress. Call WithModuleName() first.");
}
conn.Connect(token, uri, nameOrAddress, compression ?? Compression.Brotli, light);
conn.Connect(token, uri, nameOrAddress, compression ?? Compression.Brotli, light, confirmedReads);
#if UNITY_5_3_OR_NEWER
if (SpacetimeDBNetworkManager._instance != null)
{
Expand Down Expand Up @@ -72,6 +73,12 @@ public DbConnectionBuilder<DbConnection> WithLightMode(bool light)
return this;
}

public DbConnectionBuilder<DbConnection> WithConfirmedReads(bool confirmedReads)
{
this.confirmedReads = confirmedReads;
return this;
}

public delegate void ConnectCallback(DbConnection conn, Identity identity, string token);

public DbConnectionBuilder<DbConnection> OnConnect(ConnectCallback cb)
Expand Down Expand Up @@ -99,7 +106,7 @@ public DbConnectionBuilder<DbConnection> OnDisconnect(DisconnectCallback cb)

public interface IDbConnection
{
internal void Connect(string? token, string uri, string addressOrName, Compression compression, bool light);
internal void Connect(string? token, string uri, string addressOrName, Compression compression, bool light, bool? confirmedReads);

internal void AddOnConnect(Action<Identity, string> cb);
internal void AddOnConnectError(WebSocket.ConnectErrorEventHandler cb);
Expand Down Expand Up @@ -184,7 +191,7 @@ protected DbConnectionBase()
SpacetimeDBNetworkManager._instance.RemoveConnection(this);
}
};

#if UNITY_WEBGL && !UNITY_EDITOR
if (SpacetimeDBNetworkManager._instance != null)
SpacetimeDBNetworkManager._instance.StartCoroutine(ParseMessages());
Expand Down Expand Up @@ -484,7 +491,22 @@ public void Disconnect()
/// </summary>
/// <param name="uri"> URI of the SpacetimeDB server (ex: https://testnet.spacetimedb.com)
/// <param name="addressOrName">The name or address of the database to connect to</param>
void IDbConnection.Connect(string? token, string uri, string addressOrName, Compression compression, bool light)
/// <param name="compression">The compression settings to use</param>
/// <param name="light">Whether or not to request light updates</param>
/// <param name="confirmedReads">
/// If set to true, instruct the server to send updates for transactions
/// only after they are confirmed to be durable.
///
/// What durable means depends on the server configuration. In general,
/// a transaction is durable when it has been written to disk on one or
/// more servers.
///
/// If set to false, instruct the server to send updates as soon as
/// transactions are committed in memory.
///
/// If not set, the server chooses the default.
/// </param>
void IDbConnection.Connect(string? token, string uri, string addressOrName, Compression compression, bool light, bool? confirmedReads)
{
isClosing = false;

Expand All @@ -509,7 +531,7 @@ async Task Function()
{
try
{
await webSocket.Connect(token, uri, addressOrName, ConnectionId, compression, light);
await webSocket.Connect(token, uri, addressOrName, ConnectionId, compression, light, confirmedReads);
}
catch (Exception e)
{
Expand Down Expand Up @@ -879,8 +901,8 @@ void IDbConnection.Unsubscribe(QueryId queryId)
/// Represents the result of parsing a database update message from SpacetimeDB.
/// Contains updates for all tables affected by the update, with each entry mapping a table handle
/// to its respective set of row changes (by primary key or row instance).
///
/// Note: Due to C#'s struct constructor limitations, you must use <see cref="ParsedDatabaseUpdate.New"/>
///
/// Note: Due to C#'s struct constructor limitations, you must use <see cref="ParsedDatabaseUpdate.New"/>
/// to create new instances.
/// Do not use the default constructor, as it will not initialize the Updates dictionary.
/// </summary>
Expand Down
28 changes: 20 additions & 8 deletions sdks/csharp/src/WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public WebSocket(ConnectOptions options)
private bool _isConnected = false;
private bool _isConnecting = false;
public bool IsConnected => _isConnected;
#else
#else
public bool IsConnected { get { return Ws != null && Ws.State == WebSocketState.Open; } }
#endif

Expand Down Expand Up @@ -136,22 +136,28 @@ private void InitializeWebGL()
var messagePtr = Marshal.GetFunctionPointerForDelegate((Action<int, IntPtr, int>)WebGLOnMessage);
var closePtr = Marshal.GetFunctionPointerForDelegate((Action<int, int, IntPtr>)WebGLOnClose);
var errorPtr = Marshal.GetFunctionPointerForDelegate((Action<int>)WebGLOnError);

WebSocket_Init(openPtr, messagePtr, closePtr, errorPtr);
}
#endif

public async Task Connect(string? auth, string host, string nameOrAddress, ConnectionId connectionId, Compression compression, bool light)
public async Task Connect(string? auth, string host, string nameOrAddress, ConnectionId connectionId, Compression compression, bool light, bool? confirmedReads)
{
#if UNITY_WEBGL && !UNITY_EDITOR
if (_isConnecting || _isConnected) return;

_isConnecting = true;
try
{
var uri = $"{host}/v1/database/{nameOrAddress}/subscribe?connection_id={connectionId}&compression={compression}";
if (light) uri += "&light=true";

if (confirmedReads.HasValue)
{
// Ensure to transmit the bool as lowercase.
var enabled = confirmedReads.GetValueOrDefault() ? "true" : "false";
uri += $"&confirmed={enabled}";
}

_socketId = new TaskCompletionSource<int>();
var callbackPtr = Marshal.GetFunctionPointerForDelegate((Action<int>)OnSocketIdReceived);
WebSocket_Connect(host, uri, _options.Protocol, auth, callbackPtr);
Expand All @@ -177,6 +183,12 @@ public async Task Connect(string? auth, string host, string nameOrAddress, Conne
{
uri += "&light=true";
}
if (confirmedReads.HasValue)
{
// Ensure to transmit the bool as lowercase.
var enabled = confirmedReads.GetValueOrDefault() ? "true" : "false";
uri += $"&confirmed={enabled}";
}
var url = new Uri(uri);
Ws.Options.AddSubProtocol(_options.Protocol);

Expand Down Expand Up @@ -457,15 +469,15 @@ public void HandleWebGLOpen(int socketId)
dispatchQueue.Enqueue(() => OnConnect());
}
}

public void HandleWebGLMessage(int socketId, byte[] message)
{
if (socketId == _webglSocketId && OnMessage != null)
{
dispatchQueue.Enqueue(() => OnMessage(message, DateTime.UtcNow));
}
}

public void HandleWebGLClose(int socketId, int code, string reason)
{
UnityEngine.Debug.Log($"HandleWebGLClose: {code} {reason}");
Expand All @@ -476,7 +488,7 @@ public void HandleWebGLClose(int socketId, int code, string reason)
dispatchQueue.Enqueue(() => OnClose?.Invoke(ex));
}
}

public void HandleWebGLError(int socketId)
{
UnityEngine.Debug.Log($"HandleWebGLError: {socketId}");
Expand Down
Loading