From e253986974498d4854d53d42c15c891e2afa2dc8 Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Tue, 9 Jul 2024 22:27:45 +1000 Subject: [PATCH 1/8] Replace copy+overwrite with upsert operation when bulk loading. --- .../Internal/CodeGeneration/BulkLoader.cs | 2 +- .../CodeGeneration/BulkLoaderBuilder.cs | 36 +++++++++++-------- .../CodeGeneration/SubClassBulkLoader.cs | 4 +-- src/Marten/Schema/BulkLoading/IBulkLoader.cs | 2 +- src/Marten/Storage/BulkInsertion.cs | 15 ++++---- 5 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/Marten/Internal/CodeGeneration/BulkLoader.cs b/src/Marten/Internal/CodeGeneration/BulkLoader.cs index ce5b12b752..8b3e1a2f1a 100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoader.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoader.cs @@ -81,7 +81,7 @@ public async Task LoadIntoTempTableAsync(Tenant tenant, ISerializer serializer, public abstract string CopyNewDocumentsFromTempTable(); - public abstract string OverwriteDuplicatesFromTempTable(); + public abstract string UpsertFromTempTable(); public object GetNullable(TValue? value) where TValue : struct { diff --git a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs index 08ecd70f00..d5d31cb4e9 100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs @@ -52,8 +52,8 @@ public GeneratedType BuildType(GeneratedAssembly assembly) type.MethodFor(nameof(CopyNewDocumentsFromTempTable)) .Frames.ReturnNewStringConstant("COPY_NEW_DOCUMENTS_SQL", CopyNewDocumentsFromTempTable()); - type.MethodFor(nameof(OverwriteDuplicatesFromTempTable)) - .Frames.ReturnNewStringConstant("OVERWRITE_SQL", OverwriteDuplicatesFromTempTable()); + type.MethodFor(nameof(UpsertFromTempTable)) + .Frames.ReturnNewStringConstant("UPSERT_SQL", UpsertFromTempTable()); type.MethodFor(nameof(CreateTempTableForCopying)) .Frames.ReturnNewStringConstant("CREATE_TEMP_TABLE_FOR_COPYING_SQL", @@ -92,32 +92,40 @@ public string CopyNewDocumentsFromTempTable() var selectColumns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) .Select(x => $"{_tempTable}.\\\"{x.Name}\\\"").Join(", "); - var joinExpression = isMultiTenanted - ? $"{_tempTable}.id = {storageTable}.id and {_tempTable}.tenant_id = {storageTable}.tenant_id" - : $"{_tempTable}.id = {storageTable}.id"; + var conflictColumns = isMultiTenanted + ? 
"tenant_id, id" + : "id"; return $"insert into {storageTable} ({columns}, {SchemaConstants.LastModifiedColumn}) " + - $"(select {selectColumns}, transaction_timestamp() " + - $"from {_tempTable} left join {storageTable} on {joinExpression} " + - $"where {storageTable}.id is null)"; + $"select {selectColumns}, transaction_timestamp() " + + $"from {_tempTable} " + + $"on conflict ({conflictColumns}) do nothing"; } - public string OverwriteDuplicatesFromTempTable() + public string UpsertFromTempTable() { var table = _mapping.Schema.Table; var isMultiTenanted = _mapping.TenancyStyle == TenancyStyle.Conjoined; var storageTable = table.Identifier.QualifiedName; + var columns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) + .Select(x => $"\\\"{x.Name}\\\"").Join(", "); + var selectColumns = table.Columns.Where(x => x.Name != SchemaConstants.LastModifiedColumn) + .Select(x => $"{_tempTable}.\\\"{x.Name}\\\"").Join(", "); var updates = table.Columns.Where(x => x.Name != "id" && x.Name != SchemaConstants.LastModifiedColumn) - .Select(x => $"{x.Name} = source.{x.Name}").Join(", "); + .Select(x => $"{x.Name} = excluded.{x.Name}").Join(", "); - var joinExpression = isMultiTenanted - ? "source.id = target.id and source.tenant_id = target.tenant_id" - : "source.id = target.id"; + var conflictColumns = isMultiTenanted + ? "tenant_id, id" + : "id"; return - $"update {storageTable} target SET {updates}, {SchemaConstants.LastModifiedColumn} = transaction_timestamp() FROM {_tempTable} source WHERE {joinExpression}"; + $"insert into {storageTable} ({columns}, {SchemaConstants.LastModifiedColumn}) " + + $"select {selectColumns}, transaction_timestamp() " + + $"from {_tempTable} " + + $"on conflict ({conflictColumns}) do update " + + $"set {updates}, {SchemaConstants.LastModifiedColumn} = excluded.{SchemaConstants.LastModifiedColumn}"; } public string CreateTempTableForCopying() diff --git a/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs b/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs index 917907a33e..3c1b3ef942 100644 --- a/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs +++ b/src/Marten/Internal/CodeGeneration/SubClassBulkLoader.cs @@ -51,8 +51,8 @@ public string CopyNewDocumentsFromTempTable() return _inner.CopyNewDocumentsFromTempTable(); } - public string OverwriteDuplicatesFromTempTable() + public string UpsertFromTempTable() { - return _inner.OverwriteDuplicatesFromTempTable(); + return _inner.UpsertFromTempTable(); } } diff --git a/src/Marten/Schema/BulkLoading/IBulkLoader.cs b/src/Marten/Schema/BulkLoading/IBulkLoader.cs index 564fdd01ec..59f49562fb 100644 --- a/src/Marten/Schema/BulkLoading/IBulkLoader.cs +++ b/src/Marten/Schema/BulkLoading/IBulkLoader.cs @@ -26,5 +26,5 @@ Task LoadIntoTempTableAsync(Tenant tenant, ISerializer serializer, NpgsqlConnect string CopyNewDocumentsFromTempTable(); - string OverwriteDuplicatesFromTempTable(); + string UpsertFromTempTable(); } diff --git a/src/Marten/Storage/BulkInsertion.cs b/src/Marten/Storage/BulkInsertion.cs index ff90c9cec2..16bac53981 100644 --- a/src/Marten/Storage/BulkInsertion.cs +++ b/src/Marten/Storage/BulkInsertion.cs @@ -234,7 +234,7 @@ private void bulkInsertDocuments(IReadOnlyCollection documents, int batchS BulkInsertMode mode) { var provider = _tenant.Database.Providers.StorageFor(); - var loader = provider.BulkLoader; + var loader = provider.BulkLoader!; if (mode != BulkInsertMode.InsertsOnly) { @@ -274,10 +274,9 @@ private void bulkInsertDocuments(IReadOnlyCollection documents, int 
batchS } else if (mode == BulkInsertMode.OverwriteExisting) { - var overwrite = loader.OverwriteDuplicatesFromTempTable(); - var copy = loader.CopyNewDocumentsFromTempTable(); + var upsert = loader.UpsertFromTempTable(); - conn.CreateCommand(overwrite + ";" + copy).ExecuteNonQuery(); + conn.CreateCommand(upsert).ExecuteNonQuery(); } } @@ -285,7 +284,7 @@ private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, NpgsqlConnection conn, BulkInsertMode mode, CancellationToken cancellation) { var provider = _tenant.Database.Providers.StorageFor(); - var loader = provider.BulkLoader; + var loader = provider.BulkLoader!; if (mode != BulkInsertMode.InsertsOnly) { @@ -325,11 +324,9 @@ private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, } else if (mode == BulkInsertMode.OverwriteExisting) { - var overwrite = loader.OverwriteDuplicatesFromTempTable(); - var copy = loader.CopyNewDocumentsFromTempTable(); + var upsert = loader.UpsertFromTempTable(); - await conn.CreateCommand(overwrite + ";" + copy) - .ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); + await conn.CreateCommand(upsert).ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); } } From 95f2b386ebb4211c1c05999d8ed23a7b1e597563 Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Fri, 27 Sep 2024 16:36:29 +1000 Subject: [PATCH 2/8] String based filter --- .../CodeGeneration/BulkLoaderBuilder.cs | 3 +- src/Marten/Storage/BulkInsertion.cs | 68 ++++++++++++------- 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs index d5d31cb4e9..ebc41adcca 100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs @@ -125,7 +125,8 @@ public string UpsertFromTempTable() $"select {selectColumns}, transaction_timestamp() " + $"from {_tempTable} " + $"on conflict ({conflictColumns}) do update " + - $"set {updates}, {SchemaConstants.LastModifiedColumn} = excluded.{SchemaConstants.LastModifiedColumn}"; + $"set {updates}, {SchemaConstants.LastModifiedColumn} = excluded.{SchemaConstants.LastModifiedColumn} " + + "where ( {0} )"; } public string CreateTempTableForCopying() diff --git a/src/Marten/Storage/BulkInsertion.cs b/src/Marten/Storage/BulkInsertion.cs index 16bac53981..62db735326 100644 --- a/src/Marten/Storage/BulkInsertion.cs +++ b/src/Marten/Storage/BulkInsertion.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; using System.Transactions; @@ -28,7 +29,7 @@ public void Dispose() } public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string upsertCondition = null) { if (typeof(T) == typeof(object)) { @@ -44,7 +45,7 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode try { - bulkInsertDocuments(documents, batchSize, conn, mode); + bulkInsertDocuments(documents, batchSize, conn, mode, upsertCondition); tx.Commit(); } @@ -59,7 +60,8 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, + string upsertCondition = null) { if (typeof(T) == typeof(object)) { @@ -72,16 
+74,16 @@ public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, using var conn = _tenant.Database.CreateConnection(); conn.Open(); conn.EnlistTransaction(transaction); - bulkInsertDocuments(documents, batchSize, conn, mode); + bulkInsertDocuments(documents, batchSize, conn, mode, upsertCondition); } } public async Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode, int batchSize, - CancellationToken cancellation) + string upsertCondition = null, CancellationToken cancellation = default) { if (typeof(T) == typeof(object)) { - await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, cancellation) + await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, upsertCondition, cancellation) .ConfigureAwait(false); } else @@ -94,7 +96,7 @@ await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, canc var tx = await conn.BeginTransactionAsync(cancellation).ConfigureAwait(false); try { - await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, cancellation).ConfigureAwait(false); + await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, upsertCondition, cancellation).ConfigureAwait(false); await tx.CommitAsync(cancellation).ConfigureAwait(false); } @@ -107,12 +109,12 @@ await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, canc } public async Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, - BulkInsertMode mode, int batchSize, CancellationToken cancellation) + BulkInsertMode mode, int batchSize, string upsertCondition = null, CancellationToken cancellation = default) { if (typeof(T) == typeof(object)) { await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType(), transaction, mode, batchSize, - cancellation).ConfigureAwait(false); + upsertCondition, cancellation).ConfigureAwait(false); } else { @@ -120,12 +122,12 @@ await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType(), tran await using var conn = _tenant.Database.CreateConnection(); await conn.OpenAsync(cancellation).ConfigureAwait(false); conn.EnlistTransaction(transaction); - await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, cancellation).ConfigureAwait(false); + await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, upsertCondition, cancellation).ConfigureAwait(false); } } public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string upsertCondition = null) { var groups = bulkInserters(documents); @@ -150,7 +152,8 @@ public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mo public void BulkInsertDocumentsEnlistTransaction(IEnumerable documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, + string upsertCondition = null) { var groups = bulkInserters(documents); var types = documentTypes(documents); @@ -184,7 +187,7 @@ private static IBulkInserter[] bulkInserters(IEnumerable documents) } public async Task BulkInsertDocumentsAsync(IEnumerable documents, BulkInsertMode mode, int batchSize, - CancellationToken cancellation) + string upsertCondition = null, CancellationToken cancellation = default) { var groups = bulkInserters(documents); @@ -196,7 +199,7 @@ public async Task BulkInsertDocumentsAsync(IEnumerable documents, BulkIn try { foreach (var group in groups) - await group.BulkInsertAsync(batchSize, conn, this, mode, cancellation).ConfigureAwait(false); + await 
group.BulkInsertAsync(batchSize, conn, this, mode, upsertCondition, cancellation).ConfigureAwait(false); await tx.CommitAsync(cancellation).ConfigureAwait(false); } @@ -212,7 +215,8 @@ public async Task BulkInsertDocumentsEnlistTransactionAsync( Transaction transaction, BulkInsertMode mode, int batchSize, - CancellationToken cancellation + string upsertCondition = null, + CancellationToken cancellation = default ) { var groups = bulkInserters(documents); @@ -227,12 +231,14 @@ CancellationToken cancellation conn.EnlistTransaction(transaction); foreach (var group in groups) - await group.BulkInsertAsync(batchSize, conn, this, mode, cancellation).ConfigureAwait(false); + await group.BulkInsertAsync(batchSize, conn, this, mode, upsertCondition, cancellation).ConfigureAwait(false); } private void bulkInsertDocuments(IReadOnlyCollection documents, int batchSize, NpgsqlConnection conn, - BulkInsertMode mode) + BulkInsertMode mode, string upsertCondition) { + ValidateUpsertCondition(mode, upsertCondition); + var provider = _tenant.Database.Providers.StorageFor(); var loader = provider.BulkLoader!; @@ -274,15 +280,17 @@ private void bulkInsertDocuments(IReadOnlyCollection documents, int batchS } else if (mode == BulkInsertMode.OverwriteExisting) { - var upsert = loader.UpsertFromTempTable(); + var upsert = string.Format(loader.UpsertFromTempTable(), upsertCondition ?? "1 = 1"); conn.CreateCommand(upsert).ExecuteNonQuery(); } } private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, int batchSize, - NpgsqlConnection conn, BulkInsertMode mode, CancellationToken cancellation) + NpgsqlConnection conn, BulkInsertMode mode, string upsertCondition, CancellationToken cancellation) { + ValidateUpsertCondition(mode, upsertCondition); + var provider = _tenant.Database.Providers.StorageFor(); var loader = provider.BulkLoader!; @@ -324,12 +332,21 @@ private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, } else if (mode == BulkInsertMode.OverwriteExisting) { - var upsert = loader.UpsertFromTempTable(); + var upsert = string.Format(loader.UpsertFromTempTable(), upsertCondition ?? 
"1 = 1"); + await conn.CreateCommand(upsert).ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); } } + private static void ValidateUpsertCondition(BulkInsertMode mode, string upsertCondition) + { + if (mode != BulkInsertMode.OverwriteExisting && !string.IsNullOrWhiteSpace(upsertCondition)) + { + throw new ArgumentException($"An upsert condition can only be provided when using {BulkInsertMode.OverwriteExisting}", nameof(upsertCondition)); + } + } + private void loadDocuments(IEnumerable documents, IBulkLoader loader, BulkInsertMode mode, NpgsqlConnection conn) { @@ -359,10 +376,10 @@ await loader.LoadIntoTempTableAsync(_tenant, Serializer, conn, documents, cancel internal interface IBulkInserter { - void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, BulkInsertMode mode); + void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, BulkInsertMode mode, string upsertCondition = null); Task BulkInsertAsync(int batchSize, NpgsqlConnection conn, BulkInsertion bulkInsertion, BulkInsertMode mode, - CancellationToken cancellation); + string upsertCondition, CancellationToken cancellation); } internal class BulkInserter: IBulkInserter @@ -375,18 +392,19 @@ public BulkInserter(IEnumerable documents) } public void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, - BulkInsertMode mode) + BulkInsertMode mode, string upsertCondition = null) { parent._tenant.Database.EnsureStorageExists(typeof(T)); - parent.bulkInsertDocuments(_documents, batchSize, connection, mode); + parent.bulkInsertDocuments(_documents, batchSize, connection, mode, upsertCondition); } public async Task BulkInsertAsync(int batchSize, NpgsqlConnection conn, BulkInsertion parent, BulkInsertMode mode, + string upsertCondition, CancellationToken cancellation) { await parent._tenant.Database.EnsureStorageExistsAsync(typeof(T), cancellation).ConfigureAwait(false); - await parent.bulkInsertDocumentsAsync(_documents, batchSize, conn, mode, cancellation) + await parent.bulkInsertDocumentsAsync(_documents, batchSize, conn, mode, upsertCondition, cancellation) .ConfigureAwait(false); } } From 12a49c072e4f01f7edf374bcf26d644469b574ee Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Fri, 27 Sep 2024 16:36:44 +1000 Subject: [PATCH 3/8] Update BulkLoaderBuilder.cs --- src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs index ebc41adcca..9836b3cb3b 100644 --- a/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs +++ b/src/Marten/Internal/CodeGeneration/BulkLoaderBuilder.cs @@ -121,7 +121,7 @@ public string UpsertFromTempTable() : "id"; return - $"insert into {storageTable} ({columns}, {SchemaConstants.LastModifiedColumn}) " + + $"insert into {storageTable} as d ({columns}, {SchemaConstants.LastModifiedColumn}) " + $"select {selectColumns}, transaction_timestamp() " + $"from {_tempTable} " + $"on conflict ({conflictColumns}) do update " + From c9371f5179e6050a8dbaa1ced5134d713d005b2b Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Thu, 3 Oct 2024 08:20:15 +1000 Subject: [PATCH 4/8] upsert cleanup --- src/Marten/DocumentStore.cs | 35 +++++++------ src/Marten/IDocumentStore.cs | 12 ++--- src/Marten/Storage/BulkInsertion.cs | 77 ++++++++++++++++------------- 3 files changed, 69 insertions(+), 55 deletions(-) diff --git a/src/Marten/DocumentStore.cs b/src/Marten/DocumentStore.cs 
index 54a673b479..6c0f1252c4 100644 --- a/src/Marten/DocumentStore.cs +++ b/src/Marten/DocumentStore.cs @@ -17,10 +17,8 @@ using Marten.Internal.Sessions; using Marten.Services; using Marten.Storage; -using Microsoft.CodeAnalysis.Options; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using Weasel.Core.Migrations; using Weasel.Postgresql.Connections; using IsolationLevel = System.Data.IsolationLevel; @@ -115,7 +113,7 @@ public ValueTask DisposeAsync() public AdvancedOperations Advanced { get; } public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string? updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); bulkInsertion.BulkInsert(documents, mode, batchSize); @@ -123,10 +121,10 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, string? updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize); + bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize, updateCondition); } public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, @@ -138,10 +136,11 @@ public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mo public void BulkInsert(string tenantId, IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000) + int batchSize = 1000, + string? updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.GetTenant(Options.MaybeCorrectTenantId(tenantId)), Options); - bulkInsertion.BulkInsert(documents, mode, batchSize); + bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition); } public void BulkInsertDocuments(string tenantId, IEnumerable documents, @@ -154,27 +153,35 @@ public void BulkInsertDocuments(string tenantId, IEnumerable documents, public Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, CancellationToken cancellation = default) + int batchSize = 1000, + string? updateCondition = null, + CancellationToken cancellation = default) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation); + return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation); } public Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, CancellationToken cancellation = default) + int batchSize = 1000, + string? 
updateCondition = null, + CancellationToken cancellation = default) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, cancellation); + return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, updateCondition, cancellation); } - public async Task BulkInsertAsync(string tenantId, IReadOnlyCollection documents, - BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, + public async Task BulkInsertAsync( + string tenantId, + IReadOnlyCollection documents, + BulkInsertMode mode = BulkInsertMode.InsertsOnly, + int batchSize = 1000, + string? updateCondition = null, CancellationToken cancellation = default) { var bulkInsertion = new BulkInsertion(await Tenancy.GetTenantAsync(Options.MaybeCorrectTenantId(tenantId)).ConfigureAwait(false), Options); - await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation).ConfigureAwait(false); + await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation).ConfigureAwait(false); } public Task BulkInsertDocumentsAsync(IEnumerable documents, diff --git a/src/Marten/IDocumentStore.cs b/src/Marten/IDocumentStore.cs index 488660a664..bbefff736c 100644 --- a/src/Marten/IDocumentStore.cs +++ b/src/Marten/IDocumentStore.cs @@ -46,7 +46,7 @@ public interface IDocumentStore: IDisposable /// /// void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000); + int batchSize = 1000, string? updateCondition = null); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -58,7 +58,7 @@ void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkI /// /// void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, - BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000); + BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -70,7 +70,7 @@ void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transactio /// /// void BulkInsert(string tenantId, IReadOnlyCollection documents, - BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000); + BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -81,7 +81,7 @@ void BulkInsert(string tenantId, IReadOnlyCollection documents, /// /// Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, CancellationToken cancellation = default); + int batchSize = 1000, string? updateCondition = null, CancellationToken cancellation = default); /// /// Uses Postgresql's COPY ... FROM STDIN BINARY feature to efficiently store @@ -94,7 +94,7 @@ Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = /// Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, - CancellationToken cancellation = default); + string? updateCondition = null, CancellationToken cancellation = default); /// /// Uses Postgresql's COPY ... 
FROM STDIN BINARY feature to efficiently store @@ -107,7 +107,7 @@ Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Trans /// Task BulkInsertAsync(string tenantId, IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, - CancellationToken cancellation = default); + string? updateCondition = null, CancellationToken cancellation = default); /// /// Open a new IDocumentSession with the supplied DocumentTracking. diff --git a/src/Marten/Storage/BulkInsertion.cs b/src/Marten/Storage/BulkInsertion.cs index 62db735326..3c81c3439c 100644 --- a/src/Marten/Storage/BulkInsertion.cs +++ b/src/Marten/Storage/BulkInsertion.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Net.Http.Headers; using System.Threading; using System.Threading.Tasks; using System.Transactions; @@ -29,8 +28,9 @@ public void Dispose() } public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, string upsertCondition = null) + int batchSize = 1000, string updateCondition = null) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { BulkInsertDocuments(documents.OfType(), mode); @@ -45,7 +45,7 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode try { - bulkInsertDocuments(documents, batchSize, conn, mode, upsertCondition); + bulkInsertDocuments(documents, batchSize, conn, mode, updateCondition); tx.Commit(); } @@ -61,8 +61,9 @@ public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, - string upsertCondition = null) + string updateCondition = null) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { BulkInsertDocumentsEnlistTransaction(documents.OfType(), transaction, mode); @@ -74,16 +75,17 @@ public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, using var conn = _tenant.Database.CreateConnection(); conn.Open(); conn.EnlistTransaction(transaction); - bulkInsertDocuments(documents, batchSize, conn, mode, upsertCondition); + bulkInsertDocuments(documents, batchSize, conn, mode, updateCondition); } } public async Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode, int batchSize, - string upsertCondition = null, CancellationToken cancellation = default) + string updateCondition = null, CancellationToken cancellation = default) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { - await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, upsertCondition, cancellation) + await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, cancellation) .ConfigureAwait(false); } else @@ -96,7 +98,7 @@ await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, upse var tx = await conn.BeginTransactionAsync(cancellation).ConfigureAwait(false); try { - await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, upsertCondition, cancellation).ConfigureAwait(false); + await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, updateCondition, cancellation).ConfigureAwait(false); await tx.CommitAsync(cancellation).ConfigureAwait(false); } @@ -109,12 +111,13 @@ await BulkInsertDocumentsAsync(documents.OfType(), mode, batchSize, upse } public async Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, - BulkInsertMode 
mode, int batchSize, string upsertCondition = null, CancellationToken cancellation = default) + BulkInsertMode mode, int batchSize, string updateCondition = null, CancellationToken cancellation = default) { + ValidateupdateCondition(mode, updateCondition); if (typeof(T) == typeof(object)) { await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType(), transaction, mode, batchSize, - upsertCondition, cancellation).ConfigureAwait(false); + cancellation).ConfigureAwait(false); } else { @@ -122,12 +125,12 @@ await BulkInsertDocumentsEnlistTransactionAsync(documents.OfType(), tran await using var conn = _tenant.Database.CreateConnection(); await conn.OpenAsync(cancellation).ConfigureAwait(false); conn.EnlistTransaction(transaction); - await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, upsertCondition, cancellation).ConfigureAwait(false); + await bulkInsertDocumentsAsync(documents, batchSize, conn, mode, updateCondition, cancellation).ConfigureAwait(false); } } public void BulkInsertDocuments(IEnumerable documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, - int batchSize = 1000, string upsertCondition = null) + int batchSize = 1000, string updateCondition = null) { var groups = bulkInserters(documents); @@ -153,7 +156,7 @@ public void BulkInsertDocumentsEnlistTransaction(IEnumerable documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, - string upsertCondition = null) + string updateCondition = null) { var groups = bulkInserters(documents); var types = documentTypes(documents); @@ -187,7 +190,7 @@ private static IBulkInserter[] bulkInserters(IEnumerable documents) } public async Task BulkInsertDocumentsAsync(IEnumerable documents, BulkInsertMode mode, int batchSize, - string upsertCondition = null, CancellationToken cancellation = default) + CancellationToken cancellation = default) { var groups = bulkInserters(documents); @@ -199,7 +202,7 @@ public async Task BulkInsertDocumentsAsync(IEnumerable documents, BulkIn try { foreach (var group in groups) - await group.BulkInsertAsync(batchSize, conn, this, mode, upsertCondition, cancellation).ConfigureAwait(false); + await group.BulkInsertAsync(batchSize, conn, this, mode, cancellation).ConfigureAwait(false); await tx.CommitAsync(cancellation).ConfigureAwait(false); } @@ -215,8 +218,7 @@ public async Task BulkInsertDocumentsEnlistTransactionAsync( Transaction transaction, BulkInsertMode mode, int batchSize, - string upsertCondition = null, - CancellationToken cancellation = default + CancellationToken cancellation ) { var groups = bulkInserters(documents); @@ -231,14 +233,12 @@ public async Task BulkInsertDocumentsEnlistTransactionAsync( conn.EnlistTransaction(transaction); foreach (var group in groups) - await group.BulkInsertAsync(batchSize, conn, this, mode, upsertCondition, cancellation).ConfigureAwait(false); + await group.BulkInsertAsync(batchSize, conn, this, mode, cancellation).ConfigureAwait(false); } private void bulkInsertDocuments(IReadOnlyCollection documents, int batchSize, NpgsqlConnection conn, - BulkInsertMode mode, string upsertCondition) + BulkInsertMode mode, string updateCondition) { - ValidateUpsertCondition(mode, upsertCondition); - var provider = _tenant.Database.Providers.StorageFor(); var loader = provider.BulkLoader!; @@ -280,17 +280,15 @@ private void bulkInsertDocuments(IReadOnlyCollection documents, int batchS } else if (mode == BulkInsertMode.OverwriteExisting) { - var upsert = string.Format(loader.UpsertFromTempTable(), upsertCondition 
?? "1 = 1"); + var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "1 = 1"); conn.CreateCommand(upsert).ExecuteNonQuery(); } } private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, int batchSize, - NpgsqlConnection conn, BulkInsertMode mode, string upsertCondition, CancellationToken cancellation) + NpgsqlConnection conn, BulkInsertMode mode, string updateCondition, CancellationToken cancellation) { - ValidateUpsertCondition(mode, upsertCondition); - var provider = _tenant.Database.Providers.StorageFor(); var loader = provider.BulkLoader!; @@ -332,18 +330,28 @@ private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, } else if (mode == BulkInsertMode.OverwriteExisting) { - var upsert = string.Format(loader.UpsertFromTempTable(), upsertCondition ?? "1 = 1"); + var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "1 = 1"); await conn.CreateCommand(upsert).ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); } } - private static void ValidateUpsertCondition(BulkInsertMode mode, string upsertCondition) + private static void ValidateupdateCondition(BulkInsertMode mode, string updateCondition) { - if (mode != BulkInsertMode.OverwriteExisting && !string.IsNullOrWhiteSpace(upsertCondition)) + if (updateCondition is null) + { + return; + } + + if (typeof(T) == typeof(object)) + { + throw new ArgumentException($"An update condition can not be used on a collection of , use a collection of a specific document type instead.", nameof(updateCondition)); + } + + if (mode != BulkInsertMode.OverwriteExisting) { - throw new ArgumentException($"An upsert condition can only be provided when using {BulkInsertMode.OverwriteExisting}", nameof(upsertCondition)); + throw new ArgumentException($"An update condition can only be provided when using {BulkInsertMode.OverwriteExisting}", nameof(updateCondition)); } } @@ -376,10 +384,10 @@ await loader.LoadIntoTempTableAsync(_tenant, Serializer, conn, documents, cancel internal interface IBulkInserter { - void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, BulkInsertMode mode, string upsertCondition = null); + void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, BulkInsertMode mode); Task BulkInsertAsync(int batchSize, NpgsqlConnection conn, BulkInsertion bulkInsertion, BulkInsertMode mode, - string upsertCondition, CancellationToken cancellation); + CancellationToken cancellation); } internal class BulkInserter: IBulkInserter @@ -392,19 +400,18 @@ public BulkInserter(IEnumerable documents) } public void BulkInsert(int batchSize, NpgsqlConnection connection, BulkInsertion parent, - BulkInsertMode mode, string upsertCondition = null) + BulkInsertMode mode) { parent._tenant.Database.EnsureStorageExists(typeof(T)); - parent.bulkInsertDocuments(_documents, batchSize, connection, mode, upsertCondition); + parent.bulkInsertDocuments(_documents, batchSize, connection, mode, null); } public async Task BulkInsertAsync(int batchSize, NpgsqlConnection conn, BulkInsertion parent, BulkInsertMode mode, - string upsertCondition, CancellationToken cancellation) { await parent._tenant.Database.EnsureStorageExistsAsync(typeof(T), cancellation).ConfigureAwait(false); - await parent.bulkInsertDocumentsAsync(_documents, batchSize, conn, mode, upsertCondition, cancellation) + await parent.bulkInsertDocumentsAsync(_documents, batchSize, conn, mode, null, cancellation) .ConfigureAwait(false); } } From b975516dbc6b700b5ad4c7aa68214a8ff4a4e232 Mon Sep 
17 00:00:00 2001 From: Ben Edwards Date: Thu, 3 Oct 2024 10:45:50 +1000 Subject: [PATCH 5/8] Tests and xmldoc --- src/DocumentDbTests/Writing/bulk_loading.cs | 46 ++++++++++++++ src/Marten.Testing/Documents/Target.cs | 2 +- src/Marten/DocumentStore.cs | 2 +- src/Marten/IDocumentStore.cs | 66 +++++++++++++++++++++ src/Marten/Storage/BulkInsertion.cs | 4 +- 5 files changed, 116 insertions(+), 4 deletions(-) diff --git a/src/DocumentDbTests/Writing/bulk_loading.cs b/src/DocumentDbTests/Writing/bulk_loading.cs index 794ea21e63..7e84df39ce 100644 --- a/src/DocumentDbTests/Writing/bulk_loading.cs +++ b/src/DocumentDbTests/Writing/bulk_loading.cs @@ -86,6 +86,50 @@ public void load_with_overwrite_duplicates() count.ShouldBe(0); } + [Fact] + public void load_with_overwrite_duplicates_and_update_condition() + { + var data1 = Target.GenerateRandomData(30) + .Select(t => t with { Number = 100, String = "initial insert" }) + .ToArray(); + theStore.BulkInsert(data1); + + var data2 = data1.Take(20) + .Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" }) + .ToArray(); + + theStore.BulkInsert(data2, BulkInsertMode.OverwriteExisting, updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int"); + + using var session = theStore.QuerySession(); + session.Query().Count().ShouldBe(data1.Length); + + // Values were overwritten + session.Query().Count(t => t.Number == 100 && t.String == "initial insert").ShouldBe(20); + session.Query().Count(t => t.Number == 150 && t.String == "second insert").ShouldBe(10); + } + + [Fact] + public async Task load_with_overwrite_duplicates_and_update_condition_async() + { + var data1 = Target.GenerateRandomData(30) + .Select(t => t with { Number = 100, String = "initial insert" }) + .ToArray(); + await theStore.BulkInsertAsync(data1); + + var data2 = data1.Take(20) + .Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" }) + .ToArray(); + + await theStore.BulkInsertAsync(data2, BulkInsertMode.OverwriteExisting, updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int"); + + await using var session = theStore.QuerySession(); + (await session.Query().CountAsync()).ShouldBe(data1.Length); + + // Values were overwritten + (await session.Query().CountAsync(t => t.Number == 100 && t.String == "initial insert")).ShouldBe(20); + (await session.Query().CountAsync(t => t.Number == 150 && t.String == "second insert")).ShouldBe(10); + } + [Fact] public void load_with_small_batch() { @@ -598,6 +642,8 @@ public async Task can_bulk_insert_mixed_list_of_objects_enlist_transaction_async } } + + public Task InitializeAsync() { return theStore.Advanced.Clean.DeleteAllDocumentsAsync(); diff --git a/src/Marten.Testing/Documents/Target.cs b/src/Marten.Testing/Documents/Target.cs index 69abc7473b..abad50c9a5 100644 --- a/src/Marten.Testing/Documents/Target.cs +++ b/src/Marten.Testing/Documents/Target.cs @@ -17,7 +17,7 @@ public enum Colors Orange } -public class Target +public record Target { private static readonly Random _random = new Random(67); diff --git a/src/Marten/DocumentStore.cs b/src/Marten/DocumentStore.cs index 6c0f1252c4..fd5bc09065 100644 --- a/src/Marten/DocumentStore.cs +++ b/src/Marten/DocumentStore.cs @@ -116,7 +116,7 @@ public void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode int batchSize = 1000, string? 
updateCondition = null) { var bulkInsertion = new BulkInsertion(Tenancy.Default, Options); - bulkInsertion.BulkInsert(documents, mode, batchSize); + bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition); } public void BulkInsertEnlistTransaction(IReadOnlyCollection documents, diff --git a/src/Marten/IDocumentStore.cs b/src/Marten/IDocumentStore.cs index bbefff736c..566921521e 100644 --- a/src/Marten/IDocumentStore.cs +++ b/src/Marten/IDocumentStore.cs @@ -45,6 +45,17 @@ public interface IDocumentStore: IDisposable /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); @@ -57,6 +68,17 @@ void BulkInsert(IReadOnlyCollection documents, BulkInsertMode mode = BulkI /// an existing transaction /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); @@ -68,6 +90,17 @@ void BulkInsertEnlistTransaction(IReadOnlyCollection documents, Transactio /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// /// void BulkInsert(string tenantId, IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null); @@ -80,6 +113,17 @@ void BulkInsert(string tenantId, IReadOnlyCollection documents, /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null, CancellationToken cancellation = default); @@ -92,6 +136,17 @@ Task BulkInsertAsync(IReadOnlyCollection documents, BulkInsertMode mode = /// an existing transaction /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. 
+ /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null, CancellationToken cancellation = default); @@ -105,6 +160,17 @@ Task BulkInsertEnlistTransactionAsync(IReadOnlyCollection documents, Trans /// /// /// + /// + /// Raw sql that is used as the WHERE clause of the ON CONFLICT DO UPDATE statement when configured for . + /// Use d. to refer to the existing record in the table and excluded. to refer to the conflicting record that was excluded from insertion. + /// + /// + /// Example: + /// + /// "d.export_date <= excluded.export_date" + /// + /// + /// Task BulkInsertAsync(string tenantId, IReadOnlyCollection documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000, string? updateCondition = null, CancellationToken cancellation = default); diff --git a/src/Marten/Storage/BulkInsertion.cs b/src/Marten/Storage/BulkInsertion.cs index 3c81c3439c..aadc2aafa2 100644 --- a/src/Marten/Storage/BulkInsertion.cs +++ b/src/Marten/Storage/BulkInsertion.cs @@ -280,7 +280,7 @@ private void bulkInsertDocuments(IReadOnlyCollection documents, int batchS } else if (mode == BulkInsertMode.OverwriteExisting) { - var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "1 = 1"); + var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "true"); conn.CreateCommand(upsert).ExecuteNonQuery(); } @@ -330,7 +330,7 @@ private async Task bulkInsertDocumentsAsync(IReadOnlyCollection documents, } else if (mode == BulkInsertMode.OverwriteExisting) { - var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "1 = 1"); + var upsert = string.Format(loader.UpsertFromTempTable(), updateCondition ?? "true"); await conn.CreateCommand(upsert).ExecuteNonQueryAsync(cancellation).ConfigureAwait(false); From acf55485010c2da289d37543d47e104de5255f3f Mon Sep 17 00:00:00 2001 From: Ben Edwards Date: Thu, 3 Oct 2024 11:10:16 +1000 Subject: [PATCH 6/8] documentation --- docs/documents/storing.md | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/docs/documents/storing.md b/docs/documents/storing.md index d3b4146303..b3969853fc 100644 --- a/docs/documents/storing.md +++ b/docs/documents/storing.md @@ -12,7 +12,7 @@ a previously persisted document with the same identity. Here's that method in ac with a sample that shows storing both a brand new document and a modified document: - + ```cs using var store = DocumentStore.For("some connection string"); @@ -35,7 +35,7 @@ session.Store(newUser, existingUser); await session.SaveChangesAsync(); ``` -snippet source | anchor +snippet source | anchor The `Store()` method can happily take a mixed bag of document types at one time, but you'll need to tell Marten to use `Store()` instead of letting it infer the document type as shown below: @@ -101,7 +101,7 @@ theStore.BulkInsert(data, batchSize: 500); // And just checking that the data is actually there;) theSession.Query().Count().ShouldBe(data.Length); ``` -snippet source | anchor +snippet source | anchor The bulk insert is done with a single transaction. For really large document collections, you may need to page the calls to `IDocumentStore.BulkInsert()`. 
@@ -126,13 +126,13 @@ await theStore.BulkInsertAsync(data, batchSize: 500);
// And just checking that the data is actually there;)
theSession.Query().Count().ShouldBe(data.Length);
```
-snippet source | anchor
+snippet source | anchor

By default, bulk insert will fail if there are any duplicate id's between the documents being inserted and the existing database data.
You can alter this behavior through the `BulkInsertMode` enumeration as shown below:
-
+
```cs
// Just say we have an array of documents we want to bulk insert
var data = Target.GenerateRandomData(100).ToArray();
@@ -151,7 +151,25 @@ await store.BulkInsertDocumentsAsync(data, BulkInsertMode.InsertsOnly);
// being loaded
await store.BulkInsertDocumentsAsync(data, BulkInsertMode.OverwriteExisting);
```
-snippet source | anchor
+snippet source | anchor
+
+
+When using `BulkInsertMode.OverwriteExisting` it is also possible to pass in a condition to be evaluated when overwriting documents.
+This allows for checks such as "don't update the document if the existing document is newer" without having to make more expensive round trips.
+Be careful when using this: it is a low-level call, and the update condition does not support parameters or any protection from SQL injection.
+
+
+
+```cs
+// perform a bulk insert of `Target` documents
+// but only overwrite existing if the existing document's "Number"
+// property is less than the new document's
+await theStore.BulkInsertAsync(
+    data2,
+    BulkInsertMode.OverwriteExisting,
+    updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");
+```
+snippet source | anchor
The bulk insert feature can also be used with multi-tenanted documents, but in that
@@ -159,7 +177,7 @@ case you are limited to only loading documents to a single tenant at a time as
shown below:
-
+
```cs
// Just say we have an array of documents we want to bulk insert
var data = Target.GenerateRandomData(100).ToArray();
@@ -173,5 +191,5 @@ using var store = DocumentStore.For(opts =>
// If multi-tenanted
await store.BulkInsertDocumentsAsync("a tenant id", data);
```
-snippet source | anchor
+snippet source | anchor

From de915ee69227284cd1f5e33e1adb6e24950d3bcf Mon Sep 17 00:00:00 2001
From: Ben Edwards
Date: Thu, 3 Oct 2024 11:24:06 +1000
Subject: [PATCH 7/8] fix merge conflicts

---
 docs/documents/storing.md                   | 10 +++++-----
 src/DocumentDbTests/Writing/bulk_loading.cs | 12 +++++++++++-
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/docs/documents/storing.md b/docs/documents/storing.md
index 968b863383..afdb15a6b0 100644
--- a/docs/documents/storing.md
+++ b/docs/documents/storing.md
@@ -101,7 +101,7 @@ theStore.BulkInsert(data, batchSize: 500);
// And just checking that the data is actually there;)
theSession.Query().Count().ShouldBe(data.Length);
```
-snippet source | anchor
+snippet source | anchor

The bulk insert is done with a single transaction. For really large document collections, you may need to page the calls to `IDocumentStore.BulkInsert()`.

@@ -126,7 +126,7 @@ await theStore.BulkInsertAsync(data, batchSize: 500);
// And just checking that the data is actually there;)
theSession.Query().Count().ShouldBe(data.Length);
```
-snippet source | anchor
+snippet source | anchor

By default, bulk insert will fail if there are any duplicate id's between the documents being inserted and the existing database data.
You can alter this behavior through the `BulkInsertMode` enumeration as shown below:
@@ -151,7 +151,7 @@ await store.BulkInsertDocumentsAsync(data, BulkInsertMode.InsertsOnly);
// being loaded
await store.BulkInsertDocumentsAsync(data, BulkInsertMode.OverwriteExisting);
```
-snippet source | anchor
+snippet source | anchor
When using `BulkInsertMode.OverwriteExisting` it is also possible to pass in a condition to be evaluated when overwriting documents.
@@ -169,7 +169,7 @@ await theStore.BulkInsertAsync(
    BulkInsertMode.OverwriteExisting,
    updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");
```
-snippet source | anchor
+snippet source | anchor
The bulk insert feature can also be used with multi-tenanted documents, but in that
@@ -191,5 +191,5 @@ using var store = DocumentStore.For(opts =>
// If multi-tenanted
await store.BulkInsertDocumentsAsync("a tenant id", data);
```
-snippet source | anchor
+snippet source | anchor

diff --git a/src/DocumentDbTests/Writing/bulk_loading.cs b/src/DocumentDbTests/Writing/bulk_loading.cs
index 13b40af3e2..a9754a7cab 100644
--- a/src/DocumentDbTests/Writing/bulk_loading.cs
+++ b/src/DocumentDbTests/Writing/bulk_loading.cs
@@ -98,7 +98,17 @@ public void load_with_overwrite_duplicates_and_update_condition()
        .Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" })
        .ToArray();

-    theStore.BulkInsert(data2, BulkInsertMode.OverwriteExisting, updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");
+    #region sample_BulkInsertWithUpdateCondition
+
+    // perform a bulk insert of `Target` documents
+    // but only overwrite existing if the existing document's "Number"
+    // property is less than the new document's
+    await theStore.BulkInsertAsync(
+        data2,
+        BulkInsertMode.OverwriteExisting,
+        updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");
+
+    #endregion

    using var session = theStore.QuerySession();
    session.Query().Count().ShouldBe(data1.Length);

From 7ca7203e82979fe9860c0d0ec97f4c80b09c3ce0 Mon Sep 17 00:00:00 2001
From: Ben Edwards
Date: Thu, 3 Oct 2024 11:25:41 +1000
Subject: [PATCH 8/8] round out the tests

---
 src/DocumentDbTests/Writing/bulk_loading.cs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/DocumentDbTests/Writing/bulk_loading.cs b/src/DocumentDbTests/Writing/bulk_loading.cs
index a9754a7cab..a94b2dd1af 100644
--- a/src/DocumentDbTests/Writing/bulk_loading.cs
+++ b/src/DocumentDbTests/Writing/bulk_loading.cs
@@ -116,6 +116,7 @@ await theStore.BulkInsertAsync(
    // Values were overwritten
    session.Query().Count(t => t.Number == 100 && t.String == "initial insert").ShouldBe(20);
    session.Query().Count(t => t.Number == 150 && t.String == "second insert").ShouldBe(10);
+    session.Query().Count(t => t.Number == 50).ShouldBe(0);
}

[Fact]
@@ -138,6 +139,7 @@ public async Task load_with_overwrite_duplicates_and_update_condition_async()
    // Values were overwritten
    (await session.Query().CountAsync(t => t.Number == 100 && t.String == "initial insert")).ShouldBe(20);
    (await session.Query().CountAsync(t => t.Number == 150 && t.String == "second insert")).ShouldBe(10);
+    (await session.Query().CountAsync(t => t.Number == 50)).ShouldBe(0);
}

[Fact]
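
For readers who want to try the new `updateCondition` hook outside of Marten's own test suite, here is a minimal sketch of how it is meant to be called. It is not part of the patch series above: the `ExportDoc` document type and the connection string are hypothetical, and the JSON property name in the condition assumes Marten's default serializer casing. The condition string leans on the `d` alias that `UpsertFromTempTable()` now gives the target table and on PostgreSQL's `excluded` pseudo-row, mirroring the `d.export_date <= excluded.export_date` example from the XML docs.

```cs
using System;
using System.Threading.Tasks;
using Marten;

// Hypothetical document type, used only for illustration.
public record ExportDoc(Guid Id, DateTimeOffset ExportDate, string Payload);

public static class BulkUpsertExample
{
    public static async Task LoadAsync(ExportDoc[] incoming)
    {
        // Placeholder connection string.
        await using var store = DocumentStore.For("Host=localhost;Database=marten;Username=marten;Password=secret");

        // With this patch series, OverwriteExisting issues a single
        //   insert into ... on conflict (id) do update set ... where ( <updateCondition> )
        // statement. "d" aliases the existing row and "excluded" the incoming row,
        // so this condition only overwrites a stored document when the incoming
        // one has an equal or newer ExportDate.
        await store.BulkInsertAsync(
            incoming,
            BulkInsertMode.OverwriteExisting,
            updateCondition: "(d.data ->> 'ExportDate')::timestamptz <= (excluded.data ->> 'ExportDate')::timestamptz");
    }
}
```

Because the condition is substituted verbatim into the generated `insert ... on conflict ... do update ... where ( ... )` statement, it should only ever be built from trusted input, as the documentation change above warns.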