Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
roji committed Apr 11, 2022
1 parent 8c99fb9 commit 8c99af0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 18 deletions.
17 changes: 8 additions & 9 deletions src/EFCore.Relational/Update/Internal/CommandBatchPreparer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class CommandBatchPreparer : ICommandBatchPreparer
{
private readonly int _minBatchSize;
private readonly bool _sensitiveLoggingEnabled;
private readonly Multigraph<IReadOnlyModificationCommand, IAnnotatable> _modificationCommandGraph = new();
private readonly Multigraph<IReadOnlyModificationCommand, IAnnotatable> _modificationCommandGraph;

/// <summary>
/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to
Expand All @@ -29,6 +29,8 @@ public CommandBatchPreparer(CommandBatchPreparerDependencies dependencies)
_minBatchSize =
dependencies.Options.Extensions.OfType<RelationalOptionsExtension>().FirstOrDefault()?.MinBatchSize
?? 1;

_modificationCommandGraph = new(dependencies.ModificationCommandComparer);
Dependencies = dependencies;

if (dependencies.LoggingOptions.IsSensitiveDataLoggingEnabled)
Expand Down Expand Up @@ -57,17 +59,14 @@ public CommandBatchPreparer(CommandBatchPreparerDependencies dependencies)
{
var parameterNameGenerator = Dependencies.ParameterNameGeneratorFactory.Create();
var commands = CreateModificationCommands(entries, updateAdapter, parameterNameGenerator.GenerateNext);
var sortedCommandSets = TopologicalSort(commands);
var commandSets = TopologicalSort(commands);

for (var commandSetIndex = 0; commandSetIndex < sortedCommandSets.Count; commandSetIndex++)
for (var commandSetIndex = 0; commandSetIndex < commandSets.Count; commandSetIndex++)
{
var independentCommandSet = sortedCommandSets[commandSetIndex];

// TODO: Integrate into the topological sort
// independentCommandSet.Sort(Dependencies.ModificationCommandComparer);
var commandSet = commandSets[commandSetIndex];

var batch = Dependencies.ModificationCommandBatchFactory.Create();
foreach (var modificationCommand in independentCommandSet)
foreach (var modificationCommand in commandSet)
{
(modificationCommand as ModificationCommand)?.AssertColumnsNotInitialized();
if (modificationCommand.EntityState == EntityState.Modified
Expand Down Expand Up @@ -109,7 +108,7 @@ public CommandBatchPreparer(CommandBatchPreparerDependencies dependencies)
}
}

var hasMoreCommandSets = commandSetIndex < sortedCommandSets.Count - 1;
var hasMoreCommandSets = commandSetIndex < commandSets.Count - 1;

if (batch.ModificationCommands.Count == 1
|| batch.ModificationCommands.Count >= _minBatchSize)
Expand Down
27 changes: 18 additions & 9 deletions src/Shared/Multigraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ namespace Microsoft.EntityFrameworkCore.Utilities;
internal class Multigraph<TVertex, TEdge> : Graph<TVertex>
where TVertex : notnull
{
private readonly IComparer<TVertex>? _modificationCommandComparer;
private readonly HashSet<TVertex> _vertices = new();
private readonly Dictionary<TVertex, Dictionary<TVertex, object?>> _successorMap = new();
private readonly Dictionary<TVertex, Dictionary<TVertex, object?>> _predecessorMap = new();

public Multigraph(IComparer<TVertex>? modificationCommandComparer = null)
=> _modificationCommandComparer = modificationCommandComparer;

public IEnumerable<TEdge> GetEdges(TVertex from, TVertex to)
{
if (_successorMap.TryGetValue(from, out var successorSet))
Expand Down Expand Up @@ -275,6 +279,8 @@ public IReadOnlyList<List<TVertex>> BatchingTopologicalSort(
Func<TVertex, TVertex, IEnumerable<TEdge>, bool>? tryBreakEdge,
Func<IReadOnlyList<Tuple<TVertex, TVertex, IEnumerable<TEdge>>>, string>? formatCycle)
{
// Bootstrap the topological sort by finding all vertexes which have no predecessors - we'll do a breadth-first pass starting from
// them.
var currentRootsQueue = new List<TVertex>();
var predecessorCounts = new Dictionary<TVertex, int>(_predecessorMap.Count);
foreach (var (vertex, vertices) in _predecessorMap)
Expand All @@ -291,21 +297,30 @@ public IReadOnlyList<List<TVertex>> BatchingTopologicalSort(
}

var vertexesProcessed = 0;
var result = new List<List<TVertex>>();
var batchBoundaryRequired = false;
var nextRootsQueue = new List<TVertex>();
var currentBatch = new List<TVertex>();
var currentBatchSet = new HashSet<TVertex>();
var batchBoundaryRequired = false;
var result = new List<List<TVertex>>();
result.Add(currentBatch);

while (vertexesProcessed < _vertices.Count)
{
while (currentRootsQueue.Count > 0)
{
// Secondary sorting: after the first topological sorting (according to dependencies between the commands as expressed in
// the graph), we apply a secondary sort, which ensures a deterministic ordering of commands and prevents deadlocks between
// concurrent transactions locking the same rows in different orders.
if (_modificationCommandComparer is not null)
{
currentRootsQueue.Sort(_modificationCommandComparer);
}

// If we detected in the last roots pass that a batch boundary is required, close the current batch and start a new one.
if (batchBoundaryRequired)
{
result.Add(currentBatch);
currentBatch = new();
result.Add(currentBatch);
currentBatchSet.Clear();

batchBoundaryRequired = false;
Expand Down Expand Up @@ -435,12 +450,6 @@ public IReadOnlyList<List<TVertex>> BatchingTopologicalSort(
}
}

// Add the pending last batch to the list of batches.
if (currentBatch.Count > 0)
{
result.Add(new(currentBatch));
}

return result;
}

Expand Down

0 comments on commit 8c99af0

Please sign in to comment.