diff --git a/db/migrations/postgres/000092_add_pin_namespace.down.sql b/db/migrations/postgres/000092_add_pin_namespace.down.sql index 44c4c79c83..67c6e12968 100644 --- a/db/migrations/postgres/000092_add_pin_namespace.down.sql +++ b/db/migrations/postgres/000092_add_pin_namespace.down.sql @@ -1,3 +1,6 @@ BEGIN; +DROP INDEX pins_pin; +CREATE UNIQUE INDEX pins_pin ON pins(hash, batch_id, idx); + ALTER TABLE pins DROP COLUMN namespace; COMMIT; diff --git a/db/migrations/postgres/000092_add_pin_namespace.up.sql b/db/migrations/postgres/000092_add_pin_namespace.up.sql index 36d55b6340..903d70daca 100644 --- a/db/migrations/postgres/000092_add_pin_namespace.up.sql +++ b/db/migrations/postgres/000092_add_pin_namespace.up.sql @@ -2,4 +2,7 @@ BEGIN; ALTER TABLE pins ADD COLUMN namespace VARCHAR(64); UPDATE pins SET namespace = 'ff_system'; ALTER TABLE pins ALTER COLUMN namespace SET NOT NULL; + +DROP INDEX pins_pin; +CREATE UNIQUE INDEX pins_pin ON pins(namespace, hash, batch_id, idx); COMMIT; diff --git a/db/migrations/sqlite/000092_add_pin_namespace.down.sql b/db/migrations/sqlite/000092_add_pin_namespace.down.sql index 3871b630f0..0f819fec77 100644 --- a/db/migrations/sqlite/000092_add_pin_namespace.down.sql +++ b/db/migrations/sqlite/000092_add_pin_namespace.down.sql @@ -1 +1,4 @@ +DROP INDEX pins_pin; +CREATE UNIQUE INDEX pins_pin ON pins(hash, batch_id, idx); + ALTER TABLE pins DROP COLUMN namespace; diff --git a/db/migrations/sqlite/000092_add_pin_namespace.up.sql b/db/migrations/sqlite/000092_add_pin_namespace.up.sql index df14865e9f..0c4e0b75c6 100644 --- a/db/migrations/sqlite/000092_add_pin_namespace.up.sql +++ b/db/migrations/sqlite/000092_add_pin_namespace.up.sql @@ -1,2 +1,5 @@ ALTER TABLE pins ADD COLUMN namespace VARCHAR(64); UPDATE pins SET namespace = "ff_system"; + +DROP INDEX pins_pin; +CREATE UNIQUE INDEX pins_pin ON pins(namespace, hash, batch_id, idx); diff --git a/internal/events/aggregator.go b/internal/events/aggregator.go index ac1d7c1697..99c374bb42 100644 --- a/internal/events/aggregator.go +++ b/internal/events/aggregator.go @@ -45,6 +45,7 @@ const ( type aggregator struct { ctx context.Context + namespace string database database.Plugin messaging privatemessaging.Manager definitions definitions.DefinitionHandler @@ -64,10 +65,11 @@ type batchCacheEntry struct { manifest *core.BatchManifest } -func newAggregator(ctx context.Context, di database.Plugin, bi blockchain.Plugin, pm privatemessaging.Manager, sh definitions.DefinitionHandler, im identity.Manager, dm data.Manager, en *eventNotifier, mm metrics.Manager) *aggregator { +func newAggregator(ctx context.Context, ns string, di database.Plugin, bi blockchain.Plugin, pm privatemessaging.Manager, sh definitions.DefinitionHandler, im identity.Manager, dm data.Manager, en *eventNotifier, mm metrics.Manager) *aggregator { batchSize := config.GetInt(coreconfig.EventAggregatorBatchSize) ag := &aggregator{ ctx: log.WithLogField(ctx, "role", "aggregator"), + namespace: ns, database: di, messaging: pm, definitions: sh, @@ -94,7 +96,7 @@ func newAggregator(ctx context.Context, di database.Plugin, bi blockchain.Plugin Factor: config.GetFloat64(coreconfig.EventAggregatorRetryFactor), }, firstEvent: &firstEvent, - namespace: "pins", // not a real namespace (used only for logging) + namespace: ns, offsetType: core.OffsetTypeAggregator, offsetName: aggregatorOffsetName, newEventsHandler: ag.processPinsEventsHandler, @@ -160,6 +162,7 @@ func (ag *aggregator) rewindOffchainBatches() (bool, int64) { _ = ag.retry.Do(ag.ctx, "check for off-chain batch deliveries", func(attempt int) (retry bool, err error) { pfb := database.PinQueryFactory.NewFilter(ag.ctx) pinFilter := pfb.And( + pfb.Eq("namespace", ag.namespace), pfb.In("batch", batchIDs), pfb.Eq("dispatched", false), ).Sort("sequence").Limit(1) // only need the one oldest sequence @@ -223,7 +226,8 @@ func (ag *aggregator) processPinsEventsHandler(items []core.LocallySequenced) (r func (ag *aggregator) getPins(ctx context.Context, filter database.Filter, offset int64) ([]core.LocallySequenced, error) { log.L(ctx).Tracef("Reading page of pins > %d (first pin would be %d)", offset, offset+1) - pins, _, err := ag.database.GetPins(ctx, filter) + fb := database.PinQueryFactory.NewFilter(ctx) + pins, _, err := ag.database.GetPins(ctx, fb.And(filter, fb.Eq("namespace", ag.namespace))) ls := make([]core.LocallySequenced, len(pins)) for i, p := range pins { ls[i] = p diff --git a/internal/events/aggregator_batch_state.go b/internal/events/aggregator_batch_state.go index ffad4d46ce..2613cbd787 100644 --- a/internal/events/aggregator_batch_state.go +++ b/internal/events/aggregator_batch_state.go @@ -33,6 +33,7 @@ import ( func newBatchState(ag *aggregator) *batchState { return &batchState{ + namespace: ag.namespace, database: ag.database, messaging: ag.messaging, data: ag.data, @@ -95,6 +96,7 @@ type dispatchedMessage struct { // Runs in a database operation group/tranaction, which will be the same as phase (1) if there // are no pre-finalize handlers registered. type batchState struct { + namespace string database database.Plugin messaging privatemessaging.Manager data data.Manager @@ -171,6 +173,7 @@ func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnma // We need to check there's no earlier sequences with the same unmasked context fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one filter := fb.And( + fb.Eq("namespace", bs.namespace), fb.Eq("hash", contextUnmasked), fb.Eq("dispatched", false), fb.Lt("sequence", firstMsgPinSequence), @@ -445,6 +448,7 @@ func (bs *batchState) attemptContextInit(ctx context.Context, msg *core.Message, // Check none of the other zerohashes exist before us in the stream fb := database.PinQueryFactory.NewFilter(ctx) filter := fb.And( + fb.Eq("namespace", bs.namespace), fb.In("hash", zeroHashes), fb.Eq("dispatched", false), fb.Lt("sequence", pinnedSequence), diff --git a/internal/events/aggregator_test.go b/internal/events/aggregator_test.go index e52cb2ab9f..9e4e6007b4 100644 --- a/internal/events/aggregator_test.go +++ b/internal/events/aggregator_test.go @@ -59,7 +59,7 @@ func newTestAggregatorCommon(metrics bool) (*aggregator, func()) { mmi.On("IsMetricsEnabled").Return(metrics) mbi.On("VerifierType").Return(core.VerifierTypeEthAddress) ctx, cancel := context.WithCancel(context.Background()) - ag := newAggregator(ctx, mdi, mbi, mpm, msh, mim, mdm, newEventNotifier(ctx, "ut"), mmi) + ag := newAggregator(ctx, "ns1", mdi, mbi, mpm, msh, mim, mdm, newEventNotifier(ctx, "ut"), mmi) return ag, func() { cancel() ag.batchCache.Stop() diff --git a/internal/events/event_manager.go b/internal/events/event_manager.go index 293d32c553..33c76440ed 100644 --- a/internal/events/event_manager.go +++ b/internal/events/event_manager.go @@ -140,7 +140,7 @@ func NewEventManager(ctx context.Context, ns string, ni sysmessaging.LocalNodeIn defaultTransport: config.GetString(coreconfig.EventTransportsDefault), newEventNotifier: newEventNotifier, newPinNotifier: newPinNotifier, - aggregator: newAggregator(ctx, di, bi, pm, dh, im, dm, newPinNotifier, mm), + aggregator: newAggregator(ctx, ns, di, bi, pm, dh, im, dm, newPinNotifier, mm), metrics: mm, chainListenerCache: ccache.New(ccache.Configure().MaxSize(config.GetByteSize(coreconfig.EventListenerTopicCacheSize))), chainListenerCacheTTL: config.GetDuration(coreconfig.EventListenerTopicCacheTTL),