Skip to content

Commit

Permalink
Fix not firing data changed event on replace item (#386)
Browse files Browse the repository at this point in the history
* fire event in Replace Item

* move assertion into Upsert

* add tests to confirm upserting works as intended

* fill out data tests

* Add Data cange mode for container data

* switch overloaded methods to internal, rename data change mode values to manual and auto
  • Loading branch information
r-dunning authored Nov 15, 2024
1 parent a7c1b19 commit c553c50
Show file tree
Hide file tree
Showing 8 changed files with 544 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ public async Task Valid_MigratesDataCorrectly()

var client = customerApi.CreateClient(authHeader);
var request = new { EmailAddress = "bobby@bobertson.co.uk" };
await client.PatchAsJsonAsync("/customers/CustomerId", request);
var response = await client.PatchAsJsonAsync("/customers/CustomerId", request);

response.StatusCode.Should().Be(HttpStatusCode.OK);
await customerApi.Then.TheCustomerShouldMatch(customerUri, c => c.EmailAddresses.SequenceEqual(new List<String> { "bobby@bobertson.co.uk" }));
}

Expand Down
7 changes: 7 additions & 0 deletions sample/CustomerApi.Tests/TestApi/CustomerStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ Discretionary<string> lastName
return await customerEventRepository.ApplyEvents(customerUri.Uri, 0, customerCreated);
}

public async Task<CustomerReadModel> AnExistingCustomerEmailWasUpdated(CustomerUri customerUri, string emailAddress)
{
var customerReadModel = await customerEventRepository.Get(customerUri.Uri);
var emailUpdated = new CustomerEmailAddressChanged(customerUri, customerReadModel!.EmailAddress, emailAddress);
return await customerEventRepository.ApplyEvents(customerUri.Uri, customerReadModel.Revision, emailUpdated);
}

public async Task GivenTheCustomerIsDeleted(CustomerUri customerUri)
{
var customerDeleted = new CustomerDeleted(customerUri);
Expand Down
126 changes: 113 additions & 13 deletions src/LogOtter.CosmosDb.ContainerMock/ContainerMock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,23 @@ public IEnumerable<TestContainerItem<T>> GetAllItems<T>()
}
}

public override async Task<ResponseMessage> CreateItemStreamAsync(
public override Task<ResponseMessage> CreateItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return CreateItemStreamAsync(streamPayload, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal async Task<ResponseMessage> CreateItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
ThrowNextExceptionIfPresent(new InvocationInformation(nameof(CreateItemStreamAsync)));

Expand All @@ -81,7 +92,7 @@ public override async Task<ResponseMessage> CreateItemStreamAsync(

try
{
var response = await _containerData.AddItem(json, partitionKey, requestOptions, cancellationToken);
var response = await _containerData.AddItem(json, partitionKey, dataChangeMode, requestOptions, cancellationToken);

return ToCosmosResponseMessage(response, streamPayload);
}
Expand All @@ -91,10 +102,21 @@ public override async Task<ResponseMessage> CreateItemStreamAsync(
}
}

public override async Task<ItemResponse<T>> CreateItemAsync<T>(
public override Task<ItemResponse<T>> CreateItemAsync<T>(
T item,
PartitionKey? partitionKey = default,
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return CreateItemAsync(item, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal async Task<ItemResponse<T>> CreateItemAsync<T>(
T item,
PartitionKey? partitionKey = default,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
Expand Down Expand Up @@ -126,7 +148,7 @@ public override async Task<ItemResponse<T>> CreateItemAsync<T>(

try
{
var response = await _containerData.AddItem(json, GetPartitionKey(json, partitionKey), requestOptions, cancellationToken);
var response = await _containerData.AddItem(json, GetPartitionKey(json, partitionKey), dataChangeMode, requestOptions, cancellationToken);

return ToCosmosItemResponse<T>(response);
}
Expand Down Expand Up @@ -201,12 +223,23 @@ public override Task<ItemResponse<T>> ReadItemAsync<T>(
return Task.FromResult<ItemResponse<T>>(itemResponse);
}

public override async Task<ResponseMessage> UpsertItemStreamAsync(
public override Task<ResponseMessage> UpsertItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return UpsertItemStreamAsync(streamPayload, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal async Task<ResponseMessage> UpsertItemStreamAsync(
Stream streamPayload,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
ThrowNextExceptionIfPresent(new InvocationInformation(nameof(UpsertItemStreamAsync)));

Expand All @@ -216,7 +249,7 @@ public override async Task<ResponseMessage> UpsertItemStreamAsync(

try
{
var response = await _containerData.UpsertItem(json, partitionKey, requestOptions, cancellationToken);
var response = await _containerData.UpsertItem(json, partitionKey, dataChangeMode, requestOptions, cancellationToken);
return ToCosmosResponseMessage(response, streamPayload);
}
catch (ContainerMockException ex)
Expand All @@ -225,10 +258,21 @@ public override async Task<ResponseMessage> UpsertItemStreamAsync(
}
}

public override async Task<ItemResponse<T>> UpsertItemAsync<T>(
public override Task<ItemResponse<T>> UpsertItemAsync<T>(
T item,
PartitionKey? partitionKey = default,
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return UpsertItemAsync(item, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal async Task<ItemResponse<T>> UpsertItemAsync<T>(
T item,
PartitionKey? partitionKey = default,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
Expand All @@ -238,7 +282,13 @@ public override async Task<ItemResponse<T>> UpsertItemAsync<T>(

try
{
var response = await _containerData.UpsertItem(json, GetPartitionKey(json, partitionKey), requestOptions, cancellationToken);
var response = await _containerData.UpsertItem(
json,
GetPartitionKey(json, partitionKey),
dataChangeMode,
requestOptions,
cancellationToken
);
return ToCosmosItemResponse<T>(response);
}
catch (ContainerMockException ex)
Expand All @@ -247,11 +297,23 @@ public override async Task<ItemResponse<T>> UpsertItemAsync<T>(
}
}

public override async Task<ItemResponse<T>> ReplaceItemAsync<T>(
public override Task<ItemResponse<T>> ReplaceItemAsync<T>(
T item,
string id,
PartitionKey? partitionKey = default,
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return ReplaceItemAsync(item, id, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal async Task<ItemResponse<T>> ReplaceItemAsync<T>(
T item,
string id,
PartitionKey? partitionKey = default,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
Expand All @@ -261,7 +323,14 @@ public override async Task<ItemResponse<T>> ReplaceItemAsync<T>(

try
{
var response = await _containerData.ReplaceItem(id, json, GetPartitionKey(json, partitionKey), requestOptions, cancellationToken);
var response = await _containerData.ReplaceItem(
id,
json,
GetPartitionKey(json, partitionKey),
dataChangeMode,
requestOptions,
cancellationToken
);
return ToCosmosItemResponse<T>(response);
}
catch (ContainerMockException ex)
Expand Down Expand Up @@ -295,12 +364,23 @@ public override Task<ItemResponse<T>> DeleteItemAsync<T>(
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return DeleteItemAsync<T>(id, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal Task<ItemResponse<T>> DeleteItemAsync<T>(
string id,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
ThrowNextExceptionIfPresent(new InvocationInformation(nameof(DeleteItemAsync)));

try
{
_containerData.RemoveItem(id, partitionKey, requestOptions);
_containerData.RemoveItem(id, partitionKey, dataChangeMode, requestOptions);

var itemResponse = new MockItemResponse<T>(HttpStatusCode.NoContent);
return Task.FromResult<ItemResponse<T>>(itemResponse);
Expand All @@ -311,18 +391,29 @@ public override Task<ItemResponse<T>> DeleteItemAsync<T>(
}
}

public override async Task<ResponseMessage> DeleteItemStreamAsync(
public override Task<ResponseMessage> DeleteItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
CancellationToken cancellationToken = default
)
{
return DeleteItemStreamAsync(id, partitionKey, requestOptions, DataChangeMode.Auto, cancellationToken);
}

internal async Task<ResponseMessage> DeleteItemStreamAsync(
string id,
PartitionKey partitionKey,
ItemRequestOptions? requestOptions = null,
DataChangeMode dataChangeMode = DataChangeMode.Auto,
CancellationToken cancellationToken = default
)
{
ThrowNextExceptionIfPresent(new InvocationInformation(nameof(DeleteItemStreamAsync)));

try
{
_containerData.RemoveItem(id, partitionKey, requestOptions);
_containerData.RemoveItem(id, partitionKey, dataChangeMode, requestOptions);
var responseMessage = new ResponseMessage(HttpStatusCode.NoContent);
return responseMessage;
}
Expand Down Expand Up @@ -396,6 +487,15 @@ public void RestoreSnapshot(ContainerDataSnapshot snapshot)
_containerData.RestoreSnapshot(snapshot);
}

public void ExecuteDataChanges()
{
while (!_containerData.DataChanges.IsEmpty)
{
_containerData.DataChanges.TryDequeue(out var action);
action?.Invoke();
}
}

private static ResponseMessage ToCosmosResponseMessage(Response response, Stream streamPayload)
{
var statusCode = response.IsUpdate ? HttpStatusCode.OK : HttpStatusCode.Created;
Expand Down
Loading

0 comments on commit c553c50

Please sign in to comment.