Skip to content

Commit

Permalink
Always include namespace when querying pins
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <andrew.richardson@kaleido.io>
  • Loading branch information
awrichar committed Jun 13, 2022
1 parent f684b6a commit f8ef566
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 5 deletions.
3 changes: 3 additions & 0 deletions db/migrations/postgres/000092_add_pin_namespace.down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
BEGIN;
DROP INDEX pins_pin;
CREATE UNIQUE INDEX pins_pin ON pins(hash, batch_id, idx);

ALTER TABLE pins DROP COLUMN namespace;
COMMIT;
3 changes: 3 additions & 0 deletions db/migrations/postgres/000092_add_pin_namespace.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ BEGIN;
ALTER TABLE pins ADD COLUMN namespace VARCHAR(64);
UPDATE pins SET namespace = 'ff_system';
ALTER TABLE pins ALTER COLUMN namespace SET NOT NULL;

DROP INDEX pins_pin;
CREATE UNIQUE INDEX pins_pin ON pins(namespace, hash, batch_id, idx);
COMMIT;
3 changes: 3 additions & 0 deletions db/migrations/sqlite/000092_add_pin_namespace.down.sql
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
DROP INDEX pins_pin;
CREATE UNIQUE INDEX pins_pin ON pins(hash, batch_id, idx);

ALTER TABLE pins DROP COLUMN namespace;
3 changes: 3 additions & 0 deletions db/migrations/sqlite/000092_add_pin_namespace.up.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
ALTER TABLE pins ADD COLUMN namespace VARCHAR(64);
UPDATE pins SET namespace = "ff_system";

DROP INDEX pins_pin;
CREATE UNIQUE INDEX pins_pin ON pins(namespace, hash, batch_id, idx);
10 changes: 7 additions & 3 deletions internal/events/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (

type aggregator struct {
ctx context.Context
namespace string
database database.Plugin
messaging privatemessaging.Manager
definitions definitions.DefinitionHandler
Expand All @@ -64,10 +65,11 @@ type batchCacheEntry struct {
manifest *core.BatchManifest
}

func newAggregator(ctx context.Context, di database.Plugin, bi blockchain.Plugin, pm privatemessaging.Manager, sh definitions.DefinitionHandler, im identity.Manager, dm data.Manager, en *eventNotifier, mm metrics.Manager) *aggregator {
func newAggregator(ctx context.Context, ns string, di database.Plugin, bi blockchain.Plugin, pm privatemessaging.Manager, sh definitions.DefinitionHandler, im identity.Manager, dm data.Manager, en *eventNotifier, mm metrics.Manager) *aggregator {
batchSize := config.GetInt(coreconfig.EventAggregatorBatchSize)
ag := &aggregator{
ctx: log.WithLogField(ctx, "role", "aggregator"),
namespace: ns,
database: di,
messaging: pm,
definitions: sh,
Expand All @@ -94,7 +96,7 @@ func newAggregator(ctx context.Context, di database.Plugin, bi blockchain.Plugin
Factor: config.GetFloat64(coreconfig.EventAggregatorRetryFactor),
},
firstEvent: &firstEvent,
namespace: "pins", // not a real namespace (used only for logging)
namespace: ns,
offsetType: core.OffsetTypeAggregator,
offsetName: aggregatorOffsetName,
newEventsHandler: ag.processPinsEventsHandler,
Expand Down Expand Up @@ -160,6 +162,7 @@ func (ag *aggregator) rewindOffchainBatches() (bool, int64) {
_ = ag.retry.Do(ag.ctx, "check for off-chain batch deliveries", func(attempt int) (retry bool, err error) {
pfb := database.PinQueryFactory.NewFilter(ag.ctx)
pinFilter := pfb.And(
pfb.Eq("namespace", ag.namespace),
pfb.In("batch", batchIDs),
pfb.Eq("dispatched", false),
).Sort("sequence").Limit(1) // only need the one oldest sequence
Expand Down Expand Up @@ -223,7 +226,8 @@ func (ag *aggregator) processPinsEventsHandler(items []core.LocallySequenced) (r

func (ag *aggregator) getPins(ctx context.Context, filter database.Filter, offset int64) ([]core.LocallySequenced, error) {
log.L(ctx).Tracef("Reading page of pins > %d (first pin would be %d)", offset, offset+1)
pins, _, err := ag.database.GetPins(ctx, filter)
fb := database.PinQueryFactory.NewFilter(ctx)
pins, _, err := ag.database.GetPins(ctx, fb.And(filter, fb.Eq("namespace", ag.namespace)))
ls := make([]core.LocallySequenced, len(pins))
for i, p := range pins {
ls[i] = p
Expand Down
4 changes: 4 additions & 0 deletions internal/events/aggregator_batch_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func newBatchState(ag *aggregator) *batchState {
return &batchState{
namespace: ag.namespace,
database: ag.database,
messaging: ag.messaging,
data: ag.data,
Expand Down Expand Up @@ -95,6 +96,7 @@ type dispatchedMessage struct {
// Runs in a database operation group/tranaction, which will be the same as phase (1) if there
// are no pre-finalize handlers registered.
type batchState struct {
namespace string
database database.Plugin
messaging privatemessaging.Manager
data data.Manager
Expand Down Expand Up @@ -171,6 +173,7 @@ func (bs *batchState) CheckUnmaskedContextReady(ctx context.Context, contextUnma
// We need to check there's no earlier sequences with the same unmasked context
fb := database.PinQueryFactory.NewFilterLimit(ctx, 1) // only need the first one
filter := fb.And(
fb.Eq("namespace", bs.namespace),
fb.Eq("hash", contextUnmasked),
fb.Eq("dispatched", false),
fb.Lt("sequence", firstMsgPinSequence),
Expand Down Expand Up @@ -445,6 +448,7 @@ func (bs *batchState) attemptContextInit(ctx context.Context, msg *core.Message,
// Check none of the other zerohashes exist before us in the stream
fb := database.PinQueryFactory.NewFilter(ctx)
filter := fb.And(
fb.Eq("namespace", bs.namespace),
fb.In("hash", zeroHashes),
fb.Eq("dispatched", false),
fb.Lt("sequence", pinnedSequence),
Expand Down
2 changes: 1 addition & 1 deletion internal/events/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func newTestAggregatorCommon(metrics bool) (*aggregator, func()) {
mmi.On("IsMetricsEnabled").Return(metrics)
mbi.On("VerifierType").Return(core.VerifierTypeEthAddress)
ctx, cancel := context.WithCancel(context.Background())
ag := newAggregator(ctx, mdi, mbi, mpm, msh, mim, mdm, newEventNotifier(ctx, "ut"), mmi)
ag := newAggregator(ctx, "ns1", mdi, mbi, mpm, msh, mim, mdm, newEventNotifier(ctx, "ut"), mmi)
return ag, func() {
cancel()
ag.batchCache.Stop()
Expand Down
2 changes: 1 addition & 1 deletion internal/events/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func NewEventManager(ctx context.Context, ns string, ni sysmessaging.LocalNodeIn
defaultTransport: config.GetString(coreconfig.EventTransportsDefault),
newEventNotifier: newEventNotifier,
newPinNotifier: newPinNotifier,
aggregator: newAggregator(ctx, di, bi, pm, dh, im, dm, newPinNotifier, mm),
aggregator: newAggregator(ctx, ns, di, bi, pm, dh, im, dm, newPinNotifier, mm),
metrics: mm,
chainListenerCache: ccache.New(ccache.Configure().MaxSize(config.GetByteSize(coreconfig.EventListenerTopicCacheSize))),
chainListenerCacheTTL: config.GetDuration(coreconfig.EventListenerTopicCacheTTL),
Expand Down
1 change: 1 addition & 0 deletions pkg/database/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,7 @@ var EventQueryFactory = &queryFields{

// PinQueryFactory filter fields for parked contexts
var PinQueryFactory = &queryFields{
"namespace": &StringField{},
"sequence": &Int64Field{},
"masked": &BoolField{},
"hash": &Bytes32Field{},
Expand Down

0 comments on commit f8ef566

Please sign in to comment.