From eddc04d7395d8057b18d8960bed1fdaed2adec51 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 30 Aug 2022 13:46:29 +0200 Subject: [PATCH 01/11] Setup prerelease for streaming aggregate events --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5e4a3de..665a9fe 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,7 @@ name: Contracts CI/CD env: - PRERELEASE_BRANCHES: mithrandir + PRERELEASE_BRANCHES: treebeard on: push: From e1ae4b53a0df0f4bb834ed786fc0d7cca379ee4e Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 30 Aug 2022 13:51:53 +0200 Subject: [PATCH 02/11] Implement a streaming rpc method for getting aggregate events in batches --- Source/Runtime/Events/EventStore.proto | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/Source/Runtime/Events/EventStore.proto b/Source/Runtime/Events/EventStore.proto index 79cda6d..44082de 100644 --- a/Source/Runtime/Events/EventStore.proto +++ b/Source/Runtime/Events/EventStore.proto @@ -3,6 +3,7 @@ syntax = "proto3"; +import "Artifacts/Artifact.proto"; import "Protobuf/Failure.proto"; import "Services/CallContext.proto"; import "Runtime/Events/Aggregate.proto"; @@ -28,6 +29,11 @@ message FetchForAggregateRequest { services.CallRequestContext callContext = 1; Aggregate aggregate = 2; } +message FetchForAggregateInBatchesRequest { + services.CallRequestContext callContext = 1; + Aggregate aggregate = 2; + repeated artifacts.Artifact eventTypes = 3; +} message CommitEventsResponse { protobuf.Failure failure = 1; // not set if not failed @@ -48,5 +54,6 @@ message FetchForAggregateResponse { service EventStore { rpc Commit(CommitEventsRequest) returns(CommitEventsResponse); rpc CommitForAggregate(CommitAggregateEventsRequest) returns(CommitAggregateEventsResponse); - rpc FetchForAggregate(FetchForAggregateRequest) returns(FetchForAggregateResponse); + rpc FetchForAggregate(FetchForAggregateRequest) returns(FetchForAggregateResponse); // DEPRECATED: This is superseeded by FetchForAggregateInBatches + rpc FetchForAggregateInBatches(FetchForAggregateInBatchesRequest) returns (stream FetchForAggregateResponse); } From d1cd044a811e725394e1775ef0ec6d9967895f95 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 1 Sep 2022 12:36:52 +0200 Subject: [PATCH 03/11] Method for getting the aggreagte root version --- Source/Runtime/Aggregates/AggregateRoots.proto | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/Source/Runtime/Aggregates/AggregateRoots.proto b/Source/Runtime/Aggregates/AggregateRoots.proto index 1ed797f..fff6967 100644 --- a/Source/Runtime/Aggregates/AggregateRoots.proto +++ b/Source/Runtime/Aggregates/AggregateRoots.proto @@ -22,6 +22,18 @@ message AggregateRootAliasRegistrationResponse { protobuf.Failure failure = 1; } +message AggregateRootVersionRequest { + services.CallRequestContext callContext = 1; + artifacts.Artifact aggregateRoot = 2; + string eventSourceId = 3; +} + +message AggregateRootAliasRegistrationResponse { + protobuf.Failure failure = 1; + uint64 aggregateRootVersion = 2; +} + service AggregateRoots { rpc RegisterAlias(AggregateRootAliasRegistrationRequest) returns(AggregateRootAliasRegistrationResponse); + rpc GetVersion() returns () } From bf8bcd14b669a14f3d18dbb82e5d71f7d656f048 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 1 Sep 2022 12:40:05 +0200 Subject: [PATCH 04/11] Make it build --- Source/Runtime/Aggregates/AggregateRoots.proto | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Source/Runtime/Aggregates/AggregateRoots.proto b/Source/Runtime/Aggregates/AggregateRoots.proto index fff6967..3eb3d8d 100644 --- a/Source/Runtime/Aggregates/AggregateRoots.proto +++ b/Source/Runtime/Aggregates/AggregateRoots.proto @@ -28,12 +28,12 @@ message AggregateRootVersionRequest { string eventSourceId = 3; } -message AggregateRootAliasRegistrationResponse { +message AggregateRootVersionResponse { protobuf.Failure failure = 1; uint64 aggregateRootVersion = 2; } service AggregateRoots { rpc RegisterAlias(AggregateRootAliasRegistrationRequest) returns(AggregateRootAliasRegistrationResponse); - rpc GetVersion() returns () + rpc GetVersion(AggregateRootVersionRequest) returns(AggregateRootVersionResponse) } From a9fd1a4802de389863b34ce750b1637dd666be01 Mon Sep 17 00:00:00 2001 From: woksin Date: Thu, 1 Sep 2022 12:42:49 +0200 Subject: [PATCH 05/11] Missing semicolon --- Source/Runtime/Aggregates/AggregateRoots.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Runtime/Aggregates/AggregateRoots.proto b/Source/Runtime/Aggregates/AggregateRoots.proto index 3eb3d8d..ef0d7ac 100644 --- a/Source/Runtime/Aggregates/AggregateRoots.proto +++ b/Source/Runtime/Aggregates/AggregateRoots.proto @@ -35,5 +35,5 @@ message AggregateRootVersionResponse { service AggregateRoots { rpc RegisterAlias(AggregateRootAliasRegistrationRequest) returns(AggregateRootAliasRegistrationResponse); - rpc GetVersion(AggregateRootVersionRequest) returns(AggregateRootVersionResponse) + rpc GetVersion(AggregateRootVersionRequest) returns(AggregateRootVersionResponse); } From ab9639646bc4a5276776b091826abbd17216b851 Mon Sep 17 00:00:00 2001 From: woksin Date: Fri, 2 Sep 2022 09:14:36 +0200 Subject: [PATCH 06/11] Deprecate field --- Source/Runtime/Events/Committed.proto | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Source/Runtime/Events/Committed.proto b/Source/Runtime/Events/Committed.proto index 62d871e..b727ebf 100644 --- a/Source/Runtime/Events/Committed.proto +++ b/Source/Runtime/Events/Committed.proto @@ -37,6 +37,7 @@ message CommittedAggregateEvents { } string eventSourceId = 1; protobuf.Uuid aggregateRootId = 2; - uint64 aggregateRootVersion = 3; + uint64 aggregateRootVersion = 3; [deprecated = true] // Replaced by currentAggregateRootVersion repeated CommittedAggregateEvent events = 4; + uint64 currentAggregateRootVersion = 5; // Represents the current version of the aggregate root } From 4b6c9beb5306bd4532bcc2fbdb0c77bb7bf33df5 Mon Sep 17 00:00:00 2001 From: woksin Date: Fri, 2 Sep 2022 09:18:49 +0200 Subject: [PATCH 07/11] Make it build --- Source/Runtime/Events/Committed.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Source/Runtime/Events/Committed.proto b/Source/Runtime/Events/Committed.proto index b727ebf..79fc9c3 100644 --- a/Source/Runtime/Events/Committed.proto +++ b/Source/Runtime/Events/Committed.proto @@ -37,7 +37,7 @@ message CommittedAggregateEvents { } string eventSourceId = 1; protobuf.Uuid aggregateRootId = 2; - uint64 aggregateRootVersion = 3; [deprecated = true] // Replaced by currentAggregateRootVersion + uint64 aggregateRootVersion = 3 [deprecated = true]; // DEPRECATED Replaced by currentAggregateRootVersion repeated CommittedAggregateEvent events = 4; uint64 currentAggregateRootVersion = 5; // Represents the current version of the aggregate root } From f86c912d59dee96e66805dbda7cd58a1c9a60cfc Mon Sep 17 00:00:00 2001 From: woksin Date: Fri, 2 Sep 2022 09:36:07 +0200 Subject: [PATCH 08/11] Don't need method for getting aggregate root version --- Source/Runtime/Aggregates/AggregateRoots.proto | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/Source/Runtime/Aggregates/AggregateRoots.proto b/Source/Runtime/Aggregates/AggregateRoots.proto index ef0d7ac..1ed797f 100644 --- a/Source/Runtime/Aggregates/AggregateRoots.proto +++ b/Source/Runtime/Aggregates/AggregateRoots.proto @@ -22,18 +22,6 @@ message AggregateRootAliasRegistrationResponse { protobuf.Failure failure = 1; } -message AggregateRootVersionRequest { - services.CallRequestContext callContext = 1; - artifacts.Artifact aggregateRoot = 2; - string eventSourceId = 3; -} - -message AggregateRootVersionResponse { - protobuf.Failure failure = 1; - uint64 aggregateRootVersion = 2; -} - service AggregateRoots { rpc RegisterAlias(AggregateRootAliasRegistrationRequest) returns(AggregateRootAliasRegistrationResponse); - rpc GetVersion(AggregateRootVersionRequest) returns(AggregateRootVersionResponse); } From 282025ba8efce7cdfc0835ff9e53e49378e5e4a7 Mon Sep 17 00:00:00 2001 From: woksin Date: Fri, 2 Sep 2022 13:37:25 +0200 Subject: [PATCH 09/11] Adds aggregateRootVersion field to CommittedAggregateEvent --- Source/Runtime/Events/Committed.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/Source/Runtime/Events/Committed.proto b/Source/Runtime/Events/Committed.proto index 79fc9c3..e0a8dfe 100644 --- a/Source/Runtime/Events/Committed.proto +++ b/Source/Runtime/Events/Committed.proto @@ -34,6 +34,7 @@ message CommittedAggregateEvents { artifacts.Artifact eventType = 4; bool public = 5; string content = 6; + uint64 aggregateRootVersion = 7; // The aggregate root version the event was applied to } string eventSourceId = 1; protobuf.Uuid aggregateRootId = 2; From 066b7ecee647f52043dfb79ea79c78da17ed6abc Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 6 Sep 2022 16:09:54 +0200 Subject: [PATCH 10/11] Split request --- Source/Runtime/Events/EventStore.proto | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Source/Runtime/Events/EventStore.proto b/Source/Runtime/Events/EventStore.proto index 44082de..51d2ffa 100644 --- a/Source/Runtime/Events/EventStore.proto +++ b/Source/Runtime/Events/EventStore.proto @@ -32,6 +32,16 @@ message FetchForAggregateRequest { message FetchForAggregateInBatchesRequest { services.CallRequestContext callContext = 1; Aggregate aggregate = 2; + oneof Request{ + FetchAllEventsForAggregateInBatchesRequest fetchAllEvents = 3; + FetchEventsForAggregateInBatchesRequest fetchEvents = 4; + + } +} +message FetchAllEventsForAggregateInBatchesRequest { +} + +message FetchEventsForAggregateInBatchesRequest {@ repeated artifacts.Artifact eventTypes = 3; } From a0cd36f96a18be8e63425c7ae8506355621fb2f2 Mon Sep 17 00:00:00 2001 From: woksin Date: Tue, 6 Sep 2022 19:31:18 +0200 Subject: [PATCH 11/11] Make it build --- Source/Runtime/Events/EventStore.proto | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Source/Runtime/Events/EventStore.proto b/Source/Runtime/Events/EventStore.proto index 51d2ffa..b438bb2 100644 --- a/Source/Runtime/Events/EventStore.proto +++ b/Source/Runtime/Events/EventStore.proto @@ -32,17 +32,16 @@ message FetchForAggregateRequest { message FetchForAggregateInBatchesRequest { services.CallRequestContext callContext = 1; Aggregate aggregate = 2; - oneof Request{ + oneof Request { FetchAllEventsForAggregateInBatchesRequest fetchAllEvents = 3; FetchEventsForAggregateInBatchesRequest fetchEvents = 4; - } } message FetchAllEventsForAggregateInBatchesRequest { } -message FetchEventsForAggregateInBatchesRequest {@ - repeated artifacts.Artifact eventTypes = 3; +message FetchEventsForAggregateInBatchesRequest { + repeated artifacts.Artifact eventTypes = 1; } message CommitEventsResponse {