diff --git a/docs/documents/storing.md b/docs/documents/storing.md index 4f38b9ca94..afdb15a6b0 100644 --- a/docs/documents/storing.md +++ b/docs/documents/storing.md @@ -12,7 +12,7 @@ a previously persisted document with the same identity. Here's that method in ac with a sample that shows storing both a brand new document and a modified document: - + ```cs using var store = DocumentStore.For("some connection string"); @@ -35,7 +35,7 @@ session.Store(newUser, existingUser); await session.SaveChangesAsync(); ``` -snippet source | anchor +snippet source | anchor The `Store()` method can happily take a mixed bag of document types at one time, but you'll need to tell Marten to use `Store()` instead of letting it infer the document type as shown below: @@ -101,7 +101,7 @@ theStore.BulkInsert(data, batchSize: 500); // And just checking that the data is actually there;) theSession.Query().Count().ShouldBe(data.Length); ``` -snippet source | anchor +snippet source | anchor The bulk insert is done with a single transaction. For really large document collections, you may need to page the calls to `IDocumentStore.BulkInsert()`. @@ -126,13 +126,13 @@ await theStore.BulkInsertAsync(data, batchSize: 500); // And just checking that the data is actually there;) theSession.Query().Count().ShouldBe(data.Length); ``` -snippet source | anchor +snippet source | anchor By default, bulk insert will fail if there are any duplicate id's between the documents being inserted and the existing database data. 
You can alter this behavior through the `BulkInsertMode` enumeration as shown below: - + ```cs // Just say we have an array of documents we want to bulk insert var data = Target.GenerateRandomData(100).ToArray(); @@ -151,7 +151,25 @@ await store.BulkInsertDocumentsAsync(data, BulkInsertMode.InsertsOnly); // being loaded await store.BulkInsertDocumentsAsync(data, BulkInsertMode.OverwriteExisting); ``` -snippet source | anchor +snippet source | anchor + + +When using `BulkInsertMode.OverwriteExisting` it is also possible to pass in a condition to be evaluated when overwriting documents. +This allows for checks such as "don't update document if the existing document is newer" without having to do more expensive round trips. +Be careful with using this - this is a low level call and the update condition does not support parameters or protection from sql injection. + + + +```cs +// perform a bulk insert of `Target` documents +// but only overwrite existing if the existing document's "Number" +// property is less then the new document's +await theStore.BulkInsertAsync( + data2, + BulkInsertMode.OverwriteExisting, + updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int"); +``` +snippet source | anchor The bulk insert feature can also be used with multi-tenanted documents, but in that @@ -159,7 +177,7 @@ case you are limited to only loading documents to a single tenant at a time as shown below: - + ```cs // Just say we have an array of documents we want to bulk insert var data = Target.GenerateRandomData(100).ToArray(); @@ -173,5 +191,5 @@ using var store = DocumentStore.For(opts => // If multi-tenanted await store.BulkInsertDocumentsAsync("a tenant id", data); ``` -snippet source | anchor +snippet source | anchor diff --git a/src/DocumentDbTests/Writing/bulk_loading.cs b/src/DocumentDbTests/Writing/bulk_loading.cs index 5c952fb015..a94b2dd1af 100644 --- a/src/DocumentDbTests/Writing/bulk_loading.cs +++ 
[Fact]
public async Task load_with_overwrite_duplicates_and_update_condition()
{
    // Seed 30 documents, all with Number = 100
    var data1 = Target.GenerateRandomData(30)
        .Select(t => t with { Number = 100, String = "initial insert" })
        .ToArray();
    theStore.BulkInsert(data1);

    // Re-insert the first 20: first 10 with a LOWER Number (50, should be rejected
    // by the update condition), next 10 with a HIGHER Number (150, should overwrite)
    var data2 = data1.Take(20)
        .Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" })
        .ToArray();

    #region sample_BulkInsertWithUpdateCondition

    // perform a bulk insert of `Target` documents
    // but only overwrite existing if the existing document's "Number"
    // property is less than the new document's
    await theStore.BulkInsertAsync(
        data2,
        BulkInsertMode.OverwriteExisting,
        updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");

    #endregion

    using var session = theStore.QuerySession();

    // No new rows were added, only overwrites of existing ids
    session.Query<Target>().Count().ShouldBe(data1.Length);

    // Values were overwritten only where the update condition held
    session.Query<Target>().Count(t => t.Number == 100 && t.String == "initial insert").ShouldBe(20);
    session.Query<Target>().Count(t => t.Number == 150 && t.String == "second insert").ShouldBe(10);
    session.Query<Target>().Count(t => t.Number == 50).ShouldBe(0);
}

[Fact]
public async Task load_with_overwrite_duplicates_and_update_condition_async()
{
    // Seed 30 documents, all with Number = 100
    var data1 = Target.GenerateRandomData(30)
        .Select(t => t with { Number = 100, String = "initial insert" })
        .ToArray();
    await theStore.BulkInsertAsync(data1);

    // First 10 replacements fail the condition (50 < 100), last 10 pass (150 > 100)
    var data2 = data1.Take(20)
        .Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" })
        .ToArray();

    await theStore.BulkInsertAsync(
        data2,
        BulkInsertMode.OverwriteExisting,
        updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");

    await using var session = theStore.QuerySession();

    // No new rows were added, only overwrites of existing ids
    (await session.Query<Target>().CountAsync()).ShouldBe(data1.Length);

    // Values were overwritten only where the update condition held
    (await session.Query<Target>().CountAsync(t => t.Number == 100 && t.String == "initial insert")).ShouldBe(20);
    (await session.Query<Target>().CountAsync(t => t.Number == 150 && t.String == "second insert")).ShouldBe(10);
    (await session.Query<Target>().CountAsync(t => t.Number == 50)).ShouldBe(0);
}
a/src/Marten/DocumentStore.cs +++ b/src/Marten/DocumentStore.cs @@ -17,10 +17,8 @@ using Marten.Internal.Sessions; using Marten.Services; using Marten.Storage; -using Microsoft.CodeAnalysis.Options; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Weasel.Core.Migrations; using Weasel.Postgresql.Connections; using IsolationLevel = System.Data.IsolationLevel; @@ -114,18 +112,18 @@ public ValueTask DisposeAsync() public AdvancedOperations Advanced { get; } public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string? updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - bulkInsertion.BulkInsert(documents, mode, batchSize); + bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition); } public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string? updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize); + bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize, updateCondition); } public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, @@ -137,10 +135,11 @@ public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mo public void BulkInsert(string tenantId, IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, + string? 
updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.GetTenant(Options.MaybeCorrectTenantId(tenantId)), Options); - bulkInsertion.BulkInsert(documents, mode, batchSize); + bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition); } public void BulkInsertDocuments(string tenantId, IEnumerable documents, @@ -153,27 +152,35 @@ public void BulkInsertDocuments(string tenantId, IEnumerable documents, public Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, CancellationToken cancellation = default) + int batchSize = 1000, + string? updateCondition = null, + CancellationToken cancellation = default) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation); + return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation); } public Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, CancellationToken cancellation = default) + int batchSize = 1000, + string? updateCondition = null, + CancellationToken cancellation = default) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, cancellation); + return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, updateCondition, cancellation); } - public async Task BulkInsertAsync(string tenantId, IReadOnlyCollection documents, - BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, + public async Task BulkInsertAsync( + string tenantId, + IReadOnlyCollection documents, + BulkInsertMode mode = BulkInsertMode.InsertsOnly, + int batchSize = 1000, + string? 
updateCondition = null, CancellationToken cancellation = default) { var bulkInsertion = new BulkInsertion(await Tenancy.GetTenantAsync(Options.MaybeCorrectTenantId(tenantId)).ConfigureAwait(false), Options); - await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation).ConfigureAwait(false); + await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation).ConfigureAwait(false); } public Task BulkInsertDocumentsAsync(IEnumerable documents, diff --git a/src/Marten/IDocumentStore.cs b/src/Marten/IDocumentStore.cs index 488660a664..566921521e 100644 --- a/src/Marten/IDocumentStore.cs +++ b/src/Marten/IDocumentStore.cs @@ -45,8 +45,19 @@ public interface IDocumentStore: IDisposable /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000); + int batchSize = 1000, string? updateCondition = null); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -57,8 +68,19 @@ void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkI /// an existing transaction /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. 
+ /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, - BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000); + BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -68,9 +90,20 @@ void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transactio /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// /// void BulkInsert(string tenantId, IReadOnlyCollection documents, - BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000); + BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -80,8 +113,19 @@ void BulkInsert(string tenantId, IReadOnlyCollection documents, /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, CancellationToken cancellation = default); + int batchSize = 1000, string? updateCondition = null, CancellationToken cancellation = default); /// /// Uses Postgresql's COPY ... 
FROM STDIN BINARY feature to efficiently store @@ -92,9 +136,20 @@ Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = /// an existing transaction /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, - CancellationToken cancellation = default); + string? updateCondition = null, CancellationToken cancellation = default); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -105,9 +160,20 @@ Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Trans /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// Task BulkInsertAsync(string tenantId, IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, - CancellationToken cancellation = default); + string? updateCondition = null, CancellationToken cancellation = default); /// /// Open a new IDocumentSession with the supplied DocumentTracking. 
diff --git a/src/Marten/Internal/CodeGeneration/BulkLoader.cs b/src/Marten/Internal/CodeGeneration/BulkLoader.cs index ce5b12b752..8b3e1a2f1a 100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoader.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoader.cs @@ -81,7 +81,7 @@ public async Task LoadIntoTempTableAsync(Tenant tenant, ISerializer serializer, public abstract string CopyNewDocumentsFromTempTable(); - public abstract string OverwriteDuplicatesFromTempTable(); + public abstract string UpsertFromTempTable(); public object GetNullable(TValue? value) where TValue : struct { diff --git a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs index 563a0ae393..072a4437e6 100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs @@ -52,8 +52,8 @@ public GeneratedType BuildType(GeneratedAssembly assembly) type.MethodFor(nameof(CopyNewDocumentsFromTempTable)) .Frames.ReturnNewStringConstant("COPY_NEW_DOCUMENTS_SQL", CopyNewDocumentsFromTempTable()); - type.MethodFor(nameof(OverwriteDuplicatesFromTempTable)) - .Frames.ReturnNewStringConstant("OVERWRITE_SQL", OverwriteDuplicatesFromTempTable()); + type.MethodFor(nameof(UpsertFromTempTable)) + .Frames.ReturnNewStringConstant("UPSERT_SQL", UpsertFromTempTable()); type.MethodFor(nameof(CreateTempTableForCopying)) .Frames.ReturnNewStringConstant("CREATE_TEMP_TABLE_FOR_COPYING_SQL", @@ -98,32 +98,41 @@ public string CopyNewDocumentsFromTempTable() var selectColumns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) .Select(x => $"{_tempTable}.\\\"{x.Name}\\\"").Join(", "); - var joinExpression = isMultiTenanted - ? $"{_tempTable}.id = {storageTable}.id and {_tempTable}.tenant_id = {storageTable}.tenant_id" - : $"{_tempTable}.id = {storageTable}.id"; + var conflictColumns = isMultiTenanted + ? 
"tenant_id, id" + : "id"; return $"insert into {storageTable} ({columns}, {SchemaConstants.LastModifiedColumn}) " + - $"(select {selectColumns}, transaction_timestamp() " + - $"from {_tempTable} left join {storageTable} on {joinExpression} " + - $"where {storageTable}.id is null)"; + $"select {selectColumns}, transaction_timestamp() " + + $"from {_tempTable} " + + $"on conflict ({conflictColumns}) do nothing"; } - public string OverwriteDuplicatesFromTempTable() + public string UpsertFromTempTable() { var table = _mapping.Schema.Table; var isMultiTenanted = _mapping.TenancyStyle == TenancyStyle.Conjoined; var storageTable = table.Identifier.QualifiedName; + var columns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) + .Select(x => $"\\\"{x.Name}\\\"").Join(", "); + var selectColumns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) + .Select(x => $"{_tempTable}.\\\"{x.Name}\\\"").Join(", "); var updates = table.Columns.Where(x => x.Name != "id" && x.Name != SchemaConstants.LastModifiedColumn) - .Select(x => $"{x.Name} = source.{x.Name}").Join(", "); + .Select(x => $"{x.Name} = excluded.{x.Name}").Join(", "); - var joinExpression = isMultiTenanted - ? "source.id = target.id and source.tenant_id = target.tenant_id" - : "source.id = target.id"; + var conflictColumns = isMultiTenanted + ? 
"tenant_id, id" + : "id"; return - $"update {storageTable} target SET {updates}, {SchemaConstants.LastModifiedColumn} = transaction_timestamp() FROM {_tempTable} source WHERE {joinExpression}"; + $"insert into {storageTable} as d ({columns}, {SchemaConstants.LastModifiedColumn}) " + + $"select {selectColumns}, transaction_timestamp() " + + $"from {_tempTable} " + + $"on conflict ({conflictColumns}) do update " + + $"set {updates}, {SchemaConstants.LastModifiedColumn} = excluded.{SchemaConstants.LastModifiedColumn} " + + "where ( {0} )"; } public string CreateTempTableForCopying() diff --git a/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs b/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs index 917907a33e..3c1b3ef942 100644 --- a/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs +++ b/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs @@ -51,8 +51,8 @@ public string CopyNewDocumentsFromTempTable() return _inner.CopyNewDocumentsFromTempTable(); } - public string OverwriteDuplicatesFromTempTable() + public string UpsertFromTempTable() { - return _inner.OverwriteDuplicatesFromTempTable(); + return _inner.UpsertFromTempTable(); } } diff --git a/src/Marten/Schema/BulkLoading/IBulkLoader.cs b/src/Marten/Schema/BulkLoading/IBulkLoader.cs index 564fdd01ec..59f49562fb 100644 --- a/src/Marten/Schema/BulkLoading/IBulkLoader.cs +++ b/src/Marten/Schema/BulkLoading/IBulkLoader.cs @@ -26,5 +26,5 @@ Task LoadIntoTempTableAsync(Tenant tenant, ISerializer serializer, NpgsqlConnect string CopyNewDocumentsFromTempTable(); - string OverwriteDuplicatesFromTempTable(); + string UpsertFromTempTable(); } diff --git a/src/Marten/Storage/BulkInsertion.cs b/src/Marten/Storage/BulkInsertion.cs index ff90c9cec2..aadc2aafa2 100644 --- a/src/Marten/Storage/BulkInsertion.cs +++ b/src/Marten/Storage/BulkInsertion.cs @@ -28,8 +28,9 @@ public void Dispose() } public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int 
batchSize = 1000) + int batchSize = 1000, string updateCondition = null) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { BulkInsertDocuments(documents.OfType(), mode); @@ -44,7 +45,7 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode try { - bulkInsertDocuments(documents, batchSize, conn, mode); + bulkInsertDocuments(documents, batchSize, conn, mode, updateCondition); tx.Commit(); } @@ -59,8 +60,10 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, + string updateCondition = null) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { BulkInsertDocumentsEnlistTransaction(documents.OfType(), transaction, mode); @@ -72,13 +75,14 @@ public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, using var conn = _tenant.Database.CreateConnection(); conn.Open(); conn.EnlistTransaction(transaction); - bulkInsertDocuments(documents, batchSize, conn, mode); + bulkInsertDocuments(documents, batchSize, conn, mode, updateCondition); } } public async Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode, int batchSize, - CancellationToken cancellation) + string updateCondition = null, CancellationToken cancellation = default) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, cancellation) @@ -94,7 +98,7 @@ await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, canc var tx = await conn.BeginTransactionAsync(cancellation).ConfigureAwait(false); try { - await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, cancellation).ConfigureAwait(false); + await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, 
updateCondition, cancellation).ConfigureAwait(false); await tx.CommitAsync(cancellation).ConfigureAwait(false); } @@ -107,8 +111,9 @@ await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, canc } public async Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, - BulkInsertMode mode, int batchSize, CancellationToken cancellation) + BulkInsertMode mode, int batchSize, string updateCondition = null, CancellationToken cancellation = default) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType(), transaction, mode, batchSize, @@ -120,12 +125,12 @@ await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType(), tran await using var conn = _tenant.Database.CreateConnection(); await conn.OpenAsync(cancellation).ConfigureAwait(false); conn.EnlistTransaction(transaction); - await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, cancellation).ConfigureAwait(false); + await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, updateCondition, cancellation).ConfigureAwait(false); } } public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string updateCondition = null) { var groups = bulkInserters(documents); @@ -150,7 +155,8 @@ public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mo public void BulkInsertDocumentsEnlistTransaction(IEnumerable documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, + string updateCondition = null) { var groups = bulkInserters(documents); var types = documentTypes(documents); @@ -184,7 +190,7 @@ private static IBulkInserter[] bulkInserters(IEnumerable documents) } public async Task BulkInsertDocumentsAsync(IEnumerable documents, BulkInsertMode mode, int batchSize, - CancellationToken 
cancellation) + CancellationToken cancellation = default) { var groups = bulkInserters(documents); @@ -231,10 +237,10 @@ CancellationToken cancellation } private void bulkInsertDocuments(IReadOnlyCollection documents, int batchSize, NpgsqlConnection conn, - BulkInsertMode mode) + BulkInsertMode mode, string updateCondition) { var provider = _tenant.Database.Providers.StorageFor(); - var loader = provider.BulkLoader; + var loader = provider.BulkLoader!; if (mode != BulkInsertMode.InsertsOnly) { @@ -274,18 +280,17 @@ private void bulkInsertDocuments(IReadOnlyCollection documents, int batchS } else if (mode == BulkInsertMode.OverwriteExisting) { - var overwrite = loader.OverwriteDuplicatesFromTempTable(); - var copy = loader.CopyNewDocumentsFromTempTable(); + var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "true"); - conn.CreateCommand(overwrite + ";" + copy).ExecuteNonQuery(); + conn.CreateCommand(upsert).ExecuteNonQuery(); } } private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, int batchSize, - NpgsqlConnection conn, BulkInsertMode mode, CancellationToken cancellation) + NpgsqlConnection conn, BulkInsertMode mode, string updateCondition, CancellationToken cancellation) { var provider = _tenant.Database.Providers.StorageFor(); - var loader = provider.BulkLoader; + var loader = provider.BulkLoader!; if (mode != BulkInsertMode.InsertsOnly) { @@ -325,11 +330,28 @@ private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, } else if (mode == BulkInsertMode.OverwriteExisting) { - var overwrite = loader.OverwriteDuplicatesFromTempTable(); - var copy = loader.CopyNewDocumentsFromTempTable(); + var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? 
/// <summary>
/// Guards the <paramref name="updateCondition"/> argument before any bulk insert work begins.
/// A raw SQL update condition is only meaningful when the bulk insert is configured with
/// <see cref="BulkInsertMode.OverwriteExisting"/>, and it cannot be applied to an untyped
/// (object) collection, because mixed-type collections are dispatched per concrete document
/// type and a single raw condition cannot be validated against all of their tables.
/// </summary>
/// <typeparam name="T">The document type being bulk inserted.</typeparam>
/// <param name="mode">The requested bulk insert mode.</param>
/// <param name="updateCondition">Optional raw SQL WHERE clause for the ON CONFLICT DO UPDATE; null means "always overwrite".</param>
/// <exception cref="ArgumentException">
/// Thrown when a condition is supplied for an object collection, or for any mode other
/// than <see cref="BulkInsertMode.OverwriteExisting"/>.
/// </exception>
private static void ValidateupdateCondition<T>(BulkInsertMode mode, string updateCondition)
{
    // No condition supplied — nothing to validate for any mode
    if (updateCondition is null)
    {
        return;
    }

    if (typeof(T) == typeof(object))
    {
        throw new ArgumentException(
            "An update condition cannot be used on a collection of object; use a collection of a specific document type instead.",
            nameof(updateCondition));
    }

    if (mode != BulkInsertMode.OverwriteExisting)
    {
        throw new ArgumentException(
            $"An update condition can only be provided when using {BulkInsertMode.OverwriteExisting}",
            nameof(updateCondition));
    }
}