Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use insert .. on conflict for bulk import operations #3301

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions docs/documents/storing.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ a previously persisted document with the same identity. Here's that method in ac
with a sample that shows storing both a brand new document and a modified document:

<!-- snippet: sample_using_DocumentSession_Store -->
<a id='snippet-sample_using_documentsession_store'></a>
<a id='snippet-sample_using_DocumentSession_Store'></a>
```cs
using var store = DocumentStore.For("some connection string");

Expand All @@ -35,7 +35,7 @@ session.Store(newUser, existingUser);

await session.SaveChangesAsync();
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/StoringDocuments.cs#L37-L60' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_documentsession_store' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/StoringDocuments.cs#L37-L60' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_DocumentSession_Store' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Store()` method can happily take a mixed bag of document types at one time, but you'll need to tell Marten to use `Store<object>()` instead of letting it infer the document type as shown below:
Expand Down Expand Up @@ -101,7 +101,7 @@ theStore.BulkInsert(data, batchSize: 500);
// And just checking that the data is actually there;)
theSession.Query<Target>().Count().ShouldBe(data.Length);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L92-L102' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_bulk_insert' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L146-L156' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_bulk_insert' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The bulk insert is done with a single transaction. For really large document collections, you may need to page the calls to `IDocumentStore.BulkInsert()`.
Expand All @@ -126,13 +126,13 @@ await theStore.BulkInsertAsync(data, batchSize: 500);
// And just checking that the data is actually there;)
theSession.Query<Target>().Count().ShouldBe(data.Length);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L250-L260' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_bulk_insert_async' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L304-L314' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_bulk_insert_async' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

By default, bulk insert will fail if there are any duplicate id's between the documents being inserted and the existing database data. You can alter this behavior through the `BulkInsertMode` enumeration as shown below:

<!-- snippet: sample_BulkInsertMode_usages -->
<a id='snippet-sample_bulkinsertmode_usages'></a>
<a id='snippet-sample_BulkInsertMode_usages'></a>
```cs
// Just say we have an array of documents we want to bulk insert
var data = Target.GenerateRandomData(100).ToArray();
Expand All @@ -151,15 +151,33 @@ await store.BulkInsertDocumentsAsync(data, BulkInsertMode.InsertsOnly);
// being loaded
await store.BulkInsertDocumentsAsync(data, BulkInsertMode.OverwriteExisting);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L329-L348' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_bulkinsertmode_usages' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L383-L402' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_BulkInsertMode_usages' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

When using `BulkInsertMode.OverwriteExisting` it is also possible to pass in a condition to be evaluated when overwriting documents.
This allows for checks such as "don't update document if the existing document is newer" without having to do more expensive round trips.
Be careful with using this - this is a low level call and the update condition does not support parameters or protection from sql injection.

<!-- snippet: sample_BulkInsertWithUpdateCondition -->
<a id='snippet-sample_BulkInsertWithUpdateCondition'></a>
```cs
// perform a bulk insert of `Target` documents
// but only overwrite existing if the existing document's "Number"
// property is less then the new document's
await theStore.BulkInsertAsync(
data2,
BulkInsertMode.OverwriteExisting,
updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L101-L111' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_BulkInsertWithUpdateCondition' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The bulk insert feature can also be used with multi-tenanted documents, but in that
case you are limited to only loading documents to a single tenant at a time as
shown below:

<!-- snippet: sample_MultiTenancyWithBulkInsert -->
<a id='snippet-sample_multitenancywithbulkinsert'></a>
<a id='snippet-sample_MultiTenancyWithBulkInsert'></a>
```cs
// Just say we have an array of documents we want to bulk insert
var data = Target.GenerateRandomData(100).ToArray();
Expand All @@ -173,5 +191,5 @@ using var store = DocumentStore.For(opts =>
// If multi-tenanted
await store.BulkInsertDocumentsAsync("a tenant id", data);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L353-L367' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_multitenancywithbulkinsert' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DocumentDbTests/Writing/bulk_loading.cs#L407-L421' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_MultiTenancyWithBulkInsert' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
62 changes: 59 additions & 3 deletions src/DocumentDbTests/Writing/bulk_loading.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,62 @@ public void load_with_overwrite_duplicates()
count.ShouldBe(0);
}

[Fact]
public void load_with_overwrite_duplicates_and_update_condition()
{
var data1 = Target.GenerateRandomData(30)
.Select(t => t with { Number = 100, String = "initial insert" })
.ToArray();
theStore.BulkInsert(data1);

var data2 = data1.Take(20)
.Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" })
.ToArray();

#region sample_BulkInsertWithUpdateCondition

// perform a bulk insert of `Target` documents
// but only overwrite existing if the existing document's "Number"
// property is less then the new document's
await theStore.BulkInsertAsync(
data2,
BulkInsertMode.OverwriteExisting,
updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");

#endregion

using var session = theStore.QuerySession();
session.Query<Target>().Count().ShouldBe(data1.Length);

// Values were overwritten
session.Query<Target>().Count(t => t.Number == 100 && t.String == "initial insert").ShouldBe(20);
session.Query<Target>().Count(t => t.Number == 150 && t.String == "second insert").ShouldBe(10);
session.Query<Target>().Count(t => t.Number == 50).ShouldBe(0);
}

[Fact]
public async Task load_with_overwrite_duplicates_and_update_condition_async()
{
var data1 = Target.GenerateRandomData(30)
.Select(t => t with { Number = 100, String = "initial insert" })
.ToArray();
await theStore.BulkInsertAsync(data1);

var data2 = data1.Take(20)
.Select((t, i) => t with { Number = i < 10 ? 50 : 150, String = "second insert" })
.ToArray();

await theStore.BulkInsertAsync(data2, BulkInsertMode.OverwriteExisting, updateCondition: "(d.data ->> 'Number')::int <= (excluded.data ->> 'Number')::int");

await using var session = theStore.QuerySession();
(await session.Query<Target>().CountAsync()).ShouldBe(data1.Length);

// Values were overwritten
(await session.Query<Target>().CountAsync(t => t.Number == 100 && t.String == "initial insert")).ShouldBe(20);
(await session.Query<Target>().CountAsync(t => t.Number == 150 && t.String == "second insert")).ShouldBe(10);
(await session.Query<Target>().Count(t => t.Number == 50)).ShouldBe(0);
}

[Fact]
public void load_with_small_batch()
{
Expand Down Expand Up @@ -265,7 +321,7 @@ public async Task load_with_small_batch_async()
[Fact]
public async Task load_across_multiple_tenants_async()
{
StoreOptions(opts =>
StoreOptions(opts =>
{
opts.Policies.AllDocumentsAreMultiTenanted();
});
Expand Down Expand Up @@ -497,7 +553,7 @@ public void load_enlist_transaction()
{
theStore.BulkInsertEnlistTransaction(data, Transaction.Current);
scope.Complete();
}
}

using var session = theStore.QuerySession();
session.Query<Target>().Count().ShouldBe(data.Length);
Expand All @@ -510,7 +566,7 @@ public void load_enlist_transaction_no_commit()

using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
theStore.BulkInsertEnlistTransaction(data, Transaction.Current);
theStore.BulkInsertEnlistTransaction(data, Transaction.Current);
}

using var session = theStore.QuerySession();
Expand Down
2 changes: 1 addition & 1 deletion src/Marten.Testing/Documents/Target.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public enum Colors
Orange
}

public class Target
public record Target
{
private static readonly Random _random = new Random(67);

Expand Down
37 changes: 22 additions & 15 deletions src/Marten/DocumentStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
using Marten.Internal.Sessions;
using Marten.Services;
using Marten.Storage;
using Microsoft.CodeAnalysis.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Weasel.Core.Migrations;
using Weasel.Postgresql.Connections;
using IsolationLevel = System.Data.IsolationLevel;

Expand Down Expand Up @@ -114,18 +112,18 @@ public ValueTask DisposeAsync()
public AdvancedOperations Advanced { get; }

public void BulkInsert<T>(IReadOnlyCollection<T> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000)
int batchSize = 1000, string? updateCondition = null)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
bulkInsertion.BulkInsert(documents, mode, batchSize);
bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition);
}

public void BulkInsertEnlistTransaction<T>(IReadOnlyCollection<T> documents,
Transaction transaction, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000)
int batchSize = 1000, string? updateCondition = null)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize);
bulkInsertion.BulkInsertEnlistTransaction(documents, transaction, mode, batchSize, updateCondition);
}

public void BulkInsertDocuments(IEnumerable<object> documents, BulkInsertMode mode = BulkInsertMode.InsertsOnly,
Expand All @@ -137,10 +135,11 @@ public void BulkInsertDocuments(IEnumerable<object> documents, BulkInsertMode mo

public void BulkInsert<T>(string tenantId, IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000)
int batchSize = 1000,
string? updateCondition = null)
{
var bulkInsertion = new BulkInsertion(Tenancy.GetTenant(Options.MaybeCorrectTenantId(tenantId)), Options);
bulkInsertion.BulkInsert(documents, mode, batchSize);
bulkInsertion.BulkInsert(documents, mode, batchSize, updateCondition);
}

public void BulkInsertDocuments(string tenantId, IEnumerable<object> documents,
Expand All @@ -153,27 +152,35 @@ public void BulkInsertDocuments(string tenantId, IEnumerable<object> documents,

public Task BulkInsertAsync<T>(IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, CancellationToken cancellation = default)
int batchSize = 1000,
string? updateCondition = null,
CancellationToken cancellation = default)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation);
return bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation);
}

public Task BulkInsertEnlistTransactionAsync<T>(IReadOnlyCollection<T> documents,
Transaction transaction,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000, CancellationToken cancellation = default)
int batchSize = 1000,
string? updateCondition = null,
CancellationToken cancellation = default)
{
var bulkInsertion = new BulkInsertion(Tenancy.Default, Options);
return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, cancellation);
return bulkInsertion.BulkInsertEnlistTransactionAsync(documents, transaction, mode, batchSize, updateCondition, cancellation);
}

public async Task BulkInsertAsync<T>(string tenantId, IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly, int batchSize = 1000,
public async Task BulkInsertAsync<T>(
string tenantId,
IReadOnlyCollection<T> documents,
BulkInsertMode mode = BulkInsertMode.InsertsOnly,
int batchSize = 1000,
string? updateCondition = null,
CancellationToken cancellation = default)
{
var bulkInsertion = new BulkInsertion(await Tenancy.GetTenantAsync(Options.MaybeCorrectTenantId(tenantId)).ConfigureAwait(false), Options);
await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, cancellation).ConfigureAwait(false);
await bulkInsertion.BulkInsertAsync(documents, mode, batchSize, updateCondition, cancellation).ConfigureAwait(false);
}

public Task BulkInsertDocumentsAsync(IEnumerable<object> documents,
Expand Down
Loading
Loading