Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetLeaseLockedRows for debug logging #900

Merged
merged 14 commits into from
Aug 5, 2023
46 changes: 46 additions & 0 deletions src/TriggerBinding/SqlTableChangeMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo

getChangesDurationMs = commandSw.ElapsedMilliseconds;
}
// Get the Lease Locked rows and log the number of rows.
int leaseLockedRowCount = await this.GetLeaseLockedRowCount(connection, transaction);
this._logger.LogDebug($"Executed GetChangesCommand in GetTableChangesAsync. {rows.Count} available changed rows ({leaseLockedRowCount} found with lease locks).");
MaddyDev marked this conversation as resolved.
Show resolved Hide resolved

// If changes were found, acquire leases on them.
if (rows.Count > 0)
Expand Down Expand Up @@ -810,6 +813,49 @@ LEFT OUTER JOIN {this._userTable.BracketQuotedFullName} AS u ON {userTableJoinCo
return new SqlCommand(getChangesQuery, connection, transaction);
}

/// <summary>
/// Returns the number of changes(rows) on the user's table that are actively locked by other leases.
/// </summary>
/// <param name="connection">The connection to add to the SqlCommand</param>
/// <param name="transaction">The transaction to add to the SqlCommand</param>
/// <returns>The number of rows locked by leases</returns>
private async Task<int> GetLeaseLockedRowCount(SqlConnection connection, SqlTransaction transaction)
{
string leasesTableJoinCondition = string.Join(" AND ", this._primaryKeyColumns.Select(col => $"c.{col.name.AsBracketQuotedString()} = l.{col.name.AsBracketQuotedString()}"));
int leaseLockedRows = 0;
MaddyDev marked this conversation as resolved.
Show resolved Hide resolved
long getLockedRowCountDurationMs = 0L;
// Get the list of changes from CHANGETABLE that meet the following criteria:
// * Null LeaseExpirationTime AND (Null ChangeVersion OR ChangeVersion < Current change version for that row from CHANGETABLE)
// OR
// * LeaseExpirationTime < Current Time
//
MaddyDev marked this conversation as resolved.
Show resolved Hide resolved
// The LeaseExpirationTime is only used for rows currently being processed - so if we see a
MaddyDev marked this conversation as resolved.
Show resolved Hide resolved
// row whose lease has expired that means that something must have happened to the function
// processing it before it was able to complete successfully. In that case we want to pick it
// up regardless since we know it should be processed - no need to check the change version.
// Once a row is successfully processed the LeaseExpirationTime column is set to NULL.
string getLeaseLockedrowCountQuery = $@"
{AppLockStatements}

DECLARE @last_sync_version bigint;
SELECT @last_sync_version = LastSyncVersion
FROM {GlobalStateTableName}
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};

SELECT COUNT(*)
FROM CHANGETABLE(CHANGES {this._userTable.BracketQuotedFullName}, @last_sync_version) AS c
LEFT OUTER JOIN {this._leasesTableName} AS l ON {leasesTableJoinCondition}
MaddyDev marked this conversation as resolved.
Show resolved Hide resolved
WHERE l.{LeasesTableLeaseExpirationTimeColumnName} IS NOT NULL AND l.{LeasesTableLeaseExpirationTimeColumnName} > SYSDATETIME()";

using (var getLeaseLockedRowCountCommand = new SqlCommand(getLeaseLockedrowCountQuery, connection, transaction))
{
var commandSw = Stopwatch.StartNew();
leaseLockedRows = (int)await getLeaseLockedRowCountCommand.ExecuteScalarAsyncWithLogging(this._logger, CancellationToken.None);
getLockedRowCountDurationMs = commandSw.ElapsedMilliseconds;
MaddyDev marked this conversation as resolved.
Show resolved Hide resolved
}
return leaseLockedRows;
}

/// <summary>
/// Builds the query to acquire leases on the rows in "_rows" if changes are detected in the user's table
/// (<see cref="RunChangeConsumptionLoopAsync()"/>).
Expand Down